Repository: samza Updated Branches: refs/heads/master 210631cd5 -> bee30f5df
SAMZA-2014: Samza-sql: Support table as both source (for join) and destination in the same application While parsing queries in an application, within SamzaSqlApplicationConfig, we collect all input sources and output sources from all queries and create descriptors for input sources first, followed by output sources. But there can be only one table descriptor instance per table. A writable table is also a readable table, but the reverse is not true. If we go through input sources first, we will end up creating a readable table descriptor and would not be able to create a writable table descriptor later when we go through output sources (the code will be ugly if we have to achieve this). There are a couple of ways to solve this: 1. Always make a table readable and writable 2. Go through output sources first followed by input sources. Choosing option 2, as always making a table both readable and writable does not make sense. Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #834 from atoomula/table Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bee30f5d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bee30f5d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bee30f5d Branch: refs/heads/master Commit: bee30f5df64634f4acb4b5f1aca0d23c231948a3 Parents: 210631c Author: Aditya Toomula <[email protected]> Authored: Fri Nov 30 13:40:05 2018 -0800 Committer: Srinivasulu Punuru <[email protected]> Committed: Fri Nov 30 13:40:05 2018 -0800 ---------------------------------------------------------------------- .../sql/runner/SamzaSqlApplicationConfig.java | 9 +++-- .../samza/sql/e2e/TestSamzaSqlRemoteTable.java | 37 +++++++++++++++++--- 2 files changed, 38 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/bee30f5d/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index e608794..b2a5efe 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -127,12 +127,15 @@ public class SamzaSqlApplicationConfig { Set<String> inputSystemStreamSet = new HashSet<>(inputSystemStreams); Set<String> outputSystemStreamSet = new HashSet<>(outputSystemStreams); - inputSystemStreamConfigBySource = inputSystemStreamSet.stream() - .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src))); - + // Let's get the output system stream configs before input system stream configs. This is to account for + // table descriptor that could be both input and output. Please note that there could be only one + // instance of table descriptor and writable table is a readable table but vice versa is not true. 
outputSystemStreamConfigsBySource = outputSystemStreamSet.stream() .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x))); + inputSystemStreamConfigBySource = inputSystemStreamSet.stream() + .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src))); + Map<String, SqlIOConfig> systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource); systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource); http://git-wip-us.apache.org/repos/asf/samza/blob/bee30f5d/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java index 51311eb..dc9fd27 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java @@ -97,7 +97,7 @@ public class TestSamzaSqlRemoteTable { TestAvroSystemFactory.messages.clear(); RemoteStoreIOResolverTestFactory.records.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - populateProfileTable(staticConfigs); + populateProfileTable(staticConfigs, numMessages); String sql = "Insert into testavro.enrichedPageViewTopic " @@ -130,7 +130,7 @@ public class TestSamzaSqlRemoteTable { RemoteStoreIOResolverTestFactory.records.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); - populateProfileTable(staticConfigs); + populateProfileTable(staticConfigs, numMessages); String sql = "Insert into testavro.enrichedPageViewTopic " @@ -163,7 +163,7 @@ public class TestSamzaSqlRemoteTable { RemoteStoreIOResolverTestFactory.records.clear(); Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); - populateProfileTable(staticConfigs); + populateProfileTable(staticConfigs, numMessages); String sql = "Insert into testavro.enrichedPageViewTopic " @@ -188,9 +188,36 @@ public class TestSamzaSqlRemoteTable { Assert.assertEquals(expectedOutMessages, outMessages); } - private void populateProfileTable(Map<String, String> staticConfigs) { - int numMessages = 20; + @Test + public void testSameJoinTargetSinkEndToEndRightOuterJoin() { + int numMessages = 21; + + TestAvroSystemFactory.messages.clear(); + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = + SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); + populateProfileTable(staticConfigs, numMessages); + + // The below query reads messages from a stream and deletes the corresponding records from the table. + // Since the stream has alternate messages with null foreign key, only half of the messages will have + // successful joins and hence only half of the records in the table will be deleted. Although join is + // redundant here, keeping it just for testing purpose. 
+ String sql = + "Insert into testRemoteStore.Profile.`$table` " + + "select p.__key__ as __key__ " + + "from testRemoteStore.Profile.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on p.__key__ = pv.profileId "; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + appRunnable.runAndWaitForFinish(); + + Assert.assertEquals((numMessages + 1) / 2, RemoteStoreIOResolverTestFactory.records.size()); + } + private void populateProfileTable(Map<String, String> staticConfigs, int numMessages) { RemoteStoreIOResolverTestFactory.records.clear(); String sql = "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE";
