Fix up [CALCITE-968]

Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/937fc461
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/937fc461
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/937fc461

Branch: refs/heads/master
Commit: 937fc461a375818921e8147c2d23e26b7e8dfca0
Parents: e9d5060
Author: Julian Hyde <[email protected]>
Authored: Sat Dec 5 14:36:02 2015 -0800
Committer: Julian Hyde <[email protected]>
Committed: Mon Dec 7 14:23:57 2015 -0800

----------------------------------------------------------------------
 .../apache/calcite/rel/stream/StreamRules.java  | 26 +++---
 .../apache/calcite/runtime/CalciteResource.java |  2 +-
 .../calcite/sql/validate/SqlValidatorImpl.java  |  6 +-
 .../calcite/runtime/CalciteResource.properties  |  2 +-
 .../apache/calcite/test/MockCatalogReader.java  |  4 +-
 .../apache/calcite/test/SqlValidatorTest.java   |  2 +-
 .../org/apache/calcite/test/StreamTest.java     | 84 +++++++++-----------
 7 files changed, 60 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 4e64dc5..48e7bbf 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -36,9 +37,9 @@ import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
-import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
@@ -201,7 +202,8 @@ public class StreamRules {
 
   /**
    * Planner rule that converts {@link Delta} over a {@link TableScan} of
-   * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty.
+   * a table other than {@link org.apache.calcite.schema.StreamableTable} to
+   * an empty {@link Values}.
    */
   public static class DeltaTableScanToEmptyRule extends RelOptRule {
     private DeltaTableScanToEmptyRule() {
@@ -213,24 +215,25 @@ public class StreamRules {
     @Override public void onMatch(RelOptRuleCall call) {
       final Delta delta = call.rel(0);
       final TableScan scan = call.rel(1);
-      final RelOptCluster cluster = delta.getCluster();
       final RelOptTable relOptTable = scan.getTable();
       final StreamableTable streamableTable =
           relOptTable.unwrap(StreamableTable.class);
+      final RelBuilder builder = call.builder();
       if (streamableTable == null) {
-        call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+        call.transformTo(builder.values(delta.getRowType()).build());
       }
     }
   }
 
-
   /**
    * Planner rule that pushes a {@link Delta} through a {@link Join}.
    *
-   * Product rule [1] is applied to implement the transpose:
-   * stream(x join y)" = "x join stream(y) union all stream(x) join y
+   * <p>We apply something analogous to the
+   * <a href="https://en.wikipedia.org/wiki/Product_rule">product rule of
+   * differential calculus</a> to implement the transpose:
    *
-   * [1] https://en.wikipedia.org/wiki/Product_rule
+   * <blockquote><code>stream(x join y) &rarr;
+   * x join stream(y) union all stream(x) join y</code></blockquote>
    */
   public static class DeltaJoinTransposeRule extends RelOptRule {
 
@@ -240,13 +243,12 @@ public class StreamRules {
               operand(Join.class, any())));
     }
 
-    @Override
     public void onMatch(RelOptRuleCall call) {
       final Delta delta = call.rel(0);
+      Util.discard(delta);
       final Join join = call.rel(1);
-      final RelOptCluster cluster = delta.getCluster();
-      RelNode left = join.getLeft();
-      RelNode right = join.getRight();
+      final RelNode left = join.getLeft();
+      final RelNode right = join.getRight();
 
       final LogicalDelta rightWithDelta = LogicalDelta.create(right);
       final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index c06f3c3..6eabafd 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -604,7 +604,7 @@ public interface CalciteResource {
   @BaseMessage("Table ''{0}'' not found")
   ExInst<CalciteException> tableNotFound(String tableName);
 
-  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream")
   ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index d430575..1218d5a 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -3032,14 +3032,14 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
           }
         }
       } else {
-        boolean atLeastOneSupportsModality = false;
+        int supportsModalityCount = 0;
         for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
           if (namespace.right.supportsModality(modality)) {
-            atLeastOneSupportsModality = true;
+            ++supportsModalityCount;
           }
         }
 
-        if (!atLeastOneSupportsModality) {
+        if (supportsModalityCount == 0) {
           if (fail) {
             List<String> inputList = new ArrayList<String>();
             for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 2eb4947..7e97a51 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,5 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
 MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
 NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
 TableNotFound=Table ''{0}'' not found
-CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index b76b908..529df04 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -252,7 +252,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
 
     // Register "PRODUCTS" table.
     MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
-        false);
+        false, 200D);
     productsTable.addColumn("PRODUCTID", intType);
     productsTable.addColumn("NAME", varchar20Type);
     productsTable.addColumn("SUPPLIERID", intType);
@@ -260,7 +260,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
 
     // Register "SUPPLIERS" table.
     MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
-        false);
+        false, 10D);
     suppliersTable.addColumn("SUPPLIERID", intType);
     suppliersTable.addColumn("NAME", varchar20Type);
     suppliersTable.addColumn("CITY", intType);

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 7ace229..64d1c56 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -135,7 +135,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
     return "Cannot stream results of a query with no streaming inputs: '"
         + inputs
-        + "'. At least one input should be convertable to a stream.";
+        + "'. At least one input should be convertible to a stream";
   }
 
   @Test public void testMultipleSameAsPass() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 90d67dd..649db3d 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,7 +59,7 @@ import static org.junit.Assert.assertThat;
 public class StreamTest {
   public static final String STREAM_SCHEMA_NAME = "STREAMS";
   public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
-  public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
+  public static final String STREAM_JOINS_SCHEMA_NAME = "STREAM_JOINS";
 
   private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
     return "     {\n"
@@ -77,10 +77,10 @@ public class StreamTest {
 
   private static final String STREAM_JOINS_MODEL = "{\n"
       + "  version: '1.0',\n"
-      + "  defaultSchema: 'STREAMJOINS',\n"
+      + "  defaultSchema: 'STREAM_JOINS',\n"
       + "   schemas: [\n"
       + "     {\n"
-      + "       name: 'STREAMJOINS',\n"
+      + "       name: 'STREAM_JOINS',\n"
       + "       tables: [ {\n"
       + "         type: 'custom',\n"
       + "         name: 'ORDERS',\n"
@@ -234,30 +234,31 @@ public class StreamTest {
         .returnsCount(100);
   }
 
-  @Test public void testStreamToRelaitonJoin() {
+  @Test public void testStreamToRelationJoin() {
     CalciteAssert.model(STREAM_JOINS_MODEL)
-        .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+        .withDefaultSchema(STREAM_JOINS_SCHEMA_NAME)
         .query("select stream "
             + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
             + "from orders join products on orders.product = products.id")
-        .convertContains(
-            "LogicalDelta\n"
-                + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
-                + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
-                + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
-                + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
-                + "          LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
-                + "        LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+        .convertContains("LogicalDelta\n"
+            + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+            + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+            + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+            + "          LogicalTableScan(table=[[STREAM_JOINS, ORDERS]])\n"
+            + "        LogicalTableScan(table=[[STREAM_JOINS, PRODUCTS]])\n")
         .explainContains(""
-            + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], SUPPLIERID=[$t6])\n"
+            + "  EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
             + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
             + "      EnumerableInterpreter\n"
             + "        BindableTableScan(table=[[]])\n"
             + "    EnumerableInterpreter\n"
-            + "      BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
-        .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
-            "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
-            "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+            + "      BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]])")
+        .returns(
+            startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+                "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+                "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
   }
 
   private Function<ResultSet, Void> startsWith(String... rows) {
@@ -324,14 +325,14 @@ public class StreamTest {
 
     public Table create(SchemaPlus schema, String name,
         Map<String, Object> operand, RelDataType rowType) {
-      final ImmutableList<Object[]> rows = ImmutableList.of(
-          new Object[] {ts(10, 15, 0), 1, "paint", 10},
-          new Object[] {ts(10, 24, 15), 2, "paper", 5},
-          new Object[] {ts(10, 24, 45), 3, "brush", 12},
-          new Object[] {ts(10, 58, 0), 4, "paint", 3},
-          new Object[] {ts(11, 10, 0), 5, "paint", 3});
-
-      return new OrdersTable(rows);
+      final Object[][] rows = {
+        {ts(10, 15, 0), 1, "paint", 10},
+        {ts(10, 24, 15), 2, "paper", 5},
+        {ts(10, 24, 45), 3, "brush", 12},
+        {ts(10, 58, 0), 4, "paint", 3},
+        {ts(11, 10, 0), 5, "paint", 3}
+      };
+      return new OrdersTable(ImmutableList.copyOf(rows));
     }
 
     private Object ts(int h, int m, int s) {
@@ -406,35 +407,30 @@ public class StreamTest {
       });
     }
 
-    @Override public Table stream() {
+    public Table stream() {
       return this;
     }
   }
 
   /**
-   * Mocks simple relation to use for stream joining test.
+   * Mocks a simple relation to use for stream joining test.
    */
   public static class ProductsTableFactory implements TableFactory<Table> {
-
-    public ProductsTableFactory(){}
-
-    @Override
-    public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
-                        RelDataType rowType) {
-      final ImmutableList<Object[]> rows = ImmutableList.of(
-        new Object[]{"paint", 1},
-        new Object[]{"paper", 0},
-        new Object[]{"brush", 1}
-      );
-      return new ProductsTable(rows);
+    public Table create(SchemaPlus schema, String name,
+        Map<String, Object> operand, RelDataType rowType) {
+      final Object[][] rows = {
+        {"paint", 1},
+        {"paper", 0},
+        {"brush", 1}
+      };
+      return new ProductsTable(ImmutableList.copyOf(rows));
     }
   }
 
   /**
-   * Table representing the PRODUCTS relation
+   * Table representing the PRODUCTS relation.
    */
   public static class ProductsTable implements ScannableTable {
-
     private final ImmutableList<Object[]> rows;
 
     public ProductsTable(ImmutableList<Object[]> rows) {
@@ -450,22 +446,18 @@ public class StreamTest {
       }
     };
 
-    @Override
     public Enumerable<Object[]> scan(DataContext root) {
       return Linq4j.asEnumerable(rows);
     }
 
-    @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
       return protoRowType.apply(typeFactory);
     }
 
-    @Override
     public Statistic getStatistic() {
       return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
     }
 
-    @Override
     public Schema.TableType getJdbcTableType() {
       return Schema.TableType.TABLE;
     }

Reply via email to