This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e6fe169  SAMZA-2325 : Adding logic to read system config for repl-factor when creating a topic (#1157)
e6fe169 is described below

commit e6fe169b9152fa0f6fa8884237d2649e25146ee7
Author: rmatharu <[email protected]>
AuthorDate: Wed Sep 18 15:27:44 2019 -0700

    SAMZA-2325 : Adding logic to read system config for repl-factor when creating a topic (#1157)
    
    * Adding logic to read system config for repl-factor when creating a topic
---
 .../org/apache/samza/config/StorageConfig.java     |  2 +-
 .../org/apache/samza/config/TestStorageConfig.java | 53 ++++++++++++++--------
 .../samza/system/kafka/KafkaSystemAdmin.java       |  8 ++++
 .../org/apache/samza/config/KafkaConfig.scala      | 38 +++++++++++-----
 .../org/apache/samza/config/TestKafkaConfig.scala  | 16 +++++--
 5 files changed, 83 insertions(+), 34 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 7bc6cb4..86c7e7d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -148,7 +148,7 @@ public class StorageConfig extends MapConfig {
    *
    * @return the name of the system to use by default for all changelogs, if 
defined.
    */
-  public Optional<String> getChangelogSystem() {
+  private Optional<String> getChangelogSystem() {
     return Optional.ofNullable(get(CHANGELOG_SYSTEM, 
get(JobConfig.JOB_DEFAULT_SYSTEM)));
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 2cde5df..e094de2 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.SamzaException;
 import org.junit.Test;
 
+import static org.apache.samza.config.StorageConfig.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -81,6 +82,37 @@ public class TestStorageConfig {
             ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(), 
ApplicationConfig.APP_RUN_ID, "run-id")));
     assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"),
         storageConfig.getChangelogStream(STORE_NAME0));
+
+    // job has no changelog stream defined
+    storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", 
JobConfig.JOB_DEFAULT_SYSTEM,
+            "should-not-be-used")));
+    assertEquals(Optional.empty(), 
storageConfig.getChangelogStream(STORE_NAME0));
+
+    // job.changelog.system takes precedence over job.default.system when 
changelog is specified as just streamName
+    storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", 
JobConfig.JOB_DEFAULT_SYSTEM,
+            "should-not-be-used", String.format(CHANGELOG_STREAM, 
STORE_NAME0), "streamName")));
+    assertEquals("changelog-system.streamName", 
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+    // job.changelog.system takes precedence over job.default.system when 
changelog is specified as {systemName}.{streamName}
+    storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", 
JobConfig.JOB_DEFAULT_SYSTEM,
+            "should-not-be-used", String.format(CHANGELOG_STREAM, 
STORE_NAME0), "changelog-system.streamName")));
+    assertEquals("changelog-system.streamName", 
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+    // systemName specified using stores.{storeName}.changelog = 
{systemName}.{streamName} should take precedence even
+    // when job.changelog.system and job.default.system are specified
+    storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, 
"default-changelog-system",
+            JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
+            String.format(CHANGELOG_STREAM, STORE_NAME0), 
"nondefault-changelog-system.streamName")));
+    assertEquals("nondefault-changelog-system.streamName", 
storageConfig.getChangelogStream(STORE_NAME0).get());
+
+    // fall back to job.default.system if job.changelog.system is not specified
+    storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", 
String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+    assertEquals("default-system.streamName", 
storageConfig.getChangelogStream(STORE_NAME0).get());
   }
 
   @Test(expected = SamzaException.class)
@@ -153,23 +185,6 @@ public class TestStorageConfig {
   }
 
   @Test
-  public void testGetChangelogSystem() {
-    // empty config, so no system
-    assertEquals(Optional.empty(), new StorageConfig(new 
MapConfig()).getChangelogSystem());
-
-    // job.changelog.system takes precedence over job.default.system
-    StorageConfig storageConfig = new StorageConfig(new MapConfig(
-        ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", 
JobConfig.JOB_DEFAULT_SYSTEM,
-            "should-not-be-used")));
-    assertEquals(Optional.of("changelog-system"), 
storageConfig.getChangelogSystem());
-
-    // fall back to job.default.system if job.changelog.system is not specified
-    storageConfig =
-        new StorageConfig(new 
MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system")));
-    assertEquals(Optional.of("default-system"), 
storageConfig.getChangelogSystem());
-  }
-
-  @Test
   public void testGetSideInputs() {
     // empty config, so no system
     assertEquals(Collections.emptyList(), new StorageConfig(new 
MapConfig()).getSideInputs(STORE_NAME0));
@@ -232,7 +247,7 @@ public class TestStorageConfig {
     StorageConfig storageConfig = new StorageConfig(new 
MapConfig(ImmutableMap.of(
         // store0 has a changelog stream
         String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
-        String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), 
"system0.changelog-stream",
+        String.format(CHANGELOG_STREAM, STORE_NAME0), 
"system0.changelog-stream",
         // store1 does not have a changelog stream
         String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
     assertTrue(storageConfig.isChangelogSystem("system0"));
@@ -248,7 +263,7 @@ public class TestStorageConfig {
 
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), 
"factory.class",
-            String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), 
"system0.changelog-stream")));
+            String.format(CHANGELOG_STREAM, STORE_NAME0), 
"system0.changelog-stream")));
     assertTrue(storageConfig.hasDurableStores());
   }
 
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index c18c82d..1986bea 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -546,6 +546,14 @@ public class KafkaSystemAdmin implements SystemAdmin {
       kafkaSpec = kafkaSpec.copyWithProperties(properties);
     } else {
       kafkaSpec = KafkaStreamSpec.fromSpec(spec);
+
+      // we check if there is a system-level rf config specified, else we use 
KafkaConfig.topic-default-rf
+      int replicationFactorFromSystemConfig = Integer.valueOf(
+          new 
KafkaConfig(config).getSystemDefaultReplicationFactor(spec.getSystemName(),
+              KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()));
+      LOG.info("Using replication-factor: {} for StreamSpec: {}", 
replicationFactorFromSystemConfig, spec);
+      return new KafkaStreamSpec(kafkaSpec.getId(), 
kafkaSpec.getPhysicalName(), kafkaSpec.getSystemName(),
+          kafkaSpec.getPartitionCount(), replicationFactorFromSystemConfig, 
kafkaSpec.getProperties());
     }
     return kafkaSpec;
   }
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index b9543fb..993f0e4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -118,7 +118,7 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
     Option(replicationFactor)
   }
 
-  private def getSystemDefaultReplicationFactor(systemName: String, 
defaultValue: String) = {
+  def getSystemDefaultReplicationFactor(systemName: String, defaultValue: 
String) = {
     val defaultReplicationFactor = new 
SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR,
 defaultValue)
     defaultReplicationFactor
   }
@@ -243,19 +243,35 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
   /**
     * Gets the replication factor for the changelog topics. Uses the following 
precedence.
     *
-    * 1. If stores.myStore.changelog.replication.factor is configured, that 
value is used.
-    * 2. If systems.changelog-system.default.stream.replication.factor is 
configured, that value is used.
-    * 3. 2
-    *
-    * Note that the changelog-system has a similar precedence. See 
[[StorageConfig]]
+    * 1. If stores.{storeName}.changelog.replication.factor is configured, 
that value is used.
+    * 2. If it is not configured, the value configured for 
stores.default.changelog.replication.factor is used.
+    * 3. If it is not configured, the RF value configured for the store's 
changelog's system, configured using
+    * stores.{storeName}.changelog={systemName}.{streamName}, is used.
+    * 4. If it is not configured, the value for the RF of job.changelog.system 
is used.
+    * 5. If it is not configured, the value for the RF of job.default.system 
is used.
+    * 6. If it is not configured, the RF is chosen as 2.
     */
-  def getChangelogStreamReplicationFactor(name: String) = 
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format 
name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+  def getChangelogStreamReplicationFactor(storeName: String) = {
+    var changelogRF = 
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format storeName)
+
+    if(!changelogRF.isDefined) {
+      changelogRF = 
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR)
+    }
 
-  def getDefaultChangelogStreamReplicationFactor() = {
-    val changelogSystem = new 
StorageConfig(config).getChangelogSystem.orElse(null)
-    
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem,
 "2"))
+    if(!changelogRF.isDefined) {
+      val changelogSystemStream = new 
StorageConfig(config).getChangelogStream(storeName)
+      if (!changelogSystemStream.isPresent) {
+        throw new SamzaException("Cannot deduce replication factor. Changelog 
system-stream not defined for store " + storeName)
+      }
+
+      val changelogSystem = 
StreamUtil.getSystemStreamFromNames(changelogSystemStream.get()).getSystem
+      changelogRF = 
Option.apply(getSystemDefaultReplicationFactor(changelogSystem, null))
+    }
+
+    changelogRF.getOrElse(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR)
   }
 
+
   /**
     * Gets the max message bytes for the changelog topics. Uses the following 
precedence.
     *
@@ -268,7 +284,7 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
   def getChangelogStreamMaxMessageByte(name: String) = 
getOption(KafkaConfig.CHANGELOG_MAX_MESSAGE_BYTES format name) match {
     case Some(maxMessageBytes) => maxMessageBytes
     case _ =>
-      val changelogSystem = new 
StorageConfig(config).getChangelogSystem.orElse(null)
+      val changelogSystem = 
StreamUtil.getSystemStreamFromNames(JavaOptionals.toRichOptional(new 
StorageConfig(config).getChangelogStream(name)).toOption.getOrElse(throw new 
SamzaException("System-stream not defined for store:"+name))).getSystem
       val systemMaxMessageBytes = new 
SystemConfig(config).getDefaultStreamProperties(changelogSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES,
 KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
       systemMaxMessageBytes
   }
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 
b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index bb1b337..d94f414 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -212,15 +212,25 @@ class TestKafkaConfig {
   }
 
   @Test
+  def testGetSystemDefaultReplicationFactor(): Unit = {
+    assertEquals(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR, new 
KafkaConfig(new 
MapConfig()).getSystemDefaultReplicationFactor("kafka-system",KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR))
+
+    
props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    assertEquals("8", 
kafkaConfig.getSystemDefaultReplicationFactor("kafka-system","2"))
+  }
+
+  @Test
   def testChangeLogReplicationFactor() {
     props.setProperty("stores.store-with-override.changelog", 
"kafka-system.changelog-topic")
     
props.setProperty("stores.store-with-override.changelog.replication.factor", 
"3")
+    props.setProperty("stores.default.changelog.replication.factor", "2")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("3", 
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
     assertEquals("2", 
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
-    assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
   }
 
   @Test
@@ -231,11 +241,11 @@ class TestKafkaConfig {
     // Override the "default" default value
     props.setProperty("stores.default.changelog.replication.factor", "5")
 
+
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("4", 
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
     assertEquals("5", 
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
-    assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
   }
 
   @Test
@@ -243,12 +253,12 @@ class TestKafkaConfig {
     props.setProperty(StorageConfig.CHANGELOG_SYSTEM, "kafka-system")
     
props.setProperty("systems.kafka-system.default.stream.replication.factor", "8")
     
props.setProperty("stores.store-with-override.changelog.replication.factor", 
"4")
+    props.setProperty("stores.store-without-override.changelog", 
"change-for-store-without-override")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("4", 
kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"))
     assertEquals("8", 
kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"))
-    assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor)
   }
 
   @Test

Reply via email to