This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e52b28  SAMZA-2479: Add configurable default for min compaction lag ms (#1305)
5e52b28 is described below

commit 5e52b281c21c1749b981c5e45bc3766ee634a413
Author: bkonold <[email protected]>
AuthorDate: Tue Mar 17 08:50:29 2020 -0700

    SAMZA-2479: Add configurable default for min compaction lag ms (#1305)
---
 .../versioned/jobs/configuration-table.html          | 20 ++++++++++++++++++++
 .../versioned/jobs/samza-configurations.md           |  2 ++
 .../java/org/apache/samza/config/StorageConfig.java  | 13 ++++++++++++-
 .../org/apache/samza/config/TestStorageConfig.java   | 19 +++++++++++++------
 4 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 52047f9..b588e64 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1586,6 +1586,26 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="store-default-changelog-min-compaction-lag-ms">stores.default.changelog.min.compaction.lag.ms</td>
+                    <td class="default">14400000</td>
+                    <td class="description">
+                        This property defines the default minimum period that must pass before a changelog message can be compacted.
+                        Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
+                        This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="store-changelog-min-compaction-lag-ms">stores.<span class="store">store-name</span>.changelog.min.compaction.lag.ms</td>
+                    <td class="default">stores.default.changelog.min.compaction.lag.ms</td>
+                    <td class="description">
+                        This property defines the minimum period that must pass before a message in the store's changelog can be compacted.
+                        Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
+                        This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="regex-rewriter">
                         Consuming all Kafka topics matching a regular expression<br>
                         <span class="subtitle">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1a2e04b..baf1ea8 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -264,6 +264,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 ##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage Configurations](#advanced-storage-configurations)
 |Name|Default|Description|
 |--- |--- |--- |
+|stores.default.changelog.<br>min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" c [...]
+|stores.**_store-name_**.changelog.<br>min.compaction.lag.ms|stores.default.changelog.<br>min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is no [...]
 |stores.default.changelog.<br>replication.factor|2|This property defines the default number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The property defines the number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index b5687c8..a8b8702 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -166,6 +166,17 @@ public class StorageConfig extends MapConfig {
   }
 
   /**
+   * Gets the configured default for stores' changelog min.compaction.lag.ms, or if not defined uses the default
+   * value defined in this class.
+   *
+   * @return the default changelog min.compaction.lag.ms
+   */
+  private long getDefaultChangelogMinCompactionLagMs() {
+    String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." + MIN_COMPACTION_LAG_MS;
+    return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+  }
+
+  /**
    * Gets the side inputs for the store. A store can have multiple side input streams which can be
    * provided as a comma separated list.
    *
@@ -226,7 +237,7 @@ public class StorageConfig extends MapConfig {
     checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
         "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");
 
-    return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+    return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index baecf99..88fbbe0 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -20,7 +20,9 @@
 package org.apache.samza.config;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -313,13 +315,18 @@ public class TestStorageConfig {
   @Test
   public void testGetChangelogMinCompactionLagMs() {
     // empty config, return default lag ms
+    Map<String, String> configMap = new HashMap<>();
     assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
-        new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+        new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
 
-    long lagOverride = TimeUnit.HOURS.toMillis(6);
-    StorageConfig storageConfig = new StorageConfig(
-        new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
-            String.valueOf(lagOverride))));
-    assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+    // override with configured default
+    long defaultLagOverride = TimeUnit.HOURS.toMillis(8);
+    configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride));
+    assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+    // override for specific store
+    long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6);
+    configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
+    assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
   }
 }
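
The lookup order this commit introduces (per-store value, then the configured
stores.default value, then the built-in 14400000 ms default) can be sketched in
isolation as follows. This is a minimal illustration over a plain
java.util.Map, not Samza's StorageConfig; the class name MinCompactionLagSketch,
the method resolve, and the store name "my-store" are hypothetical. Only the
two property names and the default value are taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the fallback chain; it does not use any Samza classes.
public class MinCompactionLagSketch {
  // 4 hours in milliseconds, matching the 14400000 default documented in this commit.
  static final long BUILT_IN_DEFAULT_MS = 14_400_000L;

  // Resolves min.compaction.lag.ms for one store:
  // per-store key first, then the job-wide "default" key, then the built-in value.
  static long resolve(Map<String, String> config, String storeName) {
    String perStore = config.get("stores." + storeName + ".changelog.min.compaction.lag.ms");
    if (perStore != null) {
      return Long.parseLong(perStore);
    }
    String jobWide = config.get("stores.default.changelog.min.compaction.lag.ms");
    if (jobWide != null) {
      return Long.parseLong(jobWide);
    }
    return BUILT_IN_DEFAULT_MS;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Nothing configured: falls through to the built-in default.
    System.out.println(resolve(config, "my-store")); // prints 14400000

    // Job-wide default of 8 hours applies to every store without its own value.
    config.put("stores.default.changelog.min.compaction.lag.ms", "28800000");
    System.out.println(resolve(config, "my-store")); // prints 28800000

    // A per-store value of 6 hours takes precedence over the job-wide default.
    config.put("stores.my-store.changelog.min.compaction.lag.ms", "21600000");
    System.out.println(resolve(config, "my-store")); // prints 21600000
  }
}
```

The three cases mirror the three assertions in the updated
testGetChangelogMinCompactionLagMs test: empty config, configured default, and
store-specific override.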
