This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5e52b28 SAMZA-2479: Add configurable default for min compaction lag
ms (#1305)
5e52b28 is described below
commit 5e52b281c21c1749b981c5e45bc3766ee634a413
Author: bkonold <[email protected]>
AuthorDate: Tue Mar 17 08:50:29 2020 -0700
SAMZA-2479: Add configurable default for min compaction lag ms (#1305)
---
.../versioned/jobs/configuration-table.html | 20 ++++++++++++++++++++
.../versioned/jobs/samza-configurations.md | 2 ++
.../java/org/apache/samza/config/StorageConfig.java | 13 ++++++++++++-
.../org/apache/samza/config/TestStorageConfig.java | 19 +++++++++++++------
4 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 52047f9..b588e64 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1586,6 +1586,26 @@
</tr>
<tr>
+ <td class="property"
id="store-default-changelog-min-compaction-lag-ms">stores.default.changelog.min.compaction.lag.ms</td>
+ <td class="default">14400000</td>
+ <td class="description">
+ This property defines the default minimum period that
must pass before a changelog message can be compacted.
+ Be mindful that the larger this value, the larger
changelog topics in Kafka will grow due to uncompacted data, and the longer
your application will take to restore from the changelog.
+ This may impact your application's startup time if
host affinity is not enabled. For changelog topics with the "compact,delete"
cleanup policy, this value should be &lt; retention.ms.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property"
id="store-changelog-min-compaction-lag-ms">stores.<span
class="store">store-name</span>.changelog.min.compaction.lag.ms</td>
+ <td
class="default">stores.default.changelog.min.compaction.lag.ms</td>
+ <td class="description">
+ This property defines the minimum period that must
pass before a message in the store's changelog can be compacted.
+ Be mindful that the larger this value, the larger
the changelog topic in Kafka will grow due to uncompacted data, and the
longer your application will take to restore from the changelog.
+ This may impact your application's startup time if
host affinity is not enabled. For changelog topics with the "compact,delete"
cleanup policy, this value should be &lt; retention.ms.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="regex-rewriter">
Consuming all Kafka topics matching a regular
expression<br>
<span class="subtitle">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1a2e04b..baf1ea8 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -264,6 +264,8 @@ These properties define Samza's storage mechanism for
efficient [stateful stream
##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage
Configurations](#advanced-storage-configurations)
|Name|Default|Description|
|--- |--- |--- |
+|stores.default.changelog.<br>min.compaction.lag.ms|14400000|This property
defines the default minimum period that must pass before a changelog message
can be compacted. Be mindful that the larger this value, the larger
changelog topics in Kafka will grow due to uncompacted data, and the longer your
application will take to restore from the changelog. This may impact your
application's startup time if host affinity is not enabled. For changelog
topics which have "compact,delete" c [...]
+|stores.**_store-name_**.changelog.<br>min.compaction.lag.ms|stores.default.changelog.<br>min.compaction.lag.ms|This
property defines the minimum period that must pass before a message in the
store's changelog can be compacted. Be mindful that the larger this value, the
larger the changelog topic in Kafka will grow due to uncompacted data, and
the longer your application will take to restore from the changelog. This
may impact your application's startup time if host affinity is no [...]
|stores.default.changelog.<br>replication.factor|2|This property defines the
default number of replicas to use for the change log stream.|
|stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The
property defines the number of replicas to use for the change log stream.|
|stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The
property allows you to specify topic level settings for the changelog topic to
be created. For e.g., you can specify the clean up policy as
"stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka
documentation](http://kafka.apache.org/documentation.html#configuration) for
more topic level configurations.|
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index b5687c8..a8b8702 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -166,6 +166,17 @@ public class StorageConfig extends MapConfig {
}
/**
+ * Gets the changelog min.compaction.lag.ms default shared by all stores
(stores.default.changelog.min.compaction.lag.ms), falling back to
+ * DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS when that key is not set.
+ *
+ * @return the default changelog min.compaction.lag.ms
+ */
+ private long getDefaultChangelogMinCompactionLagMs() {
+ String defaultMinCompactLagConfigName = STORE_PREFIX +
"default.changelog." + MIN_COMPACTION_LAG_MS;
+ return getLong(defaultMinCompactLagConfigName,
DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+ }
+
+ /**
* Gets the side inputs for the store. A store can have multiple side input
streams which can be
* provided as a comma separated list.
*
@@ -226,7 +237,7 @@ public class StorageConfig extends MapConfig {
checkArgument(get("stores." + storeName + ".changelog.kafka." +
MIN_COMPACTION_LAG_MS) == null,
"Use " + minCompactLagConfigName + " to set kafka
min.compaction.lag.ms property.");
- return getLong(minCompactLagConfigName,
DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+ return getLong(minCompactLagConfigName,
getDefaultChangelogMinCompactionLagMs());
}
/**
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index baecf99..88fbbe0 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -20,7 +20,9 @@
package org.apache.samza.config;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -313,13 +315,18 @@ public class TestStorageConfig {
@Test
public void testGetChangelogMinCompactionLagMs() {
// empty config, return default lag ms
+ Map<String, String> configMap = new HashMap<>();
assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
- new StorageConfig(new
MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+ new StorageConfig(new
MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
- long lagOverride = TimeUnit.HOURS.toMillis(6);
- StorageConfig storageConfig = new StorageConfig(
- new
MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS,
STORE_NAME0),
- String.valueOf(lagOverride))));
- assertEquals(lagOverride,
storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+ // override with configured default
+ long defaultLagOverride = TimeUnit.HOURS.toMillis(8);
+ configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"),
String.valueOf(defaultLagOverride));
+ assertEquals(defaultLagOverride, new StorageConfig(new
MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+ // override for specific store
+ long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6);
+ configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
String.valueOf(storeSpecificLagOverride));
+ assertEquals(storeSpecificLagOverride, new StorageConfig(new
MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
}
}