Repository: samza
Updated Branches:
  refs/heads/master 210631cd5 -> bee30f5df


SAMZA-2014: Samza-sql: Support table as both source (for join) and destination 
in the same application

While parsing the queries of an application within SamzaSqlApplicationConfig, we
collect all input sources and output sources from all queries, and create
descriptors for the input sources first, followed by the output sources.
However, there can be only one table descriptor instance per table. A writable
table is also a readable table, but the reverse is not true. If we process the
input sources first, we end up creating a readable table descriptor for the
table, and we cannot later create a writable table descriptor for the same
table when we process the output sources (working around this would make the
code ugly). There are a couple of ways to solve this:
- Always make every table both readable and writable.
- Process the output sources first, followed by the input sources.

We chose option 2, because always making a table read-writable does not make sense.

Author: Aditya Toomula <[email protected]>

Reviewers: Srinivasulu Punuru <[email protected]>

Closes #834 from atoomula/table


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bee30f5d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bee30f5d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bee30f5d

Branch: refs/heads/master
Commit: bee30f5df64634f4acb4b5f1aca0d23c231948a3
Parents: 210631c
Author: Aditya Toomula <[email protected]>
Authored: Fri Nov 30 13:40:05 2018 -0800
Committer: Srinivasulu Punuru <[email protected]>
Committed: Fri Nov 30 13:40:05 2018 -0800

----------------------------------------------------------------------
 .../sql/runner/SamzaSqlApplicationConfig.java   |  9 +++--
 .../samza/sql/e2e/TestSamzaSqlRemoteTable.java  | 37 +++++++++++++++++---
 2 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bee30f5d/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index e608794..b2a5efe 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -127,12 +127,15 @@ public class SamzaSqlApplicationConfig {
     Set<String> inputSystemStreamSet = new HashSet<>(inputSystemStreams);
     Set<String> outputSystemStreamSet = new HashSet<>(outputSystemStreams);
 
-    inputSystemStreamConfigBySource = inputSystemStreamSet.stream()
-         .collect(Collectors.toMap(Function.identity(), src -> 
ioResolver.fetchSourceInfo(src)));
-
+    // Let's get the output system stream configs before input system stream 
configs. This is to account for
+    // table descriptor that could be both input and output. Please note that 
there could be only one
+    // instance of table descriptor and writable table is a readable table but 
vice versa is not true.
     outputSystemStreamConfigsBySource = outputSystemStreamSet.stream()
          .collect(Collectors.toMap(Function.identity(), x -> 
ioResolver.fetchSinkInfo(x)));
 
+    inputSystemStreamConfigBySource = inputSystemStreamSet.stream()
+        .collect(Collectors.toMap(Function.identity(), src -> 
ioResolver.fetchSourceInfo(src)));
+
     Map<String, SqlIOConfig> systemStreamConfigsBySource = new 
HashMap<>(inputSystemStreamConfigBySource);
     systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bee30f5d/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java 
b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
index 51311eb..dc9fd27 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java
@@ -97,7 +97,7 @@ public class TestSamzaSqlRemoteTable {
     TestAvroSystemFactory.messages.clear();
     RemoteStoreIOResolverTestFactory.records.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
-    populateProfileTable(staticConfigs);
+    populateProfileTable(staticConfigs, numMessages);
 
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
@@ -130,7 +130,7 @@ public class TestSamzaSqlRemoteTable {
     RemoteStoreIOResolverTestFactory.records.clear();
     Map<String, String> staticConfigs =
         SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
-    populateProfileTable(staticConfigs);
+    populateProfileTable(staticConfigs, numMessages);
 
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
@@ -163,7 +163,7 @@ public class TestSamzaSqlRemoteTable {
     RemoteStoreIOResolverTestFactory.records.clear();
     Map<String, String> staticConfigs =
         SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
-    populateProfileTable(staticConfigs);
+    populateProfileTable(staticConfigs, numMessages);
 
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
@@ -188,9 +188,36 @@ public class TestSamzaSqlRemoteTable {
     Assert.assertEquals(expectedOutMessages, outMessages);
   }
 
-  private void populateProfileTable(Map<String, String> staticConfigs) {
-    int numMessages = 20;
+  @Test
+  public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
+    int numMessages = 21;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    // The below query reads messages from a stream and deletes the 
corresponding records from the table.
+    // Since the stream has alternate messages with null foreign key, only 
half of the messages will have
+    // successful joins and hence only half of the records in the table will 
be deleted. Although join is
+    // redundant here, keeping it just for testing purpose.
+    String sql =
+        "Insert into testRemoteStore.Profile.`$table` "
+            + "select p.__key__ as __key__ "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId ";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner appRunnable = new 
SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    appRunnable.runAndWaitForFinish();
+
+    Assert.assertEquals((numMessages + 1) / 2, 
RemoteStoreIOResolverTestFactory.records.size());
+  }
 
+  private void populateProfileTable(Map<String, String> staticConfigs, int 
numMessages) {
     RemoteStoreIOResolverTestFactory.records.clear();
 
     String sql = "Insert into testRemoteStore.Profile.`$table` select * from 
testavro.PROFILE";

Reply via email to