Repository: samza Updated Branches: refs/heads/master 28ca72965 -> b0b292200
SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs ## What changes were proposed in this pull request? This PR is to fix IllegalStateException complaining duplicate key when fetching resource configs in SamzaSqlApplicationConfig. ## How was this patch tested? Pass the local build and current unit tests. Added a new unit test. Author: Weiqing Yang <[email protected]> Reviewers: Aditya Toomula <[email protected]>, Srinivasulu Punuru <[email protected]> Closes #616 from weiqingy/SAMZA-1821 and squashes the following commits: e85df23f [Weiqing Yang] use distinct() 61245627 [Weiqing Yang] SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b0b29220 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b0b29220 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b0b29220 Branch: refs/heads/master Commit: b0b2922002c3e983ca18243bd3d42b3be4e17df8 Parents: 28ca729 Author: Weiqing Yang <[email protected]> Authored: Tue Sep 4 16:27:30 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Tue Sep 4 16:27:30 2018 -0700 ---------------------------------------------------------------------- .../sql/runner/SamzaSqlApplicationConfig.java | 6 ++++-- .../runner/TestSamzaSqlApplicationConfig.java | 22 +++++++++++++++++++- .../sql/testutil/TestSamzaSqlFileParser.java | 1 - 3 files changed, 25 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 316d174..997312f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -113,13 +113,15 @@ public class SamzaSqlApplicationConfig { inputSystemStreamConfigBySource = queryInfo.stream() .map(QueryInfo::getSources) .flatMap(Collection::stream) - .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src))); + .distinct() + .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo)); Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values()); outputSystemStreamConfigsBySource = queryInfo.stream() .map(QueryInfo::getSink) - .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x))); + .distinct() + .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo)); systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values()); relSchemaProvidersBySource = systemStreamConfigs.stream() http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java index dac5d02..dda0e14 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java @@ -19,13 +19,16 @@ package org.apache.samza.sql.runner; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.testutil.JsonUtil; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; import org.junit.Test; @@ -76,6 +79,23 @@ public class TestSamzaSqlApplicationConfig { testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER); } + @Test + public void testGetInputAndOutputStreamConfigs() { + List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1", + "insert into testavro.Profile select * from testavro.SIMPLE1"); + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + + Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet(); + Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet(); + Assert.assertEquals(1, inputKeys.size()); + Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1")); + Assert.assertEquals(2, outputKeys.size()); + Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1")); + Assert.assertTrue(outputKeys.contains("testavro.Profile")); + } + private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { Map<String, String> badConfigs = new HashMap<>(config); badConfigs.remove(configKey); http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java index a84f347..1723e0e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.List; -import org.apache.samza.sql.testutil.SqlFileParser; import org.junit.Assert; import org.junit.Test;
