Repository: samza Updated Branches: refs/heads/master c65802a98 -> 492a3d79d
SAMZA-1066 : JavaStorageConfig handling job.changelog.system SAMZA-1066 Same as change for StorageConfig.java Allows user to set changelog system in job.changelog.system and specify stream only in 'stores.<store>.changelog' Author: Boris Shkolnik <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #36 from sborya/JavaStorageConfig Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/492a3d79 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/492a3d79 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/492a3d79 Branch: refs/heads/master Commit: 492a3d79dc2c1ad4fe83456dc29a5dffd5fc183e Parents: c65802a Author: Boris Shkolnik <[email protected]> Authored: Fri Dec 23 12:29:45 2016 -0800 Committer: Navina Ramesh <[email protected]> Committed: Fri Dec 23 12:29:45 2016 -0800 ---------------------------------------------------------------------- .../apache/samza/config/JavaStorageConfig.java | 23 +++++++++++++++- .../samza/config/TestJavaStorageConfig.java | 29 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/492a3d79/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index 4ac689e..7480c93 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -21,6 +21,8 @@ package org.apache.samza.config; import java.util.ArrayList; import java.util.List; +import org.apache.samza.SamzaException; + /** * a java version of the storage config @@ -33,6 +35,7 @@ public class JavaStorageConfig extends MapConfig { private static final String KEY_SERDE = "stores.%s.key.serde"; private static final String MSG_SERDE = "stores.%s.msg.serde"; private static final String CHANGELOG_STREAM = "stores.%s.changelog"; + private static final String CHANGELOG_SYSTEM = "job.changelog.system"; public JavaStorageConfig(Config config) { super(config); @@ -50,7 +53,25 @@ public class JavaStorageConfig extends MapConfig { } public String getChangelogStream(String storeName) { - return get(String.format(CHANGELOG_STREAM, storeName), null); + + // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence. + // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> - + // these values will be combined into <asystem>.<astream> + String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null); + String changelogSystem = get(CHANGELOG_SYSTEM, null); + + String systemStreamRes; + if (systemStream != null && ! systemStream.contains(".")) { + // contains only stream name + if (changelogSystem != null) { + systemStreamRes = changelogSystem + "." + systemStream; + } else { + throw new SamzaException("changelog system is not defined:" + systemStream); + } + } else { + systemStreamRes = systemStream; + } + return systemStreamRes; } public String getStorageFactoryClassName(String storeName) { http://git-wip-us.apache.org/repos/asf/samza/blob/492a3d79/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java index 6c93697..1f4d74c 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java @@ -32,15 +32,40 @@ public class TestJavaStorageConfig { public void testStorageConfig() { Map<String, String> map = new HashMap<String, String>(); map.put("stores.test.factory", "testFactory"); - map.put("stores.test.changelog", "testChangelog"); + map.put("stores.test.changelog", "testSystem.testChangelog"); map.put("stores.test.key.serde", "string"); map.put("stores.test.msg.serde", "integer"); JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map)); assertEquals("testFactory", config.getStorageFactoryClassName("test")); - assertEquals("testChangelog", config.getChangelogStream("test")); + assertEquals("testSystem.testChangelog", config.getChangelogStream("test")); assertEquals("string", config.getStorageKeySerde("test")); assertEquals("integer", config.getStorageMsgSerde("test")); assertEquals("test", config.getStoreNames().get(0)); } + + + @Test + public void testIsChangelogSystemSetting() { + Map<String, String> configMap = new HashMap<>(); + configMap.put("stores.store1.changelog", "system1.stream1"); + configMap.put("job.changelog.system", "system2"); + configMap.put("stores.store2.changelog", "stream2"); + + JavaStorageConfig config = new JavaStorageConfig(new MapConfig(configMap)); + + assertEquals("system1.stream1", config.getChangelogStream("store1")); + assertEquals("system2.stream2", config.getChangelogStream("store2")); + + Map<String, String> configMapErr = new HashMap<>(); + configMapErr.put("stores.store4.changelog","stream4"); // incorrect + JavaStorageConfig configErr = new JavaStorageConfig(new MapConfig(configMapErr)); + + try { + configErr.getChangelogStream("store4"); + fail("store4 has no system defined. Should've failed."); + } catch (Exception e) { + // do nothing, it is expected + } + } }
