This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e6fe169 SAMZA-2325 : Adding logic to read system config for
repl-factor when creating a topic (#1157)
e6fe169 is described below
commit e6fe169b9152fa0f6fa8884237d2649e25146ee7
Author: rmatharu <[email protected]>
AuthorDate: Wed Sep 18 15:27:44 2019 -0700
SAMZA-2325 : Adding logic to read system config for repl-factor when
creating a topic (#1157)
* Adding logic to read system config for repl-factor when creating a topic
---
.../org/apache/samza/config/StorageConfig.java | 2 +-
.../org/apache/samza/config/TestStorageConfig.java | 53 ++++++++++++++--------
.../samza/system/kafka/KafkaSystemAdmin.java | 8 ++++
.../org/apache/samza/config/KafkaConfig.scala | 38 +++++++++++-----
.../org/apache/samza/config/TestKafkaConfig.scala | 16 +++++--
5 files changed, 83 insertions(+), 34 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 7bc6cb4..86c7e7d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -148,7 +148,7 @@ public class StorageConfig extends MapConfig {
*
* @return the name of the system to use by default for all changelogs, if
defined.
*/
- public Optional<String> getChangelogSystem() {
+ private Optional<String> getChangelogSystem() {
return Optional.ofNullable(get(CHANGELOG_SYSTEM,
get(JobConfig.JOB_DEFAULT_SYSTEM)));
}
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 2cde5df..e094de2 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.samza.SamzaException;
import org.junit.Test;
+import static org.apache.samza.config.StorageConfig.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -81,6 +82,37 @@ public class TestStorageConfig {
ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(),
ApplicationConfig.APP_RUN_ID, "run-id")));
assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"),
storageConfig.getChangelogStream(STORE_NAME0));
+
+ // job has no changelog stream defined
+ storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM,
+ "should-not-be-used")));
+ assertEquals(Optional.empty(),
storageConfig.getChangelogStream(STORE_NAME0));
+
+ // job.changelog.system takes precedence over job.default.system when
changelog is specified as just streamName
+ storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM,
+ "should-not-be-used", String.format(CHANGELOG_STREAM,
STORE_NAME0), "streamName")));
+ assertEquals("changelog-system.streamName",
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+ // when changelog is specified as {systemName}.{streamName}, that systemName is used (here it happens to match job.changelog.system)
+ storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM,
+ "should-not-be-used", String.format(CHANGELOG_STREAM,
STORE_NAME0), "changelog-system.streamName")));
+ assertEquals("changelog-system.streamName",
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+ // systemName specified using stores.{storeName}.changelog =
{systemName}.{streamName} should take precedence even
+ // when job.changelog.system and job.default.system are specified
+ storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM,
"default-changelog-system",
+ JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
+ String.format(CHANGELOG_STREAM, STORE_NAME0),
"nondefault-changelog-system.streamName")));
+ assertEquals("nondefault-changelog-system.streamName",
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+ // fall back to job.default.system if job.changelog.system is not specified
+ storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+ assertEquals("default-system.streamName",
storageConfig.getChangelogStream(STORE_NAME0).get());
}
@Test(expected = SamzaException.class)
@@ -153,23 +185,6 @@ public class TestStorageConfig {
}
@Test
- public void testGetChangelogSystem() {
- // empty config, so no system
- assertEquals(Optional.empty(), new StorageConfig(new
MapConfig()).getChangelogSystem());
-
- // job.changelog.system takes precedence over job.default.system
- StorageConfig storageConfig = new StorageConfig(new MapConfig(
- ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM,
- "should-not-be-used")));
- assertEquals(Optional.of("changelog-system"),
storageConfig.getChangelogSystem());
-
- // fall back to job.default.system if job.changelog.system is not specified
- storageConfig =
- new StorageConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system")));
- assertEquals(Optional.of("default-system"),
storageConfig.getChangelogSystem());
- }
-
- @Test
public void testGetSideInputs() {
// empty config, so no system
assertEquals(Collections.emptyList(), new StorageConfig(new
MapConfig()).getSideInputs(STORE_NAME0));
@@ -232,7 +247,7 @@ public class TestStorageConfig {
StorageConfig storageConfig = new StorageConfig(new
MapConfig(ImmutableMap.of(
// store0 has a changelog stream
String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
- String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
"system0.changelog-stream",
+ String.format(CHANGELOG_STREAM, STORE_NAME0),
"system0.changelog-stream",
// store1 does not have a changelog stream
String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
assertTrue(storageConfig.isChangelogSystem("system0"));
@@ -248,7 +263,7 @@ public class TestStorageConfig {
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0),
"factory.class",
- String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
"system0.changelog-stream")));
+ String.format(CHANGELOG_STREAM, STORE_NAME0),
"system0.changelog-stream")));
assertTrue(storageConfig.hasDurableStores());
}
diff --git
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index c18c82d..1986bea 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -546,6 +546,14 @@ public class KafkaSystemAdmin implements SystemAdmin {
kafkaSpec = kafkaSpec.copyWithProperties(properties);
} else {
kafkaSpec = KafkaStreamSpec.fromSpec(spec);
+
+ // use the system-level default replication factor (systems.{systemName}.default.stream.replication.factor) if configured, else KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR
+ int replicationFactorFromSystemConfig = Integer.valueOf(
+ new
KafkaConfig(config).getSystemDefaultReplicationFactor(spec.getSystemName(),
+ KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+ LOG.info("Using replication-factor: {} for StreamSpec: {}",
replicationFactorFromSystemConfig, spec);
+ return new KafkaStreamSpec(kafkaSpec.getId(),
kafkaSpec.getPhysicalName(), kafkaSpec.getSystemName(),
+ kafkaSpec.getPartitionCount(), replicationFactorFromSystemConfig,
kafkaSpec.getProperties());
}
return kafkaSpec;
}
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index b9543fb..993f0e4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -118,7 +118,7 @@ class KafkaConfig(config: Config) extends
ScalaMapConfig(config) {
Option(replicationFactor)
}
- private def getSystemDefaultReplicationFactor(systemName: String,
defaultValue: String) = {
+ def getSystemDefaultReplicationFactor(systemName: String, defaultValue:
String) = {
val defaultReplicationFactor = new
SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR,
defaultValue)
defaultReplicationFactor
}
@@ -243,19 +243,35 @@ class KafkaConfig(config: Config) extends
ScalaMapConfig(config) {
/**
* Gets the replication factor for the changelog topics. Uses the following
precedence.
*
- * 1. If stores.myStore.changelog.replication.factor is configured, that
value is used.
- * 2. If systems.changelog-system.default.stream.replication.factor is
configured, that value is used.
- * 3. 2
- *
- * Note that the changelog-system has a similar precedence. See
[[StorageConfig]]
+ * 1. If stores.{storeName}.changelog.replication.factor is configured, that value is used.
+ * 2. If it is not configured, the value configured for stores.default.changelog.replication.factor is used.
+ * 3. If it is not configured, the replication factor configured for the store's changelog system
+ *    (systems.{systemName}.default.stream.replication.factor) is used. The changelog system is resolved from
+ *    stores.{storeName}.changelog: the {systemName} of a {systemName}.{streamName} value if given, otherwise
+ *    job.changelog.system, otherwise job.default.system.
+ * 4. If it is not configured, KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR is used.
+ *
+ * @throws SamzaException if steps 1 and 2 yield no value and no changelog stream is configured for the store
*/
- def getChangelogStreamReplicationFactor(name: String) =
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format
name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+ def getChangelogStreamReplicationFactor(storeName: String) = {
+ var changelogRF =
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format storeName)
+
+ if(!changelogRF.isDefined) {
+ changelogRF =
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR)
+ }
- def getDefaultChangelogStreamReplicationFactor() = {
- val changelogSystem = new
StorageConfig(config).getChangelogSystem.orElse(null)
-
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem,
"2"))
+ if(!changelogRF.isDefined) {
+ val changelogSystemStream = new
StorageConfig(config).getChangelogStream(storeName)
+ if (!changelogSystemStream.isPresent) {
+ throw new SamzaException("Cannot deduce replication factor. Changelog
system-stream not defined for store " + storeName)
+ }
+
+ val changelogSystem =
StreamUtil.getSystemStreamFromNames(changelogSystemStream.get()).getSystem
+ changelogRF =
Option.apply(getSystemDefaultReplicationFactor(changelogSystem, null))
+ }
+
+ changelogRF.getOrElse(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR)
}
+
/**
* Gets the max message bytes for the changelog topics. Uses the following
precedence.
*
@@ -268,7 +284,7 @@ class KafkaConfig(config: Config) extends
ScalaMapConfig(config) {
def getChangelogStreamMaxMessageByte(name: String) =
getOption(KafkaConfig.CHANGELOG_MAX_MESSAGE_BYTES format name) match {
case Some(maxMessageBytes) => maxMessageBytes
case _ =>
- val changelogSystem = new
StorageConfig(config).getChangelogSystem.orElse(null)
+ val changelogSystem =
StreamUtil.getSystemStreamFromNames(JavaOptionals.toRichOptional(new
StorageConfig(config).getChangelogStream(name)).toOption.getOrElse(throw new
SamzaException("System-stream not defined for store:"+name))).getSystem
val systemMaxMessageBytes = new
SystemConfig(config).getDefaultStreamProperties(changelogSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES,
KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
systemMaxMessageBytes
}
diff --git
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index bb1b337..d94f414 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -212,15 +212,25 @@ class TestKafkaConfig {
}
@Test
+ def testGetSystemDefaultReplicationFactor(): Unit = {
+ assertEquals(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR, new
KafkaConfig(new
MapConfig()).getSystemDefaultReplicationFactor("kafka-system",KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR))
+
+
props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ assertEquals("8",
kafkaConfig.getSystemDefaultReplicationFactor("kafka-system","2"))
+ }
+
+ @Test
def testChangeLogReplicationFactor() {
props.setProperty("stores.store-with-override.changelog",
"kafka-system.changelog-topic")
props.setProperty("stores.store-with-override.changelog.replication.factor",
"3")
+ props.setProperty("stores.default.changelog.replication.factor", "2")
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
assertEquals("3",
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
assertEquals("2",
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
- assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
}
@Test
@@ -231,11 +241,11 @@ class TestKafkaConfig {
// Override the "default" default value
props.setProperty("stores.default.changelog.replication.factor", "5")
+
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
assertEquals("4",
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
assertEquals("5",
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
- assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
}
@Test
@@ -243,12 +253,12 @@ class TestKafkaConfig {
props.setProperty(StorageConfig.CHANGELOG_SYSTEM, "kafka-system")
props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
props.setProperty("stores.store-with-override.changelog.replication.factor",
"4")
+ props.setProperty("stores.store-without-override.changelog",
"change-for-store-without-override")
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
assertEquals("4",
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
assertEquals("8",
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
- assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
}
@Test