This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new abeb06f  SAMZA-2425: Add support for sub-query in joins
     new beb5e1b  Merge pull request #1242 from atoomula/join_subquery
abeb06f is described below

commit abeb06f16d2eb3bacd84db94686c4b0fde1cbe3e
Author: Aditya Toomula <[email protected]>
AuthorDate: Thu Jan 2 09:01:16 2020 -0800

    SAMZA-2425: Add support for sub-query in joins
---
 .../samza/sql/translator/JoinTranslator.java       | 26 +++++++++++++-----
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 31 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 7 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 0d625bc..635d0ba 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -28,6 +28,8 @@ import 
org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rex.RexCall;
@@ -292,17 +294,27 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
-  private SqlIOConfig resolveSourceConfigForTable(RelNode relNode, 
TranslatorContext context) {
+  private SqlIOConfig resolveSQlIOForTable(RelNode relNode, TranslatorContext 
context) {
+    // Let's recursively get to the TableScan node to identify IO for the 
table.
     if (relNode instanceof LogicalProject) {
-      return resolveSourceConfigForTable(((LogicalProject) 
relNode).getInput(), context);
+      return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), 
context);
     }
 
-    // We are returning the sourceConfig for the table as null when the table 
is in another join rather than an output
-    // table, that's because the output of stream-table join is considered a 
stream.
-    if (relNode.getInputs().size() > 1) {
+    if (relNode instanceof LogicalFilter) {
+      return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), 
context);
+    }
+
+    // Return null for the table IO when this node is itself a join: the 
output of a stream-table join
+    // is considered a stream, so no table IO is returned for it.
+    if (relNode instanceof LogicalJoin && relNode.getInputs().size() > 1) {
       return null;
     }
 
+    if (!(relNode instanceof TableScan)) {
+      throw new SamzaException(String.format("Unsupported query. relNode %s is 
not of type TableScan.",
+          relNode.toString()));
+    }
+
     String sourceName = 
SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
     SqlIOConfig sourceConfig =
         
context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
@@ -320,7 +332,7 @@ class JoinTranslator {
     // The join key(s) for the table could be an udf in which case the relNode 
would be LogicalProject.
 
     if (relNode instanceof EnumerableTableScan || relNode instanceof 
LogicalProject) {
-      SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, 
context);
+      SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
       if (sourceTableConfig == null || 
!sourceTableConfig.getTableDescriptor().isPresent()) {
         return JoinInputNode.InputType.STREAM;
       } else if (sourceTableConfig.getTableDescriptor().get() instanceof 
RemoteTableDescriptor ||
@@ -336,7 +348,7 @@ class JoinTranslator {
 
   private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
 
-    SqlIOConfig sourceTableConfig = 
resolveSourceConfigForTable(tableNode.getRelNode(), context);
+    SqlIOConfig sourceTableConfig = 
resolveSQlIOForTable(tableNode.getRelNode(), context);
 
     if (sourceTableConfig == null || 
!sourceTableConfig.getTableDescriptor().isPresent()) {
       String errMsg = "Failed to resolve table source in join operation: 
node=" + tableNode.getRelNode();
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 52fba62..26ba0ec 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -761,6 +761,37 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
+  public void testEndToEndStreamTableJoinWithSubQuery() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey as pageKey, p.address 
as profileAddress, coalesce(null, 'N/A') as companyName"
+            + " from (SELECT * FROM (SELECT * from testavro.PAGEVIEW pv1 where 
pv1.profileId=0) as pv2) as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, outMessages.size());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoin(1);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
   public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception {
     int numMessages = 20;
 

Reply via email to