This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dac9f78  [FLINK-12611][table-planner-blink] Make time indicator nullable in blink
dac9f78 is described below

commit dac9f78658efda3339977e05591989cc363f3722
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 27 16:16:25 2019 +0800

    [FLINK-12611][table-planner-blink] Make time indicator nullable in blink
    
    This closes #8530
---
 .../sql/ProctimeMaterializeSqlFunction.java        |  5 ++-
 .../table/functions/sql/ProctimeSqlFunction.java   | 14 +++-----
 .../flink/table/calcite/FlinkTypeFactory.scala     | 38 ++++++++++------------
 .../plan/nodes/calcite/WatermarkAssigner.scala     |  2 +-
 .../plan/schema/TimeIndicatorRelDataType.scala     | 13 +++-----
 .../table/plan/batch/sql/join/LookupJoinTest.xml   | 25 +++++++-------
 .../stream/sql/RelTimeIndicatorConverterTest.xml   | 10 +++---
 .../table/plan/stream/sql/TableSourceTest.xml      |  2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 28 ++++++++--------
 .../table/plan/batch/sql/join/LookupJoinTest.scala | 15 +++++++--
 .../plan/schema/TimeIndicatorRelDataTypeTest.scala |  4 +--
 11 files changed, 80 insertions(+), 76 deletions(-)
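
For illustration only (not part of the commit): a minimal Scala sketch of how the
changed FlinkTypeFactory methods are intended to be used after this patch. It
assumes FlinkTypeSystem lives in org.apache.flink.table.calcite next to
FlinkTypeFactory, mirroring the factory construction in
TimeIndicatorRelDataTypeTest; the commented type strings are expected, not
verified, output.

    import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}

    object TimeIndicatorNullabilitySketch {

      def main(args: Array[String]): Unit = {
        // Factory construction mirrors TimeIndicatorRelDataTypeTest in this commit.
        val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)

        // Nullability is now an explicit parameter of the time-indicator factory methods.
        val nullableProctime = typeFactory.createProctimeIndicatorType(isNullable = true)
        val notNullRowtime = typeFactory.createRowtimeIndicatorType(isNullable = false)

        // Expected (not verified here): only the non-nullable variant carries NOT NULL,
        // matching the updated assertions in TimeIndicatorRelDataTypeTest.
        println(nullableProctime.getFullTypeString) // e.g. TIME ATTRIBUTE(PROCTIME)
        println(notNullRowtime.getFullTypeString)   // TIME ATTRIBUTE(ROWTIME) NOT NULL
      }
    }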

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
index 17b9ea3..a59145c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeTransforms;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 
 /**
@@ -40,7 +41,9 @@ public class ProctimeMaterializeSqlFunction extends SqlFunction {
                super(
                        "PROCTIME_MATERIALIZE",
                        SqlKind.OTHER_FUNCTION,
-                       ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+                       ReturnTypes.cascade(
+                                       ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+                                       SqlTypeTransforms.TO_NULLABLE),
                        InferTypes.RETURN_TYPE,
                        OperandTypes.family(SqlTypeFamily.TIMESTAMP),
                        SqlFunctionCategory.SYSTEM);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
index b884390..989d554 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.functions.sql;
 
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
@@ -37,16 +34,15 @@ public class ProctimeSqlFunction extends SqlFunction {
                super(
                                "PROCTIME",
                                SqlKind.OTHER_FUNCTION,
-                               ReturnTypes.explicit(new ProctimeRelProtoDataType()),
+                               ReturnTypes.explicit(factory ->
+                                               ((FlinkTypeFactory) factory).createProctimeIndicatorType(false)),
                                null,
                                OperandTypes.NILADIC,
                                SqlFunctionCategory.TIMEDATE);
        }
 
-       private static class ProctimeRelProtoDataType implements RelProtoDataType {
-               @Override
-               public RelDataType apply(RelDataTypeFactory factory) {
-                       return ((FlinkTypeFactory) factory).createProctimeIndicatorType();
-               }
+       @Override
+       public boolean isDeterministic() {
+               return false;
        }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 4f6f831..ce40ae2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -128,8 +128,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
       case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
         val timestampType = t.asInstanceOf[TimestampType]
         timestampType.getKind match {
-          case TimestampKind.PROCTIME => createProctimeIndicatorType()
-          case TimestampKind.ROWTIME => createRowtimeIndicatorType()
+          case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
+          case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
           case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
         }
       case _ =>
@@ -148,27 +148,25 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   /**
     * Creates a indicator type for processing-time, but with similar 
properties as SQL timestamp.
     */
-  def createProctimeIndicatorType(): RelDataType = {
-    val originalType = createFieldTypeFromLogicalType(new TimestampType(3))
-    canonize(
-      new TimeIndicatorRelDataType(
-        getTypeSystem,
-        originalType.asInstanceOf[BasicSqlType],
-        isEventTime = false)
-    )
+  def createProctimeIndicatorType(isNullable: Boolean): RelDataType = {
+    val originalType = createFieldTypeFromLogicalType(new TimestampType(isNullable, 3))
+    canonize(new TimeIndicatorRelDataType(
+      getTypeSystem,
+      originalType.asInstanceOf[BasicSqlType],
+      isNullable,
+      isEventTime = false))
   }
 
   /**
     * Creates a indicator type for event-time, but with similar properties as 
SQL timestamp.
     */
-  def createRowtimeIndicatorType(): RelDataType = {
-    val originalType = createFieldTypeFromLogicalType(new TimestampType(3))
-    canonize(
-      new TimeIndicatorRelDataType(
-        getTypeSystem,
-        originalType.asInstanceOf[BasicSqlType],
-        isEventTime = true)
-    )
+  def createRowtimeIndicatorType(isNullable: Boolean): RelDataType = {
+    val originalType = createFieldTypeFromLogicalType(new TimestampType(isNullable, 3))
+    canonize(new TimeIndicatorRelDataType(
+      getTypeSystem,
+      originalType.asInstanceOf[BasicSqlType],
+      isNullable,
+      isEventTime = true))
   }
 
   /**
@@ -276,8 +274,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
       case generic: GenericRelDataType =>
         new GenericRelDataType(generic.genericType, isNullable, typeSystem)
 
-      case timeIndicator: TimeIndicatorRelDataType =>
-        timeIndicator
+      case it: TimeIndicatorRelDataType =>
+        new TimeIndicatorRelDataType(it.typeSystem, it.originalType, isNullable, it.isEventTime)
 
       case _ =>
         super.createTypeWithNullability(relDataType, isNullable)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
index 2ac9e34..1db813c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
@@ -44,7 +44,7 @@ abstract class WatermarkAssigner(
     val newFieldList = inputRowType.getFieldList.map { f =>
       rowtimeFieldIndex match {
         case Some(index) if f.getIndex == index =>
-          val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
+          val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType(f.getType.isNullable)
           new RelDataTypeFieldImpl(f.getName, f.getIndex, rowtimeIndicatorType)
         case _ => f
       }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index c6b4bcc..0a4689c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -28,20 +28,17 @@ import java.lang
   * as a basic SQL type.
   */
 class TimeIndicatorRelDataType(
-    typeSystem: RelDataTypeSystem,
-    originalType: BasicSqlType,
+    val typeSystem: RelDataTypeSystem,
+    val originalType: BasicSqlType,
+    val nullable: Boolean,
     val isEventTime: Boolean)
   extends BasicSqlType(
     typeSystem,
     originalType.getSqlTypeName,
     originalType.getPrecision) {
 
-  override def equals(other: Any): Boolean = other match {
-    case that: TimeIndicatorRelDataType =>
-      super.equals(that) &&
-        isEventTime == that.isEventTime
-    case _ => false
-  }
+  this.isNullable = nullable
+  computeDigest()
 
   override def hashCode(): Int = {
     super.hashCode() + 42 // we change the hash code to differentiate from 
regular timestamps
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index d906d06..e281fb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -79,8 +79,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], select=[a, b, c, id, name, age])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
    +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -108,9 +108,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, CAST(10) AS age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], where=[=(age, 10)], select=[a, b, c, id, name])
-   +- Calc(select=[a, b, c], where=[>(c, 1000)])
+Calc(select=[a, b, c, proctime, id, name, CAST(10) AS age])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], where=[=(age, 10)], select=[a, b, c, proctime, id, 
name])
+   +- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -134,10 +134,9 @@ LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], 
name=[$4], age=[$5])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], select=[a, b, id, name, age])
-   +- Calc(select=[a, b], where=[>(c, 1000)])
-      +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], select=[a, b, proctime, id, name, age])
++- Calc(select=[a, b, PROCTIME() AS proctime], where=[>(c, 1000)])
+   +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -158,8 +157,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], 
joinType=[LeftOuterJoin], async=[false], on=[a=id], select=[a, b, c, id, name, 
age])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[LeftOuterJoin], 
async=[false], on=[a=id], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
    +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -186,8 +185,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], where=[], select=[a, b, c, id])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], 
async=[false], on=[a=id], where=[], select=[a, b, c, proctime, id])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
    +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 3191737..381c804 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -147,7 +147,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], 
EXPR$2=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CAST(w$end) AS EXPR$0, long, EXPR$2])
+Calc(select=[w$end AS EXPR$0, long, EXPR$2])
 +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS 
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- Calc(select=[w$rowtime AS $f0, long, int])
@@ -213,7 +213,7 @@ LogicalProject(rowtime=[$0], proctime=[$3], s=[$4])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, f0 AS s])
-+- Correlate(invocation=[tableFunc(CAST($cor0.rowtime):TIMESTAMP(3) NOT NULL, 
PROCTIME_MATERIALIZE($cor0.proctime), _UTF-16LE'')], 
correlate=[table(tableFunc(CAST($cor0.rowtime),PROCTIME_MATERIALIZE($cor0.proctime),_UTF-16LE''))],
 select=[rowtime,long,int,proctime,f0], rowType=[RecordType(TIME 
ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, TIME ATTRIBUTE(PROCTIME) 
proctime, VARCHAR(2147483647) f0)], joinType=[INNER])
++- Correlate(invocation=[tableFunc(CAST($cor0.rowtime):TIMESTAMP(3), 
PROCTIME_MATERIALIZE($cor0.proctime), _UTF-16LE'')], 
correlate=[table(tableFunc(CAST($cor0.rowtime),PROCTIME_MATERIALIZE($cor0.proctime),_UTF-16LE''))],
 select=[rowtime,long,int,proctime,f0], rowType=[RecordType(TIME 
ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, TIME ATTRIBUTE(PROCTIME) 
proctime, VARCHAR(2147483647) f0)], joinType=[INNER])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[rowtime, long, int, proctime])
 ]]>
     </Resource>
@@ -285,7 +285,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], 
EXPR$2=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CAST(w$end) AS EXPR$0, long, EXPR$2])
+Calc(select=[w$end AS EXPR$0, long, EXPR$2])
 +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS 
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], 
fields=[rowtime, long, int])
@@ -311,7 +311,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), CAST(w$end)), 
1:BIGINT)])
+Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), w$end), 1:BIGINT)])
 +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0) 
AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
@@ -340,7 +340,7 @@ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0], 
EXPR$2=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CAST(w$end) AS rowtime, long, EXPR$2])
+Calc(select=[w$end AS rowtime, long, EXPR$2])
 +- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS 
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[long]])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], 
fields=[rowtime, long, int])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
index 1a599a8..ce0243c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
@@ -108,7 +108,7 @@ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], 
EXPR$2=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[name, CAST(w$end) AS EXPR$1, EXPR$2])
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
 +- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS 
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[name]])
       +- Calc(select=[name, rowtime, val], where=[>(val, 100)])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
index ce1da72..5847055 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -40,7 +40,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], 
EXPR$3=[$4], EXPR$4=[TUMBL
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, 
/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 
1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 
0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), 
CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, 
CAST(w$start) AS EXPR$4, CAST(w$end) AS EXPR$5])
+Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, 
/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 
1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 
0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), 
CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start 
AS EXPR$4, w$end AS EXPR$5])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) 
AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, c, *(CAST(c), CAST(c)) AS $f2])
@@ -67,7 +67,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0), 
60000:INTERVAL MINUTE)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, +(CAST(w$end), 60000:INTERVAL MINUTE) AS EXPR$1])
+Calc(select=[EXPR$0, +(w$end, 60000:INTERVAL MINUTE) AS EXPR$1])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime])
@@ -97,7 +97,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[HOP_START($0)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, CAST(w$start) AS EXPR$1], where=[AND(>($f1, 0), 
=(EXTRACT(FLAG(QUARTER), CAST(w$start)), 1:BIGINT))])
+Calc(select=[EXPR$0, w$start AS EXPR$1], where=[AND(>($f1, 0), 
=(EXTRACT(FLAG(QUARTER), w$start), 1:BIGINT))])
 +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 60000, 
900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$0, SUM(a) AS $f1, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, a])
@@ -126,7 +126,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2], 
EXPR$2=[HOP_START($0)], EXPR$3=[HOP_END($
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
 +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, proctime, 3600000, 
900000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$0, 
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[proctime, c, a])
@@ -203,12 +203,12 @@ LogicalUnion(all=[true])
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
-:- Calc(select=[CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, EXPR$2, 
EXPR$3])
+:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
 :  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
 :     +- Exchange(distribution=[single], reuse_id=[1])
 :        +- Calc(select=[rowtime, c])
 :           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
-+- Calc(select=[CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, EXPR$2, 
EXPR$3])
++- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
    +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000, 
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) 
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
       +- Reused(reference_id=[1])
 ]]>
@@ -253,15 +253,15 @@ LogicalProject(hs1=[$0], he1=[$1], c1=[$2], s1=[$3], 
hs2=[$4], he2=[$5], c2=[$6]
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[InnerJoin], where=[=(he1, he2)], select=[hs1, he1, c1, s1, hs2, 
he2, c2, s2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[InnerJoin], where=[=(he1, he2)], select=[hs1, he1, c1, s1, hs2, 
he2, c2, s2], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
 :- Exchange(distribution=[hash[he1]])
-:  +- Calc(select=[CAST(w$start) AS hs1, CAST(w$end) AS he1, c1, s1], 
where=[IS NOT NULL(s1)])
+:  +- Calc(select=[w$start AS hs1, w$end AS he1, c1, s1], where=[IS NOT 
NULL(s1)])
 :     +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 
3600000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[COUNT(*) AS c1, SUM(c) AS s1, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
 :        +- Exchange(distribution=[single], reuse_id=[1])
 :           +- Calc(select=[rowtime, c])
 :              +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 +- Exchange(distribution=[hash[he2]])
-   +- Calc(select=[CAST(w$start) AS hs2, CAST(w$end) AS he2, c2, s2])
+   +- Calc(select=[w$start AS hs2, w$end AS he2, c2, s2])
       +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 
86400000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[COUNT(*) AS c2, SUM(c) AS s2, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
          +- Reused(reference_id=[1])
 ]]>
@@ -296,7 +296,7 @@ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)], 
EXPR$1=[TUMBLE_END($0)], a=[$1])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[w$rowtime AS EXPR$0, CAST(w$end) AS EXPR$1, a])
+Calc(select=[w$rowtime AS EXPR$0, w$end AS EXPR$1, a])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[w$rowtime AS $f0, a])
@@ -361,7 +361,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2], 
EXPR$2=[SESSION_START($0)], EXPR$3=[SESSI
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
 +- GroupWindowAggregate(window=[SessionGroupWindow('w$, proctime, 900000)], 
properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$0, 
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[proctime, c, a])
@@ -395,7 +395,7 @@ LogicalProject(EXPR$0=[$3])
 Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, 
weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b, d, ping_start]])
-      +- Calc(select=[b, d, CAST(w$start) AS ping_start, c, a])
+      +- Calc(select=[b, d, w$start AS ping_start, c, a])
          +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, 
w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[hash[a, b, c]])
                +- Calc(select=[a, b, c, rowtime])
@@ -430,7 +430,7 @@ LogicalProject(EXPR$0=[$2])
 Calc(select=[EXPR$0])
 +- GroupAggregate(groupBy=[b, ping_start], select=[b, ping_start, 
weightedAvg(c, a) AS EXPR$0])
    +- Exchange(distribution=[hash[b, ping_start]])
-      +- Calc(select=[b, CAST(w$start) AS ping_start, c, a])
+      +- Calc(select=[b, w$start AS ping_start, c, a])
          +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, 
w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
             +- Exchange(distribution=[hash[a, b, c]])
                +- Calc(select=[a, b, c, rowtime])
@@ -459,7 +459,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2], 
EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, 
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS 
wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, c, a])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 017e1a1..8bc5cc2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -56,11 +56,22 @@ class LookupJoinTest extends TableTestBase {
     )
 
     // only support left or inner join
+    // Calcite does not allow FOR SYSTEM_TIME AS OF on a non-nullable left table field in a RIGHT JOIN.
+    // There is an exception:
+    // java.lang.AssertionError
+    //    at SqlToRelConverter.getCorrelationUse(SqlToRelConverter.java:2517)
+    //    at SqlToRelConverter.createJoin(SqlToRelConverter.java:2426)
+    //    at SqlToRelConverter.convertFrom(SqlToRelConverter.java:2071)
+    //    at SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
+    //    at SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
+    //    at SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3100)
+    //    at SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
+    //    at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:125)
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id",
-      "Unsupported join type for semi-join RIGHT",
-      classOf[IllegalArgumentException]
+      null,
+      classOf[AssertionError]
     )
 
     // only support join on raw key of right table
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
index 287662b..396d148 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
@@ -33,10 +33,10 @@ class TimeIndicatorRelDataTypeTest {
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
     assertEquals(
       "TIME ATTRIBUTE(PROCTIME) NOT NULL",
-      typeFactory.createProctimeIndicatorType().getFullTypeString)
+      typeFactory.createProctimeIndicatorType(false).getFullTypeString)
     assertEquals(
       "TIME ATTRIBUTE(ROWTIME) NOT NULL",
-      typeFactory.createRowtimeIndicatorType().getFullTypeString)
+      typeFactory.createRowtimeIndicatorType(false).getFullTypeString)
   }
 
 }
