Repository: samza Updated Branches: refs/heads/master 58f18117c -> 5a31be92e
SAMZA-1903: Samza-sql - Fix stream-table join to work with udfs Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #762 from atoomula/udf Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5a31be92 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5a31be92 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5a31be92 Branch: refs/heads/master Commit: 5a31be92eb8f4490879b10a30da81e12075ec6dc Parents: 58f1811 Author: Aditya Toomula <[email protected]> Authored: Thu Oct 25 14:28:19 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Thu Oct 25 14:28:19 2018 -0700 ---------------------------------------------------------------------- .../samza/sql/translator/JoinTranslator.java | 26 +++++++++++---- .../sql/translator/TestQueryTranslator.java | 28 ---------------- .../test/samzasql/TestSamzaSqlEndToEnd.java | 34 ++++++++++++++++++-- 3 files changed, 51 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 0761898..0939f7b 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -27,6 +27,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -64,7 +65,6 @@ import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompo * 4. Join condition with a constant is not supported. * 5. Compound join condition with only AND operator is supported. AND operator with a constant is not supported. No * support for OR operator or any other operator in the join condition. - * 6. Join condition with UDFs is not supported. Eg: udf1(a.key) = udf2(b.key) is not supported. * * It is assumed that the stream denoted as 'table' is already partitioned by the key(s) specified in the join * condition. We do not repartition the table as bootstrap semantic is not propagated to the intermediate streams. @@ -252,7 +252,17 @@ class JoinTranslator { SqlExplainLevel.EXPPLAN_ATTRIBUTES); } - private SqlIOConfig resolveSourceConfig(RelNode relNode) { + private SqlIOConfig resolveSourceConfigForTable(RelNode relNode) { + if (relNode instanceof LogicalProject) { + return resolveSourceConfigForTable(((LogicalProject) relNode).getInput()); + } + + // We are returning the sourceConfig for the table as null when the table is in another join rather than an output + // table, that's because the output of stream-table join is considered a stream. + if (relNode.getInputs().size() > 1) { + return null; + } + String sourceName = String.join(".", relNode.getTable().getQualifiedName()); SqlIOConfig sourceConfig = ioResolver.fetchSourceInfo(sourceName); if (sourceConfig == null) { @@ -265,8 +275,10 @@ class JoinTranslator { // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of // stream-table-table join, the left side of the join is join output, which we always // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan. - if (relNode instanceof EnumerableTableScan) { - return resolveSourceConfig(relNode).getTableDescriptor().isPresent(); + // The join key(s) for the table could be an udf in which case the relNode would be LogicalProject. + if (relNode instanceof EnumerableTableScan || relNode instanceof LogicalProject) { + SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode); + return sourceTableConfig != null && sourceTableConfig.getTableDescriptor().isPresent(); } else { return false; } @@ -277,9 +289,9 @@ class JoinTranslator { MessageStream<SamzaSqlRelMessage> relOutputStream = context.getMessageStream(relNode.getId()); - SqlIOConfig sourceConfig = resolveSourceConfig(relNode); + SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode); - if (!sourceConfig.getTableDescriptor().isPresent()) { + if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) { String errMsg = "Failed to resolve table source in join operation: node=" + relNode; log.error(errMsg); throw new SamzaException(errMsg); @@ -288,7 +300,7 @@ class JoinTranslator { // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational // message as the value. Send the messages from the input stream denoted as 'table' to the created table store. Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table = - context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get()); + context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get()); Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index 250253e..c7c82da 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -29,20 +29,16 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.context.ApplicationTaskContext; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationContext; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchQueryInfo; import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.fetchSqlFromConfig; @@ -461,30 +457,6 @@ public class TestQueryTranslator { translator.translate(queryInfo.get(0), streamAppDesc); } - @Test (expected = SamzaException.class) - public void testTranslateStreamTableInnerJoinWithUdf() { - Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); - String sql = - "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)" - + " select p.name as profileName, pv.pageKey" - + " from testavro.PAGEVIEW as pv" - + " join testavro.PROFILE.`$table` as p" - + " on MyTest(p.id) = MyTest(pv.profileId)"; - config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); - Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); - - List<String> sqlStmts = fetchSqlFromConfig(config); - List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts); - SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream) - .collect(Collectors.toSet()), - queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet())); - - StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig); - QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig); - translator.translate(queryInfo.get(0), streamAppDesc); - } - @Test public void testTranslateStreamTableInnerJoin() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); http://git-wip-us.apache.org/repos/asf/samza/blob/5a31be92/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 6321764..df4020c 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -297,6 +297,36 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { } @Test + public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); + staticConfigs.putAll(configs); + String sql = + "Insert into testavro.enrichedPageViewTopic " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," + + " p.name as profileName, p.address as profileAddress " + + "from testavro.PROFILE.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on MyTest(p.id) = MyTest(pv.profileId)"; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); + runner.runAndWaitForFinish(); + + List<String> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : + ((GenericRecord) x.getMessage()).get("profileName").toString())) + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages); + Assert.assertEquals(expectedOutMessages, outMessages); + } + + @Test public void testEndToEndStreamTableInnerJoinWithNestedRecord() throws Exception { int numMessages = 20; @@ -468,9 +498,9 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { + " p.address as profileAddress " + "from testavro.PAGEVIEW as pv " + "join testavro.PROFILE.`$table` as p " - + " on p.id = pv.profileId " + + " on MyTest(p.id) = MyTest(pv.profileId) " + " join testavro.COMPANY.`$table` as c " - + " on p.companyId = c.id"; + + " on MyTest(p.companyId) = MyTest(c.id)"; List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
