This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new abeb06f SAMZA-2425: Add support for sub-query in joins
new beb5e1b Merge pull request #1242 from atoomula/join_subquery
abeb06f is described below
commit abeb06f16d2eb3bacd84db94686c4b0fde1cbe3e
Author: Aditya Toomula <[email protected]>
AuthorDate: Thu Jan 2 09:01:16 2020 -0800
SAMZA-2425: Add support for sub-query in joins
---
.../samza/sql/translator/JoinTranslator.java | 26 +++++++++++++-----
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 31 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 7 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 0d625bc..635d0ba 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -28,6 +28,8 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexCall;
@@ -292,17 +294,27 @@ class JoinTranslator {
SqlExplainLevel.EXPPLAN_ATTRIBUTES);
}
- private SqlIOConfig resolveSourceConfigForTable(RelNode relNode,
TranslatorContext context) {
+ private SqlIOConfig resolveSQlIOForTable(RelNode relNode, TranslatorContext
context) {
+ // Let's recursively get to the TableScan node to identify IO for the
table.
if (relNode instanceof LogicalProject) {
- return resolveSourceConfigForTable(((LogicalProject)
relNode).getInput(), context);
+ return resolveSQlIOForTable(((LogicalProject) relNode).getInput(),
context);
}
- // We are returning the sourceConfig for the table as null when the table
is in another join rather than an output
- // table, that's because the output of stream-table join is considered a
stream.
- if (relNode.getInputs().size() > 1) {
+ if (relNode instanceof LogicalFilter) {
+ return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(),
context);
+ }
+
+ // We return null for table IO as the table seems to be involved in
another join. The output of stream-table join
+ // is considered a stream. Hence, we return null for the table.
+ if (relNode instanceof LogicalJoin && relNode.getInputs().size() > 1) {
return null;
}
+ if (!(relNode instanceof TableScan)) {
+ throw new SamzaException(String.format("Unsupported query. relNode %s is
not of type TableScan.",
+ relNode.toString()));
+ }
+
String sourceName =
SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
SqlIOConfig sourceConfig =
context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
@@ -320,7 +332,7 @@ class JoinTranslator {
// The join key(s) for the table could be an udf in which case the relNode
would be LogicalProject.
if (relNode instanceof EnumerableTableScan || relNode instanceof
LogicalProject) {
- SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode,
context);
+ SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
if (sourceTableConfig == null ||
!sourceTableConfig.getTableDescriptor().isPresent()) {
return JoinInputNode.InputType.STREAM;
} else if (sourceTableConfig.getTableDescriptor().get() instanceof
RemoteTableDescriptor ||
@@ -336,7 +348,7 @@ class JoinTranslator {
private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
- SqlIOConfig sourceTableConfig =
resolveSourceConfigForTable(tableNode.getRelNode(), context);
+ SqlIOConfig sourceTableConfig =
resolveSQlIOForTable(tableNode.getRelNode(), context);
if (sourceTableConfig == null ||
!sourceTableConfig.getTableDescriptor().isPresent()) {
String errMsg = "Failed to resolve table source in join operation:
node=" + tableNode.getRelNode();
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 52fba62..26ba0ec 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -761,6 +761,37 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
}
@Test
+  public void testEndToEndStreamTableJoinWithSubQuery() throws Exception {
+    int numMessages = 20;
+    // Joins the PROFILE table with a nested sub-query over PAGEVIEW that keeps only rows with profileId = 0.
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey as pageKey, p.address as profileAddress, coalesce(null, 'N/A') as companyName"
+            + " from (SELECT * FROM (SELECT * from testavro.PAGEVIEW pv1 where pv1.profileId=0) as pv2) as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    // Register the statement in the config, validate it, then run the application.
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+    // Render each captured output record as "pageKey,profileName" ("null" when profileName is absent).
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(1);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+ @Test
public void testEndToEndStreamTableInnerJoinWithUdf() throws Exception {
int numMessages = 20;