Repository: samza
Updated Branches:
  refs/heads/master c65802a98 -> 492a3d79d


SAMZA-1066 : JavaStorageConfig handling job.changelog.system

SAMZA-1066

Same as change for StorageConfig.java
Allows user to set changelog system in job.changelog.system and specify stream 
only in 'stores.<store>.changelog'

Author: Boris Shkolnik <[email protected]>

Reviewers: Navina Ramesh <[email protected]>

Closes #36 from sborya/JavaStorageConfig


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/492a3d79
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/492a3d79
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/492a3d79

Branch: refs/heads/master
Commit: 492a3d79dc2c1ad4fe83456dc29a5dffd5fc183e
Parents: c65802a
Author: Boris Shkolnik <[email protected]>
Authored: Fri Dec 23 12:29:45 2016 -0800
Committer: Navina Ramesh <[email protected]>
Committed: Fri Dec 23 12:29:45 2016 -0800

----------------------------------------------------------------------
 .../apache/samza/config/JavaStorageConfig.java  | 23 +++++++++++++++-
 .../samza/config/TestJavaStorageConfig.java     | 29 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/492a3d79/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 4ac689e..7480c93 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -21,6 +21,8 @@ package org.apache.samza.config;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.samza.SamzaException;
+
 
 /**
  * a java version of the storage config
@@ -33,6 +35,7 @@ public class JavaStorageConfig extends MapConfig {
   private static final String KEY_SERDE = "stores.%s.key.serde";
   private static final String MSG_SERDE = "stores.%s.msg.serde";
   private static final String CHANGELOG_STREAM = "stores.%s.changelog";
+  private static final String CHANGELOG_SYSTEM = "job.changelog.system";
 
   public JavaStorageConfig(Config config) {
     super(config);
@@ -50,7 +53,25 @@ public class JavaStorageConfig extends MapConfig {
   }
 
   public String getChangelogStream(String storeName) {
-    return get(String.format(CHANGELOG_STREAM, storeName), null);
+
+    // If the config specifies 'stores.<storename>.changelog' as 
'<system>.<stream>' combination - it will take precedence.
+    // If this config only specifies <astream> and there is a value in 
job.changelog.system=<asystem> -
+    // these values will be combined into <asystem>.<astream>
+    String systemStream = get(String.format(CHANGELOG_STREAM, storeName), 
null);
+    String changelogSystem = get(CHANGELOG_SYSTEM, null);
+
+    String systemStreamRes;
+    if (systemStream != null  && ! systemStream.contains(".")) {
+      // contains only stream name
+      if (changelogSystem != null) {
+        systemStreamRes = changelogSystem + "." + systemStream;
+      } else {
+        throw new SamzaException("changelog system is not defined:" + 
systemStream);
+      }
+    } else {
+      systemStreamRes = systemStream;
+    }
+    return systemStreamRes;
   }
 
   public String getStorageFactoryClassName(String storeName) {

http://git-wip-us.apache.org/repos/asf/samza/blob/492a3d79/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
index 6c93697..1f4d74c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
+++ 
b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
@@ -32,15 +32,40 @@ public class TestJavaStorageConfig {
   public void testStorageConfig() {
     Map<String, String> map = new HashMap<String, String>();
     map.put("stores.test.factory", "testFactory");
-    map.put("stores.test.changelog", "testChangelog");
+    map.put("stores.test.changelog", "testSystem.testChangelog");
     map.put("stores.test.key.serde", "string");
     map.put("stores.test.msg.serde", "integer");
     JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map));
 
     assertEquals("testFactory", config.getStorageFactoryClassName("test"));
-    assertEquals("testChangelog", config.getChangelogStream("test"));
+    assertEquals("testSystem.testChangelog", 
config.getChangelogStream("test"));
     assertEquals("string", config.getStorageKeySerde("test"));
     assertEquals("integer", config.getStorageMsgSerde("test"));
     assertEquals("test", config.getStoreNames().get(0));
   }
+
+
+  @Test
+  public void testIsChangelogSystemSetting() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("stores.store1.changelog", "system1.stream1");
+    configMap.put("job.changelog.system", "system2");
+    configMap.put("stores.store2.changelog", "stream2");
+
+    JavaStorageConfig config = new JavaStorageConfig(new MapConfig(configMap));
+
+    assertEquals("system1.stream1", config.getChangelogStream("store1"));
+    assertEquals("system2.stream2", config.getChangelogStream("store2"));
+
+    Map<String, String> configMapErr = new HashMap<>();
+    configMapErr.put("stores.store4.changelog","stream4"); // incorrect
+    JavaStorageConfig configErr = new JavaStorageConfig(new 
MapConfig(configMapErr));
+
+    try {
+      configErr.getChangelogStream("store4");
+      fail("store4 has no system defined. Should've failed.");
+    } catch (Exception e) {
+       // do nothing, it is expected
+    }
+  }
 }

Reply via email to