This is an automated email from the ASF dual-hosted git repository.

jhyde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit 3ba81e2fda4a66fcc6acdd24a8688c8292bbfcff
Author: Jark Wu <[email protected]>
AuthorDate: Tue Dec 12 20:02:30 2017 +0800

    [CALCITE-1912] Temporal tables, "FOR SYSTEM_TIME AS OF" SQL clause, and add 
Snapshot relational operator (Jark Wu)
    
    Close apache/calcite#759
---
 core/src/main/codegen/templates/Parser.jj          |  47 ++++---
 .../apache/calcite/prepare/RelOptTableImpl.java    |   5 +
 .../org/apache/calcite/rel/core/RelFactories.java  |  28 ++++-
 .../java/org/apache/calcite/rel/core/Snapshot.java | 110 +++++++++++++++++
 .../calcite/rel/logical/LogicalSnapshot.java       |  74 +++++++++++
 .../calcite/rel/metadata/RelMdCollation.java       |   6 +
 .../calcite/rel/metadata/RelMdDistribution.java    |   7 ++
 .../apache/calcite/runtime/CalciteResource.java    |   6 +
 .../java/org/apache/calcite/schema/Schema.java     |   7 ++
 .../org/apache/calcite/schema/TemporalTable.java   |  35 ++++++
 .../main/java/org/apache/calcite/sql/SqlKind.java  |   6 +
 .../org/apache/calcite/sql/SqlLateralOperator.java |   3 +-
 .../java/org/apache/calcite/sql/SqlSnapshot.java   | 135 +++++++++++++++++++++
 .../calcite/sql/validate/SqlValidatorImpl.java     |  37 ++++++
 .../calcite/sql/validate/SqlValidatorTable.java    |   5 +
 .../sql2rel/RelStructuredTypeFlattener.java        |   9 ++
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  20 +++
 .../java/org/apache/calcite/tools/RelBuilder.java  |  18 +++
 .../calcite/runtime/CalciteResource.properties     |   2 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java    |   2 +
 .../org/apache/calcite/test/CalciteAssert.java     |   9 ++
 .../org/apache/calcite/test/RelBuilderTest.java    |  44 ++++++-
 .../apache/calcite/test/SqlToRelConverterTest.java |  22 ++++
 .../org/apache/calcite/test/SqlValidatorTest.java  |  21 ++++
 .../java/org/apache/calcite/test/StreamTest.java   |  43 +++++++
 .../calcite/test/catalog/MockCatalogReader.java    |  54 ++++++---
 .../test/catalog/MockCatalogReaderSimple.java      |  15 ++-
 .../apache/calcite/test/SqlToRelConverterTest.xml  |  46 +++++++
 site/_docs/algebra.md                              |   1 +
 site/_docs/reference.md                            |   1 +
 30 files changed, 776 insertions(+), 42 deletions(-)

diff --git a/core/src/main/codegen/templates/Parser.jj 
b/core/src/main/codegen/templates/Parser.jj
index 1cfb764..98e3f1b 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -86,6 +86,7 @@ import org.apache.calcite.sql.SqlSampleSpec;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSelectKeyword;
 import org.apache.calcite.sql.SqlSetOption;
+import org.apache.calcite.sql.SqlSnapshot;
 import org.apache.calcite.sql.SqlTimeLiteral;
 import org.apache.calcite.sql.SqlTimestampLiteral;
 import org.apache.calcite.sql.SqlUnnestOperator;
@@ -1810,7 +1811,9 @@ SqlNode TableRef() :
 SqlNode TableRef2(boolean lateral) :
 {
     SqlNode tableRef;
-    SqlNode over;
+    final SqlNode over;
+    final SqlNode snapshot;
+    final SqlNode match;
     SqlNodeList extendList = null;
     final SqlIdentifier alias;
     final Span s, s2;
@@ -1833,21 +1836,21 @@ SqlNode TableRef2(boolean lateral) :
                 tableRef = extend(tableRef, extendList);
             }
         ]
-        over = TableOverOpt()
-        {
+        over = TableOverOpt() {
             if (over != null) {
                 tableRef = SqlStdOperatorTable.OVER.createCall(
                     getPos(), tableRef, over);
             }
         }
         [
-            over = MatchRecognizeOpt(tableRef)
-            {
-                if (over != null) {
-                    tableRef = over;
-                }
+            snapshot = Snapshot(tableRef) {
+                tableRef = SqlStdOperatorTable.LATERAL.createCall(
+                    getPos(), snapshot);
             }
         ]
+        [
+            tableRef = MatchRecognize(tableRef)
+        ]
     |
         [ <LATERAL> { lateral = true; } ]
         tableRef = ParenthesizedExpression(ExprContext.ACCEPT_QUERY)
@@ -1862,14 +1865,9 @@ SqlNode TableRef2(boolean lateral) :
                     getPos(), tableRef);
             }
         }
-        (
-            [ over = MatchRecognizeOpt(tableRef) ]
-            {
-                if (over != null) {
-                    tableRef = over;
-                }
-            }
-        )
+        [
+            tableRef = MatchRecognize(tableRef)
+        ]
     |
         <UNNEST> { s = span(); }
         args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUB_QUERY)
@@ -2503,9 +2501,24 @@ SqlNode OrderItem() :
 }
 
 /**
+ * Parses a FOR SYSTEM_TIME clause following a table expression.
+ */
+SqlSnapshot Snapshot(SqlNode tableRef) :
+{
+    final Span s;
+    final SqlNode e;
+}
+{
+    <FOR> { s = span(); } <SYSTEM_TIME> <AS> <OF>
+    e = Expression(ExprContext.ACCEPT_NON_QUERY) {
+        return new SqlSnapshot(s.end(this), tableRef, e);
+    }
+}
+
+/**
  * Parses a MATCH_RECOGNIZE clause following a table expression.
  */
-SqlMatchRecognize MatchRecognizeOpt(SqlNode tableRef) :
+SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
 {
     final Span s, s0, s1, s2;
     SqlNodeList measureList = SqlNodeList.EMPTY;
diff --git a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java 
b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
index aac238a..b83b153 100644
--- a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.calcite.schema.SchemaVersion;
 import org.apache.calcite.schema.Schemas;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TemporalTable;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.Wrapper;
 import org.apache.calcite.sql.SqlAccessType;
@@ -327,6 +328,10 @@ public class RelOptTableImpl extends 
Prepare.AbstractPreparingTable {
     }
   }
 
+  @Override public boolean isTemporal() {
+    return table instanceof TemporalTable;
+  }
+
   public List<String> getQualifiedName() {
     return names;
   }
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java 
b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
index c7e290c..5980f3d 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
@@ -33,6 +33,7 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMatch;
 import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalSortExchange;
 import org.apache.calcite.rel.logical.LogicalTableScan;
@@ -99,6 +100,9 @@ public class RelFactories {
   public static final TableScanFactory DEFAULT_TABLE_SCAN_FACTORY =
       new TableScanFactoryImpl();
 
+  public static final SnapshotFactory DEFAULT_SNAPSHOT_FACTORY =
+      new SnapshotFactoryImpl();
+
   /** A {@link RelBuilderFactory} that creates a {@link RelBuilder} that will
    * create logical relational expressions for everything. */
   public static final RelBuilderFactory LOGICAL_BUILDER =
@@ -114,7 +118,8 @@ public class RelFactories {
               DEFAULT_MATCH_FACTORY,
               DEFAULT_SET_OP_FACTORY,
               DEFAULT_VALUES_FACTORY,
-              DEFAULT_TABLE_SCAN_FACTORY));
+              DEFAULT_TABLE_SCAN_FACTORY,
+              DEFAULT_SNAPSHOT_FACTORY));
 
   private RelFactories() {
   }
@@ -494,6 +499,27 @@ public class RelFactories {
   }
 
   /**
+   * Can create a {@link Snapshot} of
+   * the appropriate type for a rule's calling convention.
+   */
+  public interface SnapshotFactory {
+    /**
+     * Creates a {@link Snapshot}.
+     */
+    RelNode createSnapshot(RelNode input, RexNode period);
+  }
+
+  /**
+   * Implementation of {@link RelFactories.SnapshotFactory} that
+   * returns a vanilla {@link LogicalSnapshot}.
+   */
+  public static class SnapshotFactoryImpl implements SnapshotFactory {
+    public RelNode createSnapshot(RelNode input, RexNode period) {
+      return LogicalSnapshot.create(input, period);
+    }
+  }
+
+  /**
    * Can create a {@link Match} of
    * the appropriate type for a rule's calling convention.
    */
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Snapshot.java 
b/core/src/main/java/org/apache/calcite/rel/core/Snapshot.java
new file mode 100644
index 0000000..a77bf10
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/core/Snapshot.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+import org.apache.calcite.config.CalciteSystemProperty;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Relational expression that returns the contents of a relation expression as
+ * it was at a given time in the past.
+ *
+ * <p>For example, if {@code Products} is a temporal table, and
+ * {@link TableScan}(Products) is a relational operator that returns all
+ * versions of the contents of the table, then
+ * {@link Snapshot}(TableScan(Products)) is a relational operator that only
+ * returns the contents whose versions that overlap with the given specific
+ * period (i.e. those that started before given period and ended after it).
+ */
+public abstract class Snapshot extends SingleRel  {
+  //~ Instance fields --------------------------------------------------------
+
+  private final RexNode period;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a Snapshot.
+   *
+   * @param cluster   Cluster that this relational expression belongs to
+   * @param traitSet  The traits of this relational expression
+   * @param input     Input relational expression
+   * @param period    Timestamp expression which as the table was at the given
+   *                  time in the past
+   */
+  protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode 
input,
+      RexNode period) {
+    super(cluster, traitSet, input);
+    this.period = Objects.requireNonNull(period);
+    // Too expensive for everyday use:
+    assert !CalciteSystemProperty.DEBUG.value() || isValid(Litmus.THROW, null);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override public final RelNode copy(RelTraitSet traitSet, List<RelNode> 
inputs) {
+    return copy(traitSet, sole(inputs), getPeriod());
+  }
+
+  public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode 
period);
+
+  @Override public List<RexNode> getChildExps() {
+    return ImmutableList.of(period);
+  }
+
+  public RelNode accept(RexShuttle shuttle) {
+    RexNode condition = shuttle.apply(this.period);
+    if (this.period == condition) {
+      return this;
+    }
+    return copy(traitSet, getInput(), condition);
+  }
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item("period", period);
+  }
+
+  public RexNode getPeriod() {
+    return period;
+  }
+
+  @Override public boolean isValid(Litmus litmus, Context context) {
+    RelDataType dataType = period.getType();
+    if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
+      return litmus.fail("The system time period specification expects 
Timestamp type but is '"
+          + dataType.getSqlTypeName() + "'");
+    }
+    return litmus.succeed();
+  }
+}
+
+// End Snapshot.java
diff --git 
a/core/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java 
b/core/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
new file mode 100644
index 0000000..7cb398c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.core.Snapshot}
+ * not targeted at any particular engine or calling convention.
+ */
+public class LogicalSnapshot extends Snapshot {
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a LogicalSnapshot.
+   *
+   * <p>Use {@link #create} unless you know what you're doing.
+   *
+   * @param cluster   Cluster that this relational expression belongs to
+   * @param traitSet  The traits of this relational expression
+   * @param input     Input relational expression
+   * @param period    Timestamp expression which as the table was at the given
+   *                  time in the past
+   */
+  public LogicalSnapshot(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RexNode period) {
+    super(cluster, traitSet, input, period);
+  }
+
+  @Override public Snapshot copy(RelTraitSet traitSet, RelNode input,
+      RexNode period) {
+    return new LogicalSnapshot(getCluster(), traitSet, input, period);
+  }
+
+  /** Creates a LogicalSnapshot. */
+  public static LogicalSnapshot create(RelNode input, RexNode period) {
+    final RelOptCluster cluster = input.getCluster();
+    final RelMetadataQuery mq = cluster.getMetadataQuery();
+    final RelTraitSet traitSet = cluster.traitSet()
+        .replace(Convention.NONE)
+        .replaceIfs(RelCollationTraitDef.INSTANCE,
+            () -> RelMdCollation.snapshot(mq, input))
+        .replaceIf(RelDistributionTraitDef.INSTANCE,
+            () -> RelMdDistribution.snapshot(mq, input));
+    return new LogicalSnapshot(cluster, traitSet, input, period);
+  }
+}
+
+// End LogicalSnapshot.java
diff --git 
a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
index aa5df07..529a405 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
@@ -213,6 +213,12 @@ public class RelMdCollation
   }
 
   /** Helper method to determine a
+   * {@link org.apache.calcite.rel.core.Snapshot}'s collation. */
+  public static List<RelCollation> snapshot(RelMetadataQuery mq, RelNode 
input) {
+    return mq.collations(input);
+  }
+
+  /** Helper method to determine a
    * {@link org.apache.calcite.rel.core.Sort}'s collation. */
   public static List<RelCollation> sort(RelCollation collation) {
     return ImmutableList.of(collation);
diff --git 
a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdDistribution.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdDistribution.java
index b594e4d..91b9185 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdDistribution.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdDistribution.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Snapshot;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Values;
@@ -113,6 +114,12 @@ public class RelMdDistribution
   }
 
   /** Helper method to determine a
+   * {@link Snapshot}'s distribution. */
+  public static RelDistribution snapshot(RelMetadataQuery mq, RelNode input) {
+    return mq.distribution(input);
+  }
+
+  /** Helper method to determine a
    * {@link Sort}'s distribution. */
   public static RelDistribution sort(RelMetadataQuery mq, RelNode input) {
     return mq.distribution(input);
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java 
b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 2aeb60d..e356e84 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -713,6 +713,12 @@ public interface CalciteResource {
   @BaseMessage("Invalid number of parameters to COUNT method")
   ExInst<SqlValidatorException> patternCountFunctionArg();
 
+  @BaseMessage("The system time period specification expects Timestamp type 
but is ''{0}''")
+  ExInst<SqlValidatorException> illegalExpressionForTemporal(String type);
+
+  @BaseMessage("Table ''{0}'' is not a temporal table, can not be queried in 
system time period specification")
+  ExInst<SqlValidatorException> notTemporalTable(String tableName);
+
   @BaseMessage("Cannot use RUNNING/FINAL in DEFINE ''{0}''")
   ExInst<SqlValidatorException> patternRunningFunctionInDefine(String call);
 
diff --git a/core/src/main/java/org/apache/calcite/schema/Schema.java 
b/core/src/main/java/org/apache/calcite/schema/Schema.java
index 05843dc..d9e5cfa 100644
--- a/core/src/main/java/org/apache/calcite/schema/Schema.java
+++ b/core/src/main/java/org/apache/calcite/schema/Schema.java
@@ -322,6 +322,13 @@ public interface Schema {
      */
     TYPED_VIEW,
 
+    /**
+     * A temporal table.
+     *
+     * <p>Used by MS SQL, Oracle and others
+     */
+    TEMPORAL_TABLE,
+
     /** Table type not known to Calcite.
      *
      * <p>If you get one of these, please fix the problem by adding an enum
diff --git a/core/src/main/java/org/apache/calcite/schema/TemporalTable.java 
b/core/src/main/java/org/apache/calcite/schema/TemporalTable.java
new file mode 100644
index 0000000..5823cbd
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/schema/TemporalTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Table that is temporal.
+ */
+public interface TemporalTable extends Table {
+
+  /** Returns the name of the system column that contains the start effective
+   * time of each row. */
+  @Nonnull String getSysStartFieldName();
+
+  /** Returns the name of the system column that contains the end effective
+   * time of each row. */
+  @Nonnull String getSysEndFieldName();
+}
+
+// End TemporalTable.java
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java 
b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 521e35b..9f0a735 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -241,6 +241,12 @@ public enum SqlKind {
    * MATCH_RECOGNIZE clause
    */
   MATCH_RECOGNIZE,
+
+  /**
+   * SNAPSHOT operator
+   */
+  SNAPSHOT,
+
   // binary operators
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlLateralOperator.java 
b/core/src/main/java/org/apache/calcite/sql/SqlLateralOperator.java
index d238288..9e27459 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlLateralOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlLateralOperator.java
@@ -35,7 +35,8 @@ public class SqlLateralOperator extends SqlSpecialOperator {
   @Override public void unparse(SqlWriter writer, SqlCall call, int leftPrec,
       int rightPrec) {
     if (call.operandCount() == 1
-        && call.getOperandList().get(0).getKind() == SqlKind.COLLECTION_TABLE) 
{
+        && (call.getOperandList().get(0).getKind() == SqlKind.COLLECTION_TABLE
+            || call.getOperandList().get(0).getKind() == SqlKind.SNAPSHOT)) {
       // do not create ( ) around the following TABLE clause
       writer.keyword(getName());
       call.operand(0).unparse(writer, 0, 0);
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSnapshot.java 
b/core/src/main/java/org/apache/calcite/sql/SqlSnapshot.java
new file mode 100644
index 0000000..1e9ebbf
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSnapshot.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Parse tree node for "{@code FOR SYSTEM_TIME AS OF}" temporal clause.
+ */
+public class SqlSnapshot extends SqlCall {
+  private static final int OPERAND_TABLE_REF = 0;
+  private static final int OPERAND_PERIOD = 1;
+
+  //~ Instance fields -------------------------------------------
+
+  private SqlNode tableRef;
+  private SqlNode period;
+
+  /** Creates a SqlSnapshot. */
+  public SqlSnapshot(SqlParserPos pos, SqlNode tableRef, SqlNode period) {
+    super(pos);
+    this.tableRef = Objects.requireNonNull(tableRef);
+    this.period = Objects.requireNonNull(period);
+  }
+
+  // ~ Methods
+
+  @Override public SqlOperator getOperator() {
+    return SqlSnapshotOperator.INSTANCE;
+  }
+
+  @Override public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.of(tableRef, period);
+  }
+
+  public SqlNode getTableRef() {
+    return tableRef;
+  }
+
+  public SqlNode getPeriod() {
+    return period;
+  }
+
+  @Override public void setOperand(int i, SqlNode operand) {
+    switch (i) {
+    case OPERAND_TABLE_REF:
+      tableRef = Objects.requireNonNull(operand);
+      break;
+    case OPERAND_PERIOD:
+      period = Objects.requireNonNull(operand);
+      break;
+    default:
+      throw new AssertionError(i);
+    }
+  }
+
+  @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) 
{
+    getOperator().unparse(writer, this, 0, 0);
+  }
+
+  /**
+   * An operator describing a FOR SYSTEM_TIME specification.
+   */
+  public static class SqlSnapshotOperator extends SqlOperator {
+
+    public static final SqlSnapshotOperator INSTANCE = new 
SqlSnapshotOperator();
+
+    private SqlSnapshotOperator() {
+      super("SNAPSHOT", SqlKind.SNAPSHOT, 2, true, null, null, null);
+    }
+
+    @Override public SqlSyntax getSyntax() {
+      return SqlSyntax.SPECIAL;
+    }
+
+    @Override public SqlCall createCall(
+        SqlLiteral functionQualifier,
+        SqlParserPos pos,
+        SqlNode... operands) {
+      assert functionQualifier == null;
+      assert operands.length == 2;
+      return new SqlSnapshot(pos, operands[0], operands[1]);
+    }
+
+    @Override public <R> void acceptCall(
+        SqlVisitor<R> visitor,
+        SqlCall call,
+        boolean onlyExpressions,
+        SqlBasicVisitor.ArgHandler<R> argHandler) {
+      if (onlyExpressions) {
+        List<SqlNode> operands = call.getOperandList();
+        // skip the first operand
+        for (int i = 1; i < operands.size(); i++) {
+          argHandler.visitChild(visitor, call, i, operands.get(i));
+        }
+      } else {
+        super.acceptCall(visitor, call, false, argHandler);
+      }
+    }
+
+    @Override public void unparse(
+        SqlWriter writer,
+        SqlCall call,
+        int leftPrec,
+        int rightPrec) {
+      final SqlSnapshot snapshot = (SqlSnapshot) call;
+
+      snapshot.tableRef.unparse(writer, 0, 0);
+      writer.keyword("FOR SYSTEM_TIME AS OF");
+      snapshot.period.unparse(writer, 0, 0);
+    }
+  }
+}
+
+// End SqlSnapshot.java
diff --git 
a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java 
b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 41fdab5..c0b8846 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -72,6 +72,7 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSampleSpec;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSnapshot;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.SqlUnresolvedFunction;
 import org.apache.calcite.sql.SqlUpdate;
@@ -963,6 +964,22 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         node,
         ns.getTable(),
         SqlAccessEnum.SELECT);
+
+    if (node.getKind() == SqlKind.SNAPSHOT) {
+      SqlSnapshot snapshot = (SqlSnapshot) node;
+      SqlNode period = snapshot.getPeriod();
+      RelDataType dataType = deriveType(scope, period);
+      if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
+        throw newValidationError(period,
+            
Static.RESOURCE.illegalExpressionForTemporal(dataType.getSqlTypeName().getName()));
+      }
+      if (!ns.getTable().isTemporal()) {
+        List<String> qualifiedName = ns.getTable().getQualifiedName();
+        String tableName = qualifiedName.get(qualifiedName.size() - 1);
+        throw newValidationError(snapshot.getTableRef(),
+            Static.RESOURCE.notTemporalTable(tableName));
+      }
+    }
   }
 
   /**
@@ -1084,6 +1101,7 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         return ns;
       }
       // fall through
+    case SNAPSHOT:
     case OVER:
     case COLLECTION_TABLE:
     case ORDER_BY:
@@ -2272,6 +2290,25 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
           forceNullable,
           lateral);
 
+    case SNAPSHOT:
+      call = (SqlCall) node;
+      operand = call.operand(0);
+      newOperand = registerFrom(
+          tableScope == null ? parentScope : tableScope,
+          usingScope,
+          register,
+          operand,
+          enclosingNode,
+          alias,
+          extendList,
+          forceNullable,
+          true);
+      if (newOperand != operand) {
+        call.setOperand(0, newOperand);
+      }
+      scopes.put(node, parentScope);
+      return newNode;
+
     default:
       throw Util.unexpected(kind);
     }
diff --git 
a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java 
b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
index b660050..da18656 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
@@ -49,6 +49,11 @@ public interface SqlValidatorTable extends Wrapper {
   boolean supportsModality(SqlModality modality);
 
   /**
+   * Returns whether the table is temporal.
+   */
+  boolean isTemporal();
+
+  /**
    * Returns whether the ordinal column has a default value.
    */
   @Deprecated // to be removed before 2.0
diff --git 
a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java 
b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
index 4dff252..3a28d84 100644
--- 
a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
+++ 
b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMatch;
 import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rel.logical.LogicalTableModify;
@@ -704,6 +705,14 @@ public class RelStructuredTypeFlattener implements 
ReflectiveVisitor {
     setNewForOldRel(rel, newRel);
   }
 
+  public void rewriteRel(LogicalSnapshot rel) {
+    RelNode newRel =
+        rel.copy(rel.getTraitSet(),
+            getNewForOldRel(rel.getInput()),
+            rel.getPeriod().accept(new RewriteRexShuttle()));
+    setNewForOldRel(rel, newRel);
+  }
+
   public void rewriteRel(LogicalDelta rel) {
     rewriteGeneric(rel);
   }
diff --git 
a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java 
b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 07561b2..95abb79 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -46,6 +46,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.core.Values;
 import org.apache.calcite.rel.logical.LogicalAggregate;
@@ -123,6 +124,7 @@ import org.apache.calcite.sql.SqlSampleSpec;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlSelectKeyword;
 import org.apache.calcite.sql.SqlSetOperator;
+import org.apache.calcite.sql.SqlSnapshot;
 import org.apache.calcite.sql.SqlUnnestOperator;
 import org.apache.calcite.sql.SqlUpdate;
 import org.apache.calcite.sql.SqlUtil;
@@ -2016,6 +2018,10 @@ public class SqlToRelConverter {
       convertIdentifier(bb, id, extendedColumns);
       return;
 
+    case SNAPSHOT:
+      snapshotTemporalTable(bb, (SqlCall) from);
+      return;
+
     case JOIN:
       final SqlJoin join = (SqlJoin) from;
       final SqlValidatorScope scope = validator.getJoinScope(from);
@@ -2381,6 +2387,20 @@ public class SqlToRelConverter {
       LogicalTableFunctionScan callRel) {
   }
 
+  private void snapshotTemporalTable(Blackboard bb, SqlCall call) {
+    final SqlSnapshot snapshot = (SqlSnapshot) call;
+    final RexNode period = bb.convertExpression(snapshot.getPeriod());
+
+    // convert inner query, could be a table name or a derived table
+    SqlNode expr = snapshot.getTableRef();
+    convertFrom(bb, expr);
+    final TableScan scan = (TableScan) bb.root;
+
+    final RelNode snapshotRel = relBuilder.push(scan).snapshot(period).build();
+
+    bb.setRoot(snapshotRel, false);
+  }
+
   private Set<RelColumnMapping> getColumnMappings(SqlOperator op) {
     SqlReturnTypeInference rti = op.getReturnTypeInference();
     if (rti == null) {
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java 
b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index ccb987c..c62c42c 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -42,6 +42,7 @@ import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.core.Snapshot;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
@@ -142,6 +143,7 @@ public class RelBuilder {
   private final RelFactories.CorrelateFactory correlateFactory;
   private final RelFactories.ValuesFactory valuesFactory;
   private final RelFactories.TableScanFactory scanFactory;
+  private final RelFactories.SnapshotFactory snapshotFactory;
   private final RelFactories.MatchFactory matchFactory;
   private final Deque<Frame> stack = new ArrayDeque<>();
   private final boolean simplify;
@@ -191,6 +193,9 @@ public class RelBuilder {
     this.scanFactory =
         Util.first(context.unwrap(RelFactories.TableScanFactory.class),
             RelFactories.DEFAULT_TABLE_SCAN_FACTORY);
+    this.snapshotFactory =
+        Util.first(context.unwrap(RelFactories.SnapshotFactory.class),
+            RelFactories.DEFAULT_SNAPSHOT_FACTORY);
     this.matchFactory =
         Util.first(context.unwrap(RelFactories.MatchFactory.class),
             RelFactories.DEFAULT_MATCH_FACTORY);
@@ -1027,6 +1032,19 @@ public class RelBuilder {
     return scan(ImmutableList.copyOf(tableNames));
   }
 
+  /** Creates a {@link Snapshot} of a given snapshot period.
+   *
+   * <p>Returns this builder.
+   *
+   * @param period Name of table (can optionally be qualified)
+   */
+  public RelBuilder snapshot(RexNode period) {
+    final Frame frame = stack.pop();
+    final RelNode snapshot = snapshotFactory.createSnapshot(frame.rel, period);
+    stack.push(new Frame(snapshot, frame.fields));
+    return this;
+  }
+
   /** Creates a {@link Filter} of an array of
    * predicates.
    *
diff --git 
a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties 
b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index fc6ef27..bd31c82 100644
--- 
a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ 
b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -232,6 +232,8 @@ PatternPrevFunctionInMeasure=Cannot use PREV/NEXT in 
MEASURE ''{0}''
 PatternPrevFunctionOrder=Cannot nest PREV/NEXT under LAST/FIRST ''{0}''
 PatternAggregationInNavigation=Cannot use aggregation in navigation ''{0}''
 PatternCountFunctionArg=Invalid number of parameters to COUNT method
+IllegalExpressionForTemporal=The system time period specification expects 
Timestamp type but is ''{0}''
+NotTemporalTable=Table ''{0}'' is not a temporal table, can not be queried in 
system time period specification
 PatternRunningFunctionInDefine=Cannot use RUNNING/FINAL in DEFINE ''{0}''
 PatternFunctionVariableCheck=Multiple pattern variables in ''{0}''
 FunctionMatchRecognizeOnly=Function ''{0}'' can only be used in MATCH_RECOGNIZE
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java 
b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index c79454f..9204b7b 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -90,6 +90,7 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
           "TABLE(CATALOG.SALES.SALGRADE)",
           "TABLE(CATALOG.SALES.SHIPMENTS)",
           "TABLE(CATALOG.SALES.PRODUCTS)",
+          "TABLE(CATALOG.SALES.PRODUCTS_TEMPORAL)",
           "TABLE(CATALOG.SALES.SUPPLIERS)",
           "TABLE(CATALOG.SALES.EMP_R)",
           "TABLE(CATALOG.SALES.DEPT_R)");
@@ -328,6 +329,7 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
   protected static final List<String> JOIN_KEYWORDS =
       Arrays.asList(
           "KEYWORD(FETCH)",
+          "KEYWORD(FOR)",
           "KEYWORD(OFFSET)",
           "KEYWORD(LIMIT)",
           "KEYWORD(UNION)",
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java 
b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 35505fd..26e29f2 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -733,6 +733,7 @@ public class CalciteAssert {
   public static SchemaPlus addSchema(SchemaPlus rootSchema, SchemaSpec schema) 
{
     final SchemaPlus foodmart;
     final SchemaPlus jdbcScott;
+    final SchemaPlus scott;
     final ConnectionSpec cs;
     final DataSource dataSource;
     switch (schema) {
@@ -767,6 +768,13 @@ public class CalciteAssert {
     case SCOTT:
       jdbcScott = addSchemaIfNotExists(rootSchema, SchemaSpec.JDBC_SCOTT);
       return rootSchema.add(schema.schemaName, new CloneSchema(jdbcScott));
+    case SCOTT_WITH_TEMPORAL:
+      scott = addSchemaIfNotExists(rootSchema, SchemaSpec.SCOTT);
+      scott.add("products_temporal", new StreamTest.ProductsTemporalTable());
+      scott.add("orders",
+          new StreamTest.OrdersHistoryTable(
+              StreamTest.OrdersStreamTableFactory.getRowList()));
+      return scott;
     case CLONE_FOODMART:
       foodmart = addSchemaIfNotExists(rootSchema, SchemaSpec.JDBC_FOODMART);
       return rootSchema.add("foodmart2", new CloneSchema(foodmart));
@@ -1837,6 +1845,7 @@ public class CalciteAssert {
     HR("hr"),
     JDBC_SCOTT("JDBC_SCOTT"),
     SCOTT("scott"),
+    SCOTT_WITH_TEMPORAL("scott_temporal"),
     BLANK("BLANK"),
     LINGUAL("SALES"),
     POST("POST"),
diff --git a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java 
b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
index fa5a150..1a50e95 100644
--- a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
@@ -54,6 +54,7 @@ import org.apache.calcite.tools.RelRunner;
 import org.apache.calcite.tools.RelRunners;
 import org.apache.calcite.util.Holder;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.TimestampString;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.mapping.Mappings;
 
@@ -124,7 +125,7 @@ public class RelBuilderTest {
     return Frameworks.newConfigBuilder()
         .parserConfig(SqlParser.Config.DEFAULT)
         .defaultSchema(
-            CalciteAssert.addSchema(rootSchema, 
CalciteAssert.SchemaSpec.SCOTT))
+            CalciteAssert.addSchema(rootSchema, 
CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL))
         .traitDefs((List<RelTraitDef>) null)
         .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2));
   }
@@ -281,6 +282,47 @@ public class RelBuilderTest {
     assertThat(root, hasTree(expected));
   }
 
+  @Test public void testSnapshotTemporalTable() {
+    // Equivalent SQL:
+    //   SELECT *
+    //   FROM products_temporal FOR SYSTEM_TIME AS OF TIMESTAMP '2011-07-20 
12:34:56'
+    final RelBuilder builder = RelBuilder.create(config().build());
+    RelNode root =
+        builder.scan("products_temporal")
+            .snapshot(
+                builder.getRexBuilder().makeTimestampLiteral(
+                    new TimestampString("2011-07-20 12:34:56"), 0))
+            .build();
+    final String expected = "LogicalSnapshot(period=[2011-07-20 12:34:56])\n"
+            + "  LogicalTableScan(table=[[scott, products_temporal]])\n";
+    assertThat(root, hasTree(expected));
+  }
+
+  @Test public void testJoinTemporalTable() {
+    // Equivalent SQL:
+    //   SELECT *
+    //   FROM orders
+    //   JOIN products_temporal FOR SYSTEM_TIME AS OF TIMESTAMP '2011-07-20 
12:34:56'
+    //   ON orders.product = products_temporal.id
+    final RelBuilder builder = RelBuilder.create(config().build());
+    RelNode root =
+        builder.scan("orders")
+            .scan("products_temporal")
+            .snapshot(
+                builder.getRexBuilder().makeTimestampLiteral(
+                    new TimestampString("2011-07-20 12:34:56"), 0))
+            .join(JoinRelType.INNER,
+                builder.call(SqlStdOperatorTable.EQUALS,
+                    builder.field(2, 0, "PRODUCT"),
+                    builder.field(2, 1, "ID")))
+            .build();
+    final String expected = "LogicalJoin(condition=[=($2, $4)], 
joinType=[inner])\n"
+        + "  LogicalTableScan(table=[[scott, orders]])\n"
+        + "  LogicalSnapshot(period=[2011-07-20 12:34:56])\n"
+        + "    LogicalTableScan(table=[[scott, products_temporal]])\n";
+    assertThat(root, hasTree(expected));
+  }
+
   @Test public void testScanFilterOr() {
     // Equivalent SQL:
     //   SELECT *
diff --git 
a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java 
b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index bb322d6..9976321 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1062,6 +1062,28 @@ public class SqlToRelConverterTest extends 
SqlToRelTestBase {
     sql("select * from dept, lateral table(ramp(deptno))").ok();
   }
 
+  @Test public void testSnapshotOnTemporalTable() {
+    final String sql = "select * from products_temporal "
+        + "for system_time as of TIMESTAMP '2011-01-02 00:00:00'";
+    sql(sql).ok();
+  }
+
+  @Test public void testJoinTemporalTableOnSpecificTime() {
+    final String sql = "select stream *\n"
+        + "from orders,\n"
+        + "  products_temporal for system_time as of\n"
+        + "    TIMESTAMP '2011-01-02 00:00:00'";
+    sql(sql).ok();
+  }
+
+  @Test public void testJoinTemporalTableOnColumnReference() {
+    final String sql = "select stream *\n"
+        + "from orders\n"
+        + "join products_temporal for system_time as of orders.rowtime\n"
+        + "on orders.productid = products_temporal.productid";
+    sql(sql).ok();
+  }
+
   /** Test case for
    * <a 
href="https://issues.apache.org/jira/browse/CALCITE-1732";>[CALCITE-1732]
    * IndexOutOfBoundsException when using LATERAL TABLE with more than one
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java 
b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 158848c..d13788b 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -7945,6 +7945,27 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
         "Object 'BLOOP' not found");
   }
 
+  @Test public void testTemporalTable() {
+    checkFails("select stream * from orders, ^products^ for system_time as of"
+            + " TIMESTAMP '2011-01-02 00:00:00'",
+        "Table 'PRODUCTS' is not a temporal table, "
+            + "can not be queried in system time period specification");
+
+    checkFails("select stream * from orders, products_temporal "
+            + "for system_time as of ^'2011-01-02 00:00:00'^",
+        "The system time period specification expects Timestamp type but is 
'CHAR'");
+
+    // verify inner join with a specific timestamp
+    check("select stream * from orders join products_temporal "
+        + "for system_time as of timestamp '2011-01-02 00:00:00' "
+        + "on orders.productid = products_temporal.productid");
+
+    // verify left join with a timestamp expression
+    check("select stream * from orders left join products_temporal "
+        + "for system_time as of orders.rowtime "
+        + "on orders.productid = products_temporal.productid");
+  }
+
   @Test public void testScalarSubQuery() {
     check("SELECT  ename,(select name from dept where deptno=1) FROM emp");
     checkFails(
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java 
b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 2d20200..5895dfc 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -33,6 +33,7 @@ import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TableFactory;
+import org.apache.calcite.schema.TemporalTable;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -547,6 +548,48 @@ public class StreamTest {
       return false;
     }
   }
+
+  /**
+   * Table representing the PRODUCTS_TEMPORAL temporal table.
+   */
+  public static class ProductsTemporalTable implements TemporalTable {
+
+    private final RelProtoDataType protoRowType = a0 -> a0.builder()
+        .add("ID", SqlTypeName.VARCHAR, 32)
+        .add("SUPPLIER", SqlTypeName.INTEGER)
+        .add("SYS_START", SqlTypeName.TIMESTAMP)
+        .add("SYS_END", SqlTypeName.TIMESTAMP)
+        .build();
+
+    @Override public String getSysStartFieldName() {
+      return "SYS_START";
+    }
+
+    @Override public String getSysEndFieldName() {
+      return "SYS_END";
+    }
+
+    @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    @Override public Statistic getStatistic() {
+      return Statistics.of(200d, ImmutableList.of());
+    }
+
+    @Override public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+
+    @Override public boolean isRolledUp(String column) {
+      return false;
+    }
+
+    @Override public boolean rolledUpColumnValidInsideAgg(String column,
+        SqlCall call, SqlNode parent, CalciteConnectionConfig config) {
+      return false;
+    }
+  }
 }
 
 // End StreamTest.java
diff --git 
a/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReader.java 
b/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReader.java
index 6224792..5fced0a 100644
--- a/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReader.java
@@ -293,25 +293,29 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
     protected final Set<String> monotonicColumnSet = new HashSet<>();
     protected StructKind kind = StructKind.FULLY_QUALIFIED;
     protected final ColumnResolver resolver;
+    private final boolean temporal;
     protected final InitializerExpressionFactory initializerFactory;
     protected final Set<String> rolledUpColumns = new HashSet<>();
 
     public MockTable(MockCatalogReader catalogReader, String catalogName,
-        String schemaName, String name, boolean stream, double rowCount,
-        ColumnResolver resolver,
+        String schemaName, String name, boolean stream, boolean temporal,
+        double rowCount, ColumnResolver resolver,
         InitializerExpressionFactory initializerFactory) {
-      this(catalogReader, ImmutableList.of(catalogName, schemaName, name), 
stream, rowCount,
-          resolver, initializerFactory);
+      this(catalogReader, ImmutableList.of(catalogName, schemaName, name),
+          stream, temporal, rowCount, resolver, initializerFactory);
     }
 
     public void registerRolledUpColumn(String columnName) {
       rolledUpColumns.add(columnName);
     }
 
-    private MockTable(MockCatalogReader catalogReader, List<String> names, 
boolean stream,
-        double rowCount, ColumnResolver resolver, InitializerExpressionFactory 
initializerFactory) {
+    private MockTable(MockCatalogReader catalogReader, List<String> names,
+        boolean stream, boolean temporal, double rowCount,
+        ColumnResolver resolver,
+        InitializerExpressionFactory initializerFactory) {
       this.catalogReader = catalogReader;
       this.stream = stream;
+      this.temporal = temporal;
       this.rowCount = rowCount;
       this.names = names;
       this.resolver = resolver;
@@ -321,13 +325,15 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
     /**
      * Copy constructor.
      */
-    protected MockTable(MockCatalogReader catalogReader, boolean stream, 
double rowCount,
+    protected MockTable(MockCatalogReader catalogReader, boolean stream,
+        boolean temporal, double rowCount,
         List<Map.Entry<String, RelDataType>> columnList, List<Integer> keyList,
         RelDataType rowType, List<RelCollation> collationList, List<String> 
names,
         Set<String> monotonicColumnSet, StructKind kind, ColumnResolver 
resolver,
         InitializerExpressionFactory initializerFactory) {
       this.catalogReader = catalogReader;
       this.stream = stream;
+      this.temporal = temporal;
       this.rowCount = rowCount;
       this.rowType = rowType;
       this.collationList = collationList;
@@ -397,7 +403,8 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
     }
 
     @Override protected RelOptTable extend(final Table extendedTable) {
-      return new MockTable(catalogReader, names, stream, rowCount, resolver, 
initializerFactory) {
+      return new MockTable(catalogReader, names, stream, temporal, rowCount,
+          resolver, initializerFactory) {
         @Override public RelDataType getRowType() {
           return extendedTable.getRowType(catalogReader.typeFactory);
         }
@@ -411,7 +418,7 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
 
     public static MockTable create(MockCatalogReader catalogReader,
         List<String> names, boolean stream, double rowCount) {
-      return new MockTable(catalogReader, names, stream, rowCount, null,
+      return new MockTable(catalogReader, names, stream, false, rowCount, null,
           NullInitializerExpressionFactory.INSTANCE);
     }
 
@@ -419,16 +426,18 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
         MockSchema schema, String name, boolean stream, double rowCount,
         ColumnResolver resolver) {
       return create(catalogReader, schema, name, stream, rowCount, resolver,
-          NullInitializerExpressionFactory.INSTANCE);
+          NullInitializerExpressionFactory.INSTANCE, false);
     }
 
     public static MockTable create(MockCatalogReader catalogReader,
         MockSchema schema, String name, boolean stream, double rowCount,
         ColumnResolver resolver,
-        InitializerExpressionFactory initializerExpressionFactory) {
+        InitializerExpressionFactory initializerExpressionFactory,
+        boolean temporal) {
       MockTable table =
           new MockTable(catalogReader, schema.getCatalogName(), schema.name,
-              name, stream, rowCount, resolver, initializerExpressionFactory);
+              name, stream, temporal, rowCount, resolver,
+              initializerExpressionFactory);
       schema.addTable(name);
       return table;
     }
@@ -486,6 +495,10 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
       return modality == (stream ? SqlModality.STREAM : SqlModality.RELATION);
     }
 
+    @Override public boolean isTemporal() {
+      return temporal;
+    }
+
     public void onRegister(RelDataTypeFactory typeFactory) {
       rowType = typeFactory.createStructType(kind, Pair.right(columnList),
           Pair.left(columnList));
@@ -564,8 +577,8 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
         MockCatalogReader catalogReader, String catalogName, String 
schemaName, String name,
         boolean stream, double rowCount, ColumnResolver resolver,
         InitializerExpressionFactory initializerExpressionFactory) {
-      super(catalogReader, ImmutableList.of(catalogName, schemaName, name), 
stream, rowCount,
-          resolver, initializerExpressionFactory);
+      super(catalogReader, ImmutableList.of(catalogName, schemaName, name),
+          stream, false, rowCount, resolver, initializerExpressionFactory);
       this.modifiableViewTable = modifiableViewTable;
     }
 
@@ -578,7 +591,8 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
         RelDataType rowType, List<RelCollation> collationList, List<String> 
names,
         Set<String> monotonicColumnSet, StructKind kind, ColumnResolver 
resolver,
         InitializerExpressionFactory initializerFactory) {
-      super(catalogReader, stream, rowCount, columnList, keyList, rowType, 
collationList, names,
+      super(catalogReader, stream, false, rowCount, columnList, keyList,
+          rowType, collationList, names,
           monotonicColumnSet, kind, resolver, initializerFactory);
       this.modifiableViewTable = modifiableViewTable;
     }
@@ -588,7 +602,7 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
         boolean stream, double rowCount, ColumnResolver resolver) {
       final Table underlying = modifiableViewTable.unwrap(Table.class);
       final InitializerExpressionFactory initializerExpressionFactory =
-          underlying != null && underlying instanceof Wrapper
+          underlying instanceof Wrapper
               ? ((Wrapper) 
underlying).unwrap(InitializerExpressionFactory.class)
               : NullInitializerExpressionFactory.INSTANCE;
       return new MockModifiableViewRelOptTable(modifiableViewTable,
@@ -677,8 +691,8 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
         String schemaName, String name, boolean stream, double rowCount,
         MockTable fromTable, ImmutableIntList mapping, ColumnResolver resolver,
         InitializerExpressionFactory initializerFactory) {
-      super(catalogReader, catalogName, schemaName, name, stream, rowCount,
-          resolver, initializerFactory);
+      super(catalogReader, catalogName, schemaName, name, stream, false,
+          rowCount, resolver, initializerFactory);
       this.fromTable = fromTable;
       this.table = fromTable.unwrap(Table.class);
       this.mapping = mapping;
@@ -808,8 +822,8 @@ public abstract class MockCatalogReader extends 
CalciteCatalogReader {
   public static class MockDynamicTable extends MockTable {
     public MockDynamicTable(MockCatalogReader catalogReader, String 
catalogName,
         String schemaName, String name, boolean stream, double rowCount) {
-      super(catalogReader, catalogName, schemaName, name, stream, rowCount,
-          null, NullInitializerExpressionFactory.INSTANCE);
+      super(catalogReader, catalogName, schemaName, name, stream, false,
+          rowCount, null, NullInitializerExpressionFactory.INSTANCE);
     }
 
     public void onRegister(RelDataTypeFactory typeFactory) {
diff --git 
a/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReaderSimple.java
 
b/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReaderSimple.java
index da2b086..538cf97 100644
--- 
a/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReaderSimple.java
+++ 
b/core/src/test/java/org/apache/calcite/test/catalog/MockCatalogReaderSimple.java
@@ -86,7 +86,7 @@ public class MockCatalogReaderSimple extends 
MockCatalogReader {
     // Register "EMP" table.
     final MockTable empTable =
         MockTable.create(this, salesSchema, "EMP", false, 14, null,
-            countingInitializerExpressionFactory);
+            countingInitializerExpressionFactory, false);
     empTable.addColumn("EMPNO", fixture.intType, true);
     empTable.addColumn("ENAME", fixture.varchar20Type);
     empTable.addColumn("JOB", fixture.varchar10Type);
@@ -115,7 +115,7 @@ public class MockCatalogReaderSimple extends 
MockCatalogReader {
     // Register "EMPDEFAULTS" table with default values for some columns.
     final MockTable empDefaultsTable =
         MockTable.create(this, salesSchema, "EMPDEFAULTS", false, 14, null,
-            new EmpInitializerExpressionFactory());
+            new EmpInitializerExpressionFactory(), false);
     empDefaultsTable.addColumn("EMPNO", fixture.intType, true);
     empDefaultsTable.addColumn("ENAME", fixture.varchar20Type);
     empDefaultsTable.addColumn("JOB", fixture.varchar10TypeNull);
@@ -241,6 +241,17 @@ public class MockCatalogReaderSimple extends 
MockCatalogReader {
     productsTable.addColumn("SUPPLIERID", fixture.intType);
     registerTable(productsTable);
 
+    // Register "PRODUCTS_TEMPORAL" table.
+    MockTable productsTemporalTable =
+        MockTable.create(this, salesSchema, "PRODUCTS_TEMPORAL", false, 200D,
+        null, NullInitializerExpressionFactory.INSTANCE, true);
+    productsTemporalTable.addColumn("PRODUCTID", fixture.intType);
+    productsTemporalTable.addColumn("NAME", fixture.varchar20Type);
+    productsTemporalTable.addColumn("SUPPLIERID", fixture.intType);
+    productsTemporalTable.addColumn("SYS_START", fixture.timestampType);
+    productsTemporalTable.addColumn("SYS_END", fixture.timestampType);
+    registerTable(productsTemporalTable);
+
     // Register "SUPPLIERS" table.
     MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
         false, 10D);
diff --git 
a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml 
b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 53d62d8..c24e21b 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -239,6 +239,52 @@ LogicalProject(DEPTNO=[$0], NAME=[$1], I=[$2])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testSnapshotOnTemporalTable">
+        <Resource name="sql">
+            <![CDATA[select * from products_temporal
+for system_time as of TIMESTAMP '2011-01-02 00:00:00']]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(PRODUCTID=[$0], NAME=[$1], SUPPLIERID=[$2], SYS_START=[$3], 
SYS_END=[$4])
+  LogicalSnapshot(period=[2011-01-02 00:00:00])
+    LogicalTableScan(table=[[CATALOG, SALES, PRODUCTS_TEMPORAL]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testJoinTemporalTableOnSpecificTime">
+        <Resource name="sql">
+            <![CDATA[select stream * from orders, products_temporal
+for system_time as of TIMESTAMP '2011-01-02 00:00:00']]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], PRODUCTID0=[$3], 
NAME=[$4], SUPPLIERID=[$5], SYS_START=[$6], SYS_END=[$7])
+    LogicalJoin(condition=[true], joinType=[inner])
+      LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+      LogicalSnapshot(period=[2011-01-02 00:00:00])
+        LogicalTableScan(table=[[CATALOG, SALES, PRODUCTS_TEMPORAL]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testJoinTemporalTableOnColumnReference">
+        <Resource name="sql">
+            <![CDATA[select stream * from orders join products_temporal
+for system_time as of orders.rowtime on orders.productid = 
products_temporal.productid]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], PRODUCTID0=[$3], 
NAME=[$4], SUPPLIERID=[$5], SYS_START=[$6], SYS_END=[$7])
+    LogicalFilter(condition=[=($1, $3)])
+      LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+        LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+        LogicalSnapshot(period=[$cor0.ROWTIME])
+          LogicalTableScan(table=[[CATALOG, SALES, PRODUCTS_TEMPORAL]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testCollectionTableWithLateral3">
         <Resource name="sql">
             <![CDATA[select * from dept, lateral table(DEDUP(dept.deptno, 
dept.name))]]>
diff --git a/site/_docs/algebra.md b/site/_docs/algebra.md
index 4c45f39..fc51905 100644
--- a/site/_docs/algebra.md
+++ b/site/_docs/algebra.md
@@ -277,6 +277,7 @@ return the `RelBuilder`.
 | `union(all [, n])` | Creates a [Union]({{ site.apiRoot 
}}/org/apache/calcite/rel/core/Union.html) of the `n` (default two) most recent 
relational expressions.
 | `intersect(all [, n])` | Creates an [Intersect]({{ site.apiRoot 
}}/org/apache/calcite/rel/core/Intersect.html) of the `n` (default two) most 
recent relational expressions.
 | `minus(all)` | Creates a [Minus]({{ site.apiRoot 
}}/org/apache/calcite/rel/core/Minus.html) of the two most recent relational 
expressions.
+| `snapshot(period)` | Creates a [Snapshot]({{ site.apiRoot 
}}/org/apache/calcite/rel/core/Snapshot.html) of the given snapshot period.
 | `match(pattern, strictStart,` `strictEnd, patterns, measures,` `after, 
subsets, allRows,` `partitionKeys, orderKeys,` `interval)` | Creates a 
[Match]({{ site.apiRoot }}/org/apache/calcite/rel/core/Match.html).
 
 Argument types:
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index 198a236..31508dd 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -194,6 +194,7 @@ joinCondition:
 
 tableReference:
       tablePrimary
+      [ FOR SYSTEM_TIME AS OF expression ]
       [ matchRecognize ]
       [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
 

Reply via email to