Fix up [CALCITE-968]
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/937fc461 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/937fc461 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/937fc461 Branch: refs/heads/master Commit: 937fc461a375818921e8147c2d23e26b7e8dfca0 Parents: e9d5060 Author: Julian Hyde <[email protected]> Authored: Sat Dec 5 14:36:02 2015 -0800 Committer: Julian Hyde <[email protected]> Committed: Mon Dec 7 14:23:57 2015 -0800 ---------------------------------------------------------------------- .../apache/calcite/rel/stream/StreamRules.java | 26 +++--- .../apache/calcite/runtime/CalciteResource.java | 2 +- .../calcite/sql/validate/SqlValidatorImpl.java | 6 +- .../calcite/runtime/CalciteResource.properties | 2 +- .../apache/calcite/test/MockCatalogReader.java | 4 +- .../apache/calcite/test/SqlValidatorTest.java | 2 +- .../org/apache/calcite/test/StreamTest.java | 84 +++++++++----------- 7 files changed, 60 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java index 4e64dc5..48e7bbf 100644 --- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java +++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java @@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.core.Values; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; @@ -36,9 +37,9 @@ import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.logical.LogicalUnion; -import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.schema.StreamableTable; import org.apache.calcite.schema.Table; +import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.Util; import com.google.common.collect.ImmutableList; @@ -201,7 +202,8 @@ public class StreamRules { /** * Planner rule that converts {@link Delta} over a {@link TableScan} of - * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty. + * a table other than {@link org.apache.calcite.schema.StreamableTable} to + * an empty {@link Values}. */ public static class DeltaTableScanToEmptyRule extends RelOptRule { private DeltaTableScanToEmptyRule() { @@ -213,24 +215,25 @@ public class StreamRules { @Override public void onMatch(RelOptRuleCall call) { final Delta delta = call.rel(0); final TableScan scan = call.rel(1); - final RelOptCluster cluster = delta.getCluster(); final RelOptTable relOptTable = scan.getTable(); final StreamableTable streamableTable = relOptTable.unwrap(StreamableTable.class); + final RelBuilder builder = call.builder(); if (streamableTable == null) { - call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType())); + call.transformTo(builder.values(delta.getRowType()).build()); } } } - /** * Planner rule that pushes a {@link Delta} through a {@link Join}. * - * Product rule [1] is applied to implement the transpose: - * stream(x join y)" = "x join stream(y) union all stream(x) join y + * <p>We apply something analogous to the + * <a href="https://en.wikipedia.org/wiki/Product_rule">product rule of + * differential calculus</a> to implement the transpose: * - * [1] https://en.wikipedia.org/wiki/Product_rule + * <blockquote><code>stream(x join y) → + * x join stream(y) union all stream(x) join y</code></blockquote> */ public static class DeltaJoinTransposeRule extends RelOptRule { @@ -240,13 +243,12 @@ public class StreamRules { operand(Join.class, any()))); } - @Override public void onMatch(RelOptRuleCall call) { final Delta delta = call.rel(0); + Util.discard(delta); final Join join = call.rel(1); - final RelOptCluster cluster = delta.getCluster(); - RelNode left = join.getLeft(); - RelNode right = join.getRight(); + final RelNode left = join.getLeft(); + final RelNode right = join.getRight(); final LogicalDelta rightWithDelta = LogicalDelta.create(right); final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(), http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java index c06f3c3..6eabafd 100644 --- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java +++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java @@ -604,7 +604,7 @@ public interface CalciteResource { @BaseMessage("Table ''{0}'' not found") ExInst<CalciteException> tableNotFound(String tableName); - @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.") + @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream") ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs); } http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index d430575..1218d5a 100644 --- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -3032,14 +3032,14 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { } } } else { - boolean atLeastOneSupportsModality = false; + int supportsModalityCount = 0; for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { if (namespace.right.supportsModality(modality)) { - atLeastOneSupportsModality = true; + ++supportsModalityCount; } } - if (!atLeastOneSupportsModality) { + if (supportsModalityCount == 0) { if (fail) { List<String> inputList = new ArrayList<String>(); for (Pair<String, SqlValidatorNamespace> namespace : scope.children) { http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties index 2eb4947..7e97a51 100644 --- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties +++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties @@ -197,5 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}'' NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}'' TableNotFound=Table ''{0}'' not found -CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream. +CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream # End CalciteResource.properties http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java index b76b908..529df04 100644 --- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java +++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java @@ -252,7 +252,7 @@ public class MockCatalogReader implements Prepare.CatalogReader { // Register "PRODUCTS" table. MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS", - false); + false, 200D); productsTable.addColumn("PRODUCTID", intType); productsTable.addColumn("NAME", varchar20Type); productsTable.addColumn("SUPPLIERID", intType); @@ -260,7 +260,7 @@ public class MockCatalogReader implements Prepare.CatalogReader { // Register "SUPPLIERS" table. MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS", - false); + false, 10D); suppliersTable.addColumn("SUPPLIERID", intType); suppliersTable.addColumn("NAME", varchar20Type); suppliersTable.addColumn("CITY", intType); http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java index 7ace229..64d1c56 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java @@ -135,7 +135,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase { private static String cannotStreamResultsForNonStreamingInputs(String inputs) { return "Cannot stream results of a query with no streaming inputs: '" + inputs - + "'. At least one input should be convertable to a stream."; + + "'. At least one input should be convertible to a stream"; } @Test public void testMultipleSameAsPass() { http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/StreamTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java index 90d67dd..649db3d 100644 --- a/core/src/test/java/org/apache/calcite/test/StreamTest.java +++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java @@ -59,7 +59,7 @@ import static org.junit.Assert.assertThat; public class StreamTest { public static final String STREAM_SCHEMA_NAME = "STREAMS"; public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS"; - public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS"; + public static final String STREAM_JOINS_SCHEMA_NAME = "STREAM_JOINS"; private static String schemaFor(String name, Class<? extends TableFactory> clazz) { return " {\n" @@ -77,10 +77,10 @@ public class StreamTest { private static final String STREAM_JOINS_MODEL = "{\n" + " version: '1.0',\n" - + " defaultSchema: 'STREAMJOINS',\n" + + " defaultSchema: 'STREAM_JOINS',\n" + " schemas: [\n" + " {\n" - + " name: 'STREAMJOINS',\n" + + " name: 'STREAM_JOINS',\n" + " tables: [ {\n" + " type: 'custom',\n" + " name: 'ORDERS',\n" @@ -234,30 +234,31 @@ public class StreamTest { .returnsCount(100); } - @Test public void testStreamToRelaitonJoin() { + @Test public void testStreamToRelationJoin() { CalciteAssert.model(STREAM_JOINS_MODEL) - .withDefaultSchema(STREAMJOINS_SCHEMA_NAME) + .withDefaultSchema(STREAM_JOINS_SCHEMA_NAME) .query("select stream " + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId " + "from orders join products on orders.product = products.id") - .convertContains( - "LogicalDelta\n" - + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n" - + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n" - + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n" - + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n" - + " LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n" - + " LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n") + .convertContains("LogicalDelta\n" + + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n" + + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n" + + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n" + + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n" + + " LogicalTableScan(table=[[STREAM_JOINS, ORDERS]])\n" + + " LogicalTableScan(table=[[STREAM_JOINS, PRODUCTS]])\n") .explainContains("" - + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n" + + "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], SUPPLIERID=[$t6])\n" + + " EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n" + " EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n" + " EnumerableInterpreter\n" + " BindableTableScan(table=[[]])\n" + " EnumerableInterpreter\n" - + " BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])") - .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1", - "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0", - "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1")); + + " BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]])") + .returns( + startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1", + "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0", + "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1")); } private Function<ResultSet, Void> startsWith(String... rows) { @@ -324,14 +325,14 @@ public class StreamTest { public Table create(SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { - final ImmutableList<Object[]> rows = ImmutableList.of( - new Object[] {ts(10, 15, 0), 1, "paint", 10}, - new Object[] {ts(10, 24, 15), 2, "paper", 5}, - new Object[] {ts(10, 24, 45), 3, "brush", 12}, - new Object[] {ts(10, 58, 0), 4, "paint", 3}, - new Object[] {ts(11, 10, 0), 5, "paint", 3}); - - return new OrdersTable(rows); + final Object[][] rows = { + {ts(10, 15, 0), 1, "paint", 10}, + {ts(10, 24, 15), 2, "paper", 5}, + {ts(10, 24, 45), 3, "brush", 12}, + {ts(10, 58, 0), 4, "paint", 3}, + {ts(11, 10, 0), 5, "paint", 3} + }; + return new OrdersTable(ImmutableList.copyOf(rows)); } private Object ts(int h, int m, int s) { @@ -406,35 +407,30 @@ public class StreamTest { }); } - @Override public Table stream() { + public Table stream() { return this; } } /** - * Mocks simple relation to use for stream joining test. + * Mocks a simple relation to use for stream joining test. */ public static class ProductsTableFactory implements TableFactory<Table> { - - public ProductsTableFactory(){} - - @Override - public Table create(SchemaPlus schema, String name, Map<String, Object> operand, - RelDataType rowType) { - final ImmutableList<Object[]> rows = ImmutableList.of( - new Object[]{"paint", 1}, - new Object[]{"paper", 0}, - new Object[]{"brush", 1} - ); - return new ProductsTable(rows); + public Table create(SchemaPlus schema, String name, + Map<String, Object> operand, RelDataType rowType) { + final Object[][] rows = { + {"paint", 1}, + {"paper", 0}, + {"brush", 1} + }; + return new ProductsTable(ImmutableList.copyOf(rows)); } } /** - * Table representing the PRODUCTS relation + * Table representing the PRODUCTS relation. */ public static class ProductsTable implements ScannableTable { - private final ImmutableList<Object[]> rows; public ProductsTable(ImmutableList<Object[]> rows) { @@ -450,22 +446,18 @@ public class StreamTest { } }; - @Override public Enumerable<Object[]> scan(DataContext root) { return Linq4j.asEnumerable(rows); } - @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { return protoRowType.apply(typeFactory); } - @Override public Statistic getStatistic() { return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of()); } - @Override public Schema.TableType getJdbcTableType() { return Schema.TableType.TABLE; }
