Repository: calcite
Updated Branches:
  refs/heads/master 9c86556ff -> 937fc461a


[CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)

Rule to transform Delta(Scan(constant-table)) to Empty;
fix NullPointerException in PruneEmptyRules.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e9d50602
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e9d50602
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e9d50602

Branch: refs/heads/master
Commit: e9d506021252e1da6c09cebad3f747cd0e627d90
Parents: 9c86556
Author: Milinda Pathirage <[email protected]>
Authored: Fri Nov 20 19:03:21 2015 -0500
Committer: Julian Hyde <[email protected]>
Committed: Mon Dec 7 14:22:02 2015 -0800

----------------------------------------------------------------------
 .../calcite/rel/rules/PruneEmptyRules.java      |   3 +-
 .../apache/calcite/rel/stream/StreamRules.java  |  75 ++++++++++++-
 .../apache/calcite/runtime/CalciteResource.java |   3 +
 .../calcite/sql/validate/SqlValidatorImpl.java  |  45 ++++++--
 .../calcite/runtime/CalciteResource.properties  |   1 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java |   4 +-
 .../apache/calcite/test/MockCatalogReader.java  |  16 +++
 .../apache/calcite/test/SqlValidatorTest.java   |  15 +++
 .../org/apache/calcite/test/StreamTest.java     | 108 +++++++++++++++++++
 9 files changed, 260 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java 
b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
index 2f4cada..3fccdfa 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
@@ -72,7 +72,8 @@ public abstract class PruneEmptyRules {
         public void onMatch(RelOptRuleCall call) {
           LogicalUnion union = call.rel(0);
           final List<RelNode> childRels = call.getChildRels(union);
-          final List<RelNode> newChildRels = new ArrayList<RelNode>();
+          assert childRels != null;
+          final List<RelNode> newChildRels = new ArrayList<>();
           for (RelNode childRel : childRels) {
             if (!isEmpty(childRel)) {
               newChildRels.add(childRel);

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java 
b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 28b9972..4e64dc5 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -24,16 +24,19 @@ import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.Util;
@@ -56,7 +59,9 @@ public class StreamRules {
           new DeltaAggregateTransposeRule(),
           new DeltaSortTransposeRule(),
           new DeltaUnionTransposeRule(),
-          new DeltaTableScanRule());
+          new DeltaJoinTransposeRule(),
+          new DeltaTableScanRule(),
+          new DeltaTableScanToEmptyRule());
 
   /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
   public static class DeltaProjectTransposeRule extends RelOptRule {
@@ -193,6 +198,74 @@ public class StreamRules {
       }
     }
   }
+
+  /**
+   * Planner rule that converts {@link Delta} over a {@link TableScan} of
+   * a table other than {@link org.apache.calcite.schema.StreamableTable} to 
Empty.
+   */
+  public static class DeltaTableScanToEmptyRule extends RelOptRule {
+    private DeltaTableScanToEmptyRule() {
+      super(
+          operand(Delta.class,
+              operand(TableScan.class, none())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final TableScan scan = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      final RelOptTable relOptTable = scan.getTable();
+      final StreamableTable streamableTable =
+          relOptTable.unwrap(StreamableTable.class);
+      if (streamableTable == null) {
+        call.transformTo(LogicalValues.createEmpty(cluster, 
delta.getRowType()));
+      }
+    }
+  }
+
+
+  /**
+   * Planner rule that pushes a {@link Delta} through a {@link Join}.
+   *
+   * Product rule [1] is applied to implement the transpose:
+   * stream(x join y)" = "x join stream(y) union all stream(x) join y
+   *
+   * [1] https://en.wikipedia.org/wiki/Product_rule
+   */
+  public static class DeltaJoinTransposeRule extends RelOptRule {
+
+    public DeltaJoinTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Join.class, any())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final Join join = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      RelNode left = join.getLeft();
+      RelNode right = join.getRight();
+
+      final LogicalDelta rightWithDelta = LogicalDelta.create(right);
+      final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, 
join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), 
join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      final LogicalDelta leftWithDelta = LogicalDelta.create(left);
+      final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, 
join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), 
join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      List<RelNode> inputsToUnion = Lists.newArrayList();
+      inputsToUnion.add(joinL);
+      inputsToUnion.add(joinR);
+
+      final LogicalUnion newNode = LogicalUnion.create(inputsToUnion, true);
+      call.transformTo(newNode);
+    }
+  }
 }
 
 // End StreamRules.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java 
b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index aa701d9..c06f3c3 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -603,6 +603,9 @@ public interface CalciteResource {
 
   @BaseMessage("Table ''{0}'' not found")
   ExInst<CalciteException> tableNotFound(String tableName);
+
+  @BaseMessage("Cannot stream results of a query with no streaming inputs: 
''{0}''. At least one input should be convertable to a stream.")
+  ExInst<SqlValidatorException> 
cannotStreamResultsForNonStreamingInputs(String inputs);
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java 
b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 0390c07..d430575 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -3016,17 +3017,47 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
   public boolean validateModality(SqlSelect select, SqlModality modality,
       boolean fail) {
     final SelectScope scope = getRawSelectScope(select);
-    for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
-      if (!namespace.right.supportsModality(modality)) {
-        switch (modality) {
-        case STREAM:
+
+    switch (modality) {
+    case STREAM:
+      if (scope.children.size() == 1) {
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (!namespace.right.supportsModality(modality)) {
+            if (fail) {
+              throw newValidationError(namespace.right.getNode(),
+                  Static.RESOURCE.cannotConvertToStream(namespace.left));
+            } else {
+              return false;
+            }
+          }
+        }
+      } else {
+        boolean atLeastOneSupportsModality = false;
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (namespace.right.supportsModality(modality)) {
+            atLeastOneSupportsModality = true;
+          }
+        }
+
+        if (!atLeastOneSupportsModality) {
           if (fail) {
-            throw newValidationError(namespace.right.getNode(),
-                Static.RESOURCE.cannotConvertToStream(namespace.left));
+            List<String> inputList = new ArrayList<String>();
+            for (Pair<String, SqlValidatorNamespace> namespace : 
scope.children) {
+              inputList.add(namespace.left);
+            }
+            String inputs = Joiner.on(", ").join(inputList);
+
+            throw newValidationError(select,
+                
Static.RESOURCE.cannotStreamResultsForNonStreamingInputs(inputs));
           } else {
             return false;
           }
-        default:
+        }
+      }
+      break;
+    default:
+      for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+        if (!namespace.right.supportsModality(modality)) {
           if (fail) {
             throw newValidationError(namespace.right.getNode(),
                 Static.RESOURCE.cannotConvertToRelation(namespace.left));

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git 
a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties 
b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 9787ba9..2eb4947 100644
--- 
a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ 
b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,4 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must 
be based on a single
 MoreThanOneMappedColumn=View is not modifiable. More than one expression maps 
to column ''{0}'' of base table ''{1}''
 NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for 
NOT NULL column ''{0}'' of base table ''{1}''
 TableNotFound=Table ''{0}'' not found
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with 
no streaming inputs: ''{0}''. At least one input should be convertable to a 
stream.
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java 
b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index 1a71ec4..50019e1 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -72,7 +72,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
           "TABLE(CATALOG.SALES.BONUS)",
           "TABLE(CATALOG.SALES.ORDERS)",
           "TABLE(CATALOG.SALES.SALGRADE)",
-          "TABLE(CATALOG.SALES.SHIPMENTS)");
+          "TABLE(CATALOG.SALES.SHIPMENTS)",
+          "TABLE(CATALOG.SALES.PRODUCTS)",
+          "TABLE(CATALOG.SALES.SUPPLIERS)");
 
   private static final List<String> SCHEMAS =
       Arrays.asList(

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java 
b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index 71b3115..b76b908 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -250,6 +250,22 @@ public class MockCatalogReader implements 
Prepare.CatalogReader {
     shipmentsStream.addColumn("ORDERID", intType);
     registerTable(shipmentsStream);
 
+    // Register "PRODUCTS" table.
+    MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
+        false);
+    productsTable.addColumn("PRODUCTID", intType);
+    productsTable.addColumn("NAME", varchar20Type);
+    productsTable.addColumn("SUPPLIERID", intType);
+    registerTable(productsTable);
+
+    // Register "SUPPLIERS" table.
+    MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
+        false);
+    suppliersTable.addColumn("SUPPLIERID", intType);
+    suppliersTable.addColumn("NAME", varchar20Type);
+    suppliersTable.addColumn("CITY", intType);
+    registerTable(suppliersTable);
+
     // Register "EMP_20" view.
     // Same columns as "EMP",
     // but "DEPTNO" not visible and set to 20 by default

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java 
b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 026f41e..7ace229 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -132,6 +132,12 @@ public class SqlValidatorTest extends SqlValidatorTestCase 
{
     return "Cannot convert stream '" + table + "' to relation";
   }
 
+  private static String cannotStreamResultsForNonStreamingInputs(String 
inputs) {
+    return "Cannot stream results of a query with no streaming inputs: '"
+        + inputs
+        + "'. At least one input should be convertable to a stream.";
+  }
+
   @Test public void testMultipleSameAsPass() {
     check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))");
   }
@@ -7415,6 +7421,15 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
         + "order by floor(rowtime to hour), rowtime desc").ok();
   }
 
+  @Test public void testStreamJoin() {
+    sql("select stream \n"
+        + "orders.rowtime as rowtime, orders.orderId as orderId, 
products.supplierId as supplierId \n"
+        + "from orders join products on orders.productId = 
products.productId").ok();
+    sql("^select stream *\n"
+        + "from products join suppliers on products.supplierId = 
suppliers.supplierId^")
+        .fails(cannotStreamResultsForNonStreamingInputs("PRODUCTS, 
SUPPLIERS"));
+  }
+
   @Test public void testNew() {
     // (To debug individual statements, paste them into this method.)
     //            1         2         3         4         5         6

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java 
b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 269a2e5..90d67dd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertThat;
 public class StreamTest {
   public static final String STREAM_SCHEMA_NAME = "STREAMS";
   public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+  public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
 
   private static String schemaFor(String name, Class<? extends TableFactory> 
clazz) {
     return "     {\n"
@@ -74,6 +75,27 @@ public class StreamTest {
       + "     }";
   }
 
+  private static final String STREAM_JOINS_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'STREAMJOINS',\n"
+      + "   schemas: [\n"
+      + "     {\n"
+      + "       name: 'STREAMJOINS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + OrdersStreamTableFactory.class.getName() + 
"'\n"
+      + "       }, \n"
+      + "       {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'PRODUCTS',\n"
+      + "         factory: '" + ProductsTableFactory.class.getName() + "'\n"
+      + "       }]\n"
+      + "     }]}";
+
   public static final String STREAM_MODEL = "{\n"
       + "  version: '1.0',\n"
       + "  defaultSchema: 'foodmart',\n"
@@ -212,6 +234,32 @@ public class StreamTest {
         .returnsCount(100);
   }
 
+  @Test public void testStreamToRelaitonJoin() {
+    CalciteAssert.model(STREAM_JOINS_MODEL)
+        .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+        .query("select stream "
+            + "orders.rowtime as rowtime, orders.id as orderId, 
products.supplier as supplierId "
+            + "from orders join products on orders.product = products.id")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], 
SUPPLIERID=[$5])\n"
+                + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], 
UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+                + "      LogicalJoin(condition=[=($4, $5)], 
joinType=[inner])\n"
+                + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], 
UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE 
\"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+                + "          LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
+                + "        LogicalTableScan(table=[[STREAMJOINS, 
PRODUCTS]])\n")
+        .explainContains(""
+            + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "    EnumerableCalc(expr#0..3=[{inputs}], 
expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE 
\"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
+            + "      EnumerableInterpreter\n"
+            + "        BindableTableScan(table=[[]])\n"
+            + "    EnumerableInterpreter\n"
+            + "      BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
+        .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; 
SUPPLIERID=1",
+            "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+            "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+  }
+
   private Function<ResultSet, Void> startsWith(String... rows) {
     final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
     return new Function<ResultSet, Void>() {
@@ -362,6 +410,66 @@ public class StreamTest {
       return this;
     }
   }
+
+  /**
+   * Mocks simple relation to use for stream joining test.
+   */
+  public static class ProductsTableFactory implements TableFactory<Table> {
+
+    public ProductsTableFactory(){}
+
+    @Override
+    public Table create(SchemaPlus schema, String name, Map<String, Object> 
operand,
+                        RelDataType rowType) {
+      final ImmutableList<Object[]> rows = ImmutableList.of(
+        new Object[]{"paint", 1},
+        new Object[]{"paper", 0},
+        new Object[]{"brush", 1}
+      );
+      return new ProductsTable(rows);
+    }
+  }
+
+  /**
+   * Table representing the PRODUCTS relation
+   */
+  public static class ProductsTable implements ScannableTable {
+
+    private final ImmutableList<Object[]> rows;
+
+    public ProductsTable(ImmutableList<Object[]> rows) {
+      this.rows = rows;
+    }
+
+    private final RelProtoDataType protoRowType = new RelProtoDataType() {
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("ID", SqlTypeName.VARCHAR, 32)
+            .add("SUPPLIER", SqlTypeName.INTEGER)
+            .build();
+      }
+    };
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
+    }
+
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
 }
 
 // End StreamTest.java

Reply via email to