Repository: samza
Updated Branches:
  refs/heads/master 58f18117c -> 5a31be92e


SAMZA-1903: Samza-sql - Fix stream-table join to work with udfs

Author: Aditya Toomula <[email protected]>

Reviewers: Srinivasulu Punuru <[email protected]>

Closes #762 from atoomula/udf


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5a31be92
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5a31be92
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5a31be92

Branch: refs/heads/master
Commit: 5a31be92eb8f4490879b10a30da81e12075ec6dc
Parents: 58f1811
Author: Aditya Toomula <[email protected]>
Authored: Thu Oct 25 14:28:19 2018 -0700
Committer: Srinivasulu Punuru <[email protected]>
Committed: Thu Oct 25 14:28:19 2018 -0700

----------------------------------------------------------------------
 .../samza/sql/translator/JoinTranslator.java    | 26 +++++++++++----
 .../sql/translator/TestQueryTranslator.java     | 28 ----------------
 .../test/samzasql/TestSamzaSqlEndToEnd.java     | 34 ++++++++++++++++++--
 3 files changed, 51 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 0761898..0939f7b 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -27,6 +27,7 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
@@ -64,7 +65,6 @@ import static 
org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompo
  *   4. Join condition with a constant is not supported.
  *   5. Compound join condition with only AND operator is supported. AND 
operator with a constant is not supported. No
  *      support for OR operator or any other operator in the join condition.
- *   6. Join condition with UDFs is not supported. Eg: udf1(a.key) = 
udf2(b.key) is not supported.
  *
  * It is assumed that the stream denoted as 'table' is already partitioned by 
the key(s) specified in the join
  * condition. We do not repartition the table as bootstrap semantic is not 
propagated to the intermediate streams.
@@ -252,7 +252,17 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
-  private SqlIOConfig resolveSourceConfig(RelNode relNode) {
+  private SqlIOConfig resolveSourceConfigForTable(RelNode relNode) {
+    if (relNode instanceof LogicalProject) {
+      return resolveSourceConfigForTable(((LogicalProject) 
relNode).getInput());
+    }
+
+    // We are returning the sourceConfig for the table as null when the table 
is in another join rather than an output
+    // table. That's because the output of a stream-table join is considered a 
stream.
+    if (relNode.getInputs().size() > 1) {
+      return null;
+    }
+
     String sourceName = String.join(".", 
relNode.getTable().getQualifiedName());
     SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName);
     if (sourceConfig == null) {
@@ -265,8 +275,10 @@ class JoinTranslator {
     // NOTE: Any intermediate form of a join is always a stream. Eg: For the 
second level join of
     // stream-table-table join, the left side of the join is join output, 
which we always
     // assume to be a stream. The intermediate stream won't be an instance of 
EnumerableTableScan.
-    if (relNode instanceof EnumerableTableScan) {
-      return resolveSourceConfig(relNode).getTableDescriptor().isPresent();
+    // The join key(s) for the table could be a UDF, in which case the relNode 
would be LogicalProject.
+    if (relNode instanceof EnumerableTableScan || relNode instanceof 
LogicalProject) {
+      SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
+      return sourceTableConfig != null && 
sourceTableConfig.getTableDescriptor().isPresent();
     } else {
       return false;
     }
@@ -277,9 +289,9 @@ class JoinTranslator {
 
     MessageStream<SamzaSqlRelMessage> relOutputStream = 
context.getMessageStream(relNode.getId());
 
-    SqlIOConfig sourceConfig = resolveSourceConfig(relNode);
+    SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode);
 
-    if (!sourceConfig.getTableDescriptor().isPresent()) {
+    if (sourceTableConfig == null || 
!sourceTableConfig.getTableDescriptor().isPresent()) {
       String errMsg = "Failed to resolve table source in join operation: 
node=" + relNode;
       log.error(errMsg);
       throw new SamzaException(errMsg);
@@ -288,7 +300,7 @@ class JoinTranslator {
     // Create a table backed by RocksDb store with the fields in the join 
condition as composite key and relational
     // message as the value. Send the messages from the input stream denoted 
as 'table' to the created table store.
     Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
-        
context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get());
+        
context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get());
 
     Serde<SamzaSqlCompositeKey> keySerde = new 
JsonSerdeV2<>(SamzaSqlCompositeKey.class);
     SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =

http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 250253e..c7c82da 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -29,20 +29,16 @@ import 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.context.ApplicationTaskContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
 
 import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchQueryInfo;
 import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchSqlFromConfig;
@@ -461,30 +457,6 @@ public class TestQueryTranslator {
     translator.translate(queryInfo.get(0), streamAppDesc);
   }
 
-  @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableInnerJoinWithUdf() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv"
-            + " join testavro.PROFILE.`$table` as p"
-            + " on MyTest(p.id) = MyTest(pv.profileId)";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-
-    List<String> sqlStmts = fetchSqlFromConfig(config);
-    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
-
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
-    QueryTranslator translator = new QueryTranslator(streamAppDesc, 
samzaSqlApplicationConfig);
-    translator.translate(queryInfo.get(0), streamAppDesc);
-  }
-
   @Test
   public void testTranslateStreamTableInnerJoin() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10);

http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6321764..df4020c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -297,6 +297,36 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
   }
 
   @Test
+  public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    staticConfigs.putAll(configs);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testavro.PROFILE.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on MyTest(p.id) = MyTest(pv.profileId)";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
   public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws 
Exception {
     int numMessages = 20;
 
@@ -468,9 +498,9 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
             + "       p.address as profileAddress "
             + "from testavro.PAGEVIEW as pv "
             + "join testavro.PROFILE.`$table` as p "
-            + " on p.id = pv.profileId "
+            + " on MyTest(p.id) = MyTest(pv.profileId) "
             + " join testavro.COMPANY.`$table` as c "
-            + " on p.companyId = c.id";
+            + " on MyTest(p.companyId) = MyTest(c.id)";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));

Reply via email to