Repository: calcite Updated Branches: refs/heads/master 9c86556ff -> 937fc461a
[CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage) Rule to transform Delta(Scan(constant-table)) to Empty; fix NullPointerException in PruneEmptyRules. Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e9d50602 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e9d50602 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e9d50602 Branch: refs/heads/master Commit: e9d506021252e1da6c09cebad3f747cd0e627d90 Parents: 9c86556 Author: Milinda Pathirage <[email protected]> Authored: Fri Nov 20 19:03:21 2015 -0500 Committer: Julian Hyde <[email protected]> Committed: Mon Dec 7 14:22:02 2015 -0800 ---------------------------------------------------------------------- .../calcite/rel/rules/PruneEmptyRules.java | 3 +- .../apache/calcite/rel/stream/StreamRules.java | 75 ++++++++++++- .../apache/calcite/runtime/CalciteResource.java | 3 + .../calcite/sql/validate/SqlValidatorImpl.java | 45 ++++++-- .../calcite/runtime/CalciteResource.properties | 1 + .../apache/calcite/sql/test/SqlAdvisorTest.java | 4 +- .../apache/calcite/test/MockCatalogReader.java | 16 +++ .../apache/calcite/test/SqlValidatorTest.java | 15 +++ .../org/apache/calcite/test/StreamTest.java | 108 +++++++++++++++++++ 9 files changed, 260 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java index 2f4cada..3fccdfa 100644 --- a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java +++ b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java @@ -72,7 +72,8 @@ public abstract class 
PruneEmptyRules { public void onMatch(RelOptRuleCall call) { LogicalUnion union = call.rel(0); final List<RelNode> childRels = call.getChildRels(union); - final List<RelNode> newChildRels = new ArrayList<RelNode>(); + assert childRels != null; + final List<RelNode> newChildRels = new ArrayList<>(); for (RelNode childRel : childRels) { if (!isEmpty(childRel)) { newChildRels.add(childRel); http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java index 28b9972..4e64dc5 100644 --- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java +++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java @@ -24,16 +24,19 @@ import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.schema.StreamableTable; import org.apache.calcite.schema.Table; import org.apache.calcite.util.Util; @@ -56,7 +59,9 @@ public class StreamRules { new DeltaAggregateTransposeRule(), new DeltaSortTransposeRule(), new 
DeltaUnionTransposeRule(), - new DeltaTableScanRule()); + new DeltaJoinTransposeRule(), + new DeltaTableScanRule(), + new DeltaTableScanToEmptyRule()); /** Planner rule that pushes a {@link Delta} through a {@link Project}. */ public static class DeltaProjectTransposeRule extends RelOptRule { @@ -193,6 +198,74 @@ public class StreamRules { } } } + + /** + * Planner rule that converts {@link Delta} over a {@link TableScan} of + * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty. + */ + public static class DeltaTableScanToEmptyRule extends RelOptRule { + private DeltaTableScanToEmptyRule() { + super( + operand(Delta.class, + operand(TableScan.class, none()))); + } + + @Override public void onMatch(RelOptRuleCall call) { + final Delta delta = call.rel(0); + final TableScan scan = call.rel(1); + final RelOptCluster cluster = delta.getCluster(); + final RelOptTable relOptTable = scan.getTable(); + final StreamableTable streamableTable = + relOptTable.unwrap(StreamableTable.class); + if (streamableTable == null) { + call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType())); + } + } + } + + + /** + * Planner rule that pushes a {@link Delta} through a {@link Join}. 
+ * + * Product rule [1] is applied to implement the transpose: + * stream(x join y) = x join stream(y) union all stream(x) join y + * + * [1] https://en.wikipedia.org/wiki/Product_rule + */ + public static class DeltaJoinTransposeRule extends RelOptRule { + + public DeltaJoinTransposeRule() { + super( + operand(Delta.class, + operand(Join.class, any()))); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Delta delta = call.rel(0); + final Join join = call.rel(1); + final RelOptCluster cluster = delta.getCluster(); + RelNode left = join.getLeft(); + RelNode right = join.getRight(); + + final LogicalDelta rightWithDelta = LogicalDelta.create(right); + final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(), + join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(), + ImmutableList.copyOf(join.getSystemFieldList())); + + final LogicalDelta leftWithDelta = LogicalDelta.create(left); + final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, join.getCondition(), + join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(), + ImmutableList.copyOf(join.getSystemFieldList())); + + List<RelNode> inputsToUnion = Lists.newArrayList(); + inputsToUnion.add(joinL); + inputsToUnion.add(joinR); + + final LogicalUnion newNode = LogicalUnion.create(inputsToUnion, true); + call.transformTo(newNode); + } + } } // End StreamRules.java http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java index aa701d9..c06f3c3 100644 --- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java +++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java @@ -603,6 +603,9 @@ public interface 
CalciteResource { @BaseMessage("Table ''{0}'' not found") ExInst<CalciteException> tableNotFound(String tableName); + + @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.") + ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs); } // End CalciteResource.java http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 0390c07..d430575 100644 --- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -82,6 +82,7 @@ import org.apache.calcite.util.trace.CalciteTrace; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -3016,17 +3017,47 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { public boolean validateModality(SqlSelect select, SqlModality modality, boolean fail) { final SelectScope scope = getRawSelectScope(select); - for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { - if (!namespace.right.supportsModality(modality)) { - switch (modality) { - case STREAM: + + switch (modality) { + case STREAM: + if (scope.children.size() == 1) { + for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { + if (!namespace.right.supportsModality(modality)) { + if (fail) { + throw newValidationError(namespace.right.getNode(), + Static.RESOURCE.cannotConvertToStream(namespace.left)); + } else { 
+ return false; + } + } + } + } else { + boolean atLeastOneSupportsModality = false; + for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { + if (namespace.right.supportsModality(modality)) { + atLeastOneSupportsModality = true; + } + } + + if (!atLeastOneSupportsModality) { if (fail) { - throw newValidationError(namespace.right.getNode(), - Static.RESOURCE.cannotConvertToStream(namespace.left)); + List<String> inputList = new ArrayList<String>(); + for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { + inputList.add(namespace.left); + } + String inputs = Joiner.on(", ").join(inputList); + + throw newValidationError(select, + Static.RESOURCE.cannotStreamResultsForNonStreamingInputs(inputs)); } else { return false; } - default: + } + } + break; + default: + for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { + if (!namespace.right.supportsModality(modality)) { if (fail) { throw newValidationError(namespace.right.getNode(), Static.RESOURCE.cannotConvertToRelation(namespace.left)); http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties index 9787ba9..2eb4947 100644 --- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties +++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties @@ -197,4 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}'' NoValueSuppliedForViewColumn=View is not modifiable. 
No value is supplied for NOT NULL column ''{0}'' of base table ''{1}'' TableNotFound=Table ''{0}'' not found +CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream. # End CalciteResource.properties http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java index 1a71ec4..50019e1 100644 --- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java +++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java @@ -72,7 +72,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase { "TABLE(CATALOG.SALES.BONUS)", "TABLE(CATALOG.SALES.ORDERS)", "TABLE(CATALOG.SALES.SALGRADE)", - "TABLE(CATALOG.SALES.SHIPMENTS)"); + "TABLE(CATALOG.SALES.SHIPMENTS)", + "TABLE(CATALOG.SALES.PRODUCTS)", + "TABLE(CATALOG.SALES.SUPPLIERS)"); private static final List<String> SCHEMAS = Arrays.asList( http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java index 71b3115..b76b908 100644 --- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java +++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java @@ -250,6 +250,22 @@ public class MockCatalogReader implements Prepare.CatalogReader { shipmentsStream.addColumn("ORDERID", intType); registerTable(shipmentsStream); + // Register "PRODUCTS" table. 
+ MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS", + false); + productsTable.addColumn("PRODUCTID", intType); + productsTable.addColumn("NAME", varchar20Type); + productsTable.addColumn("SUPPLIERID", intType); + registerTable(productsTable); + + // Register "SUPPLIERS" table. + MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS", + false); + suppliersTable.addColumn("SUPPLIERID", intType); + suppliersTable.addColumn("NAME", varchar20Type); + suppliersTable.addColumn("CITY", intType); + registerTable(suppliersTable); + // Register "EMP_20" view. // Same columns as "EMP", // but "DEPTNO" not visible and set to 20 by default http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java index 026f41e..7ace229 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java @@ -132,6 +132,12 @@ public class SqlValidatorTest extends SqlValidatorTestCase { return "Cannot convert stream '" + table + "' to relation"; } + private static String cannotStreamResultsForNonStreamingInputs(String inputs) { + return "Cannot stream results of a query with no streaming inputs: '" + + inputs + + "'. 
At least one input should be convertable to a stream."; + } + @Test public void testMultipleSameAsPass() { check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))"); } @@ -7415,6 +7421,15 @@ public class SqlValidatorTest extends SqlValidatorTestCase { + "order by floor(rowtime to hour), rowtime desc").ok(); } + @Test public void testStreamJoin() { + sql("select stream \n" + + "orders.rowtime as rowtime, orders.orderId as orderId, products.supplierId as supplierId \n" + + "from orders join products on orders.productId = products.productId").ok(); + sql("^select stream *\n" + + "from products join suppliers on products.supplierId = suppliers.supplierId^") + .fails(cannotStreamResultsForNonStreamingInputs("PRODUCTS, SUPPLIERS")); + } + @Test public void testNew() { // (To debug individual statements, paste them into this method.) // 1 2 3 4 5 6 http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/StreamTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java index 269a2e5..90d67dd 100644 --- a/core/src/test/java/org/apache/calcite/test/StreamTest.java +++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java @@ -59,6 +59,7 @@ import static org.junit.Assert.assertThat; public class StreamTest { public static final String STREAM_SCHEMA_NAME = "STREAMS"; public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS"; + public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS"; private static String schemaFor(String name, Class<? 
extends TableFactory> clazz) { return " {\n" @@ -74,6 +75,27 @@ public class StreamTest { + " }"; } + private static final String STREAM_JOINS_MODEL = "{\n" + + " version: '1.0',\n" + + " defaultSchema: 'STREAMJOINS',\n" + + " schemas: [\n" + + " {\n" + + " name: 'STREAMJOINS',\n" + + " tables: [ {\n" + + " type: 'custom',\n" + + " name: 'ORDERS',\n" + + " stream: {\n" + + " stream: true\n" + + " },\n" + + " factory: '" + OrdersStreamTableFactory.class.getName() + "'\n" + + " }, \n" + + " {\n" + + " type: 'custom',\n" + + " name: 'PRODUCTS',\n" + + " factory: '" + ProductsTableFactory.class.getName() + "'\n" + + " }]\n" + + " }]}"; + public static final String STREAM_MODEL = "{\n" + " version: '1.0',\n" + " defaultSchema: 'foodmart',\n" @@ -212,6 +234,32 @@ public class StreamTest { .returnsCount(100); } + @Test public void testStreamToRelaitonJoin() { + CalciteAssert.model(STREAM_JOINS_MODEL) + .withDefaultSchema(STREAMJOINS_SCHEMA_NAME) + .query("select stream " + + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId " + + "from orders join products on orders.product = products.id") + .convertContains( + "LogicalDelta\n" + + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n" + + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n" + + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n" + + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n" + + " LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n" + + " LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n") + .explainContains("" + + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n" + + " EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n" + + " EnumerableInterpreter\n" + + " 
BindableTableScan(table=[[]])\n" + + " EnumerableInterpreter\n" + + " BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])") + .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1", + "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0", + "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1")); + } + private Function<ResultSet, Void> startsWith(String... rows) { final ImmutableList<String> rowList = ImmutableList.copyOf(rows); return new Function<ResultSet, Void>() { @@ -362,6 +410,66 @@ public class StreamTest { return this; } } + + /** + * Mocks simple relation to use for stream joining test. + */ + public static class ProductsTableFactory implements TableFactory<Table> { + + public ProductsTableFactory(){} + + @Override + public Table create(SchemaPlus schema, String name, Map<String, Object> operand, + RelDataType rowType) { + final ImmutableList<Object[]> rows = ImmutableList.of( + new Object[]{"paint", 1}, + new Object[]{"paper", 0}, + new Object[]{"brush", 1} + ); + return new ProductsTable(rows); + } + } + + /** + * Table representing the PRODUCTS relation + */ + public static class ProductsTable implements ScannableTable { + + private final ImmutableList<Object[]> rows; + + public ProductsTable(ImmutableList<Object[]> rows) { + this.rows = rows; + } + + private final RelProtoDataType protoRowType = new RelProtoDataType() { + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("ID", SqlTypeName.VARCHAR, 32) + .add("SUPPLIER", SqlTypeName.INTEGER) + .build(); + } + }; + + @Override + public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of()); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; 
+ } + } } // End StreamTest.java
