This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dac9f78 [FLINK-12611][table-planner-blink] Make time indicator nullable in blink
dac9f78 is described below
commit dac9f78658efda3339977e05591989cc363f3722
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 27 16:16:25 2019 +0800
[FLINK-12611][table-planner-blink] Make time indicator nullable in blink
This closes #8530
---
.../sql/ProctimeMaterializeSqlFunction.java | 5 ++-
.../table/functions/sql/ProctimeSqlFunction.java | 14 +++-----
.../flink/table/calcite/FlinkTypeFactory.scala | 38 ++++++++++------------
.../plan/nodes/calcite/WatermarkAssigner.scala | 2 +-
.../plan/schema/TimeIndicatorRelDataType.scala | 13 +++-----
.../table/plan/batch/sql/join/LookupJoinTest.xml | 25 +++++++-------
.../stream/sql/RelTimeIndicatorConverterTest.xml | 10 +++---
.../table/plan/stream/sql/TableSourceTest.xml | 2 +-
.../plan/stream/sql/agg/WindowAggregateTest.xml | 28 ++++++++--------
.../table/plan/batch/sql/join/LookupJoinTest.scala | 15 +++++++--
.../plan/schema/TimeIndicatorRelDataTypeTest.scala | 4 +--
11 files changed, 80 insertions(+), 76 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
index 17b9ea3..a59145c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeMaterializeSqlFunction.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.sql.validate.SqlMonotonicity;
/**
@@ -40,7 +41,9 @@ public class ProctimeMaterializeSqlFunction extends
SqlFunction {
super(
"PROCTIME_MATERIALIZE",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+ ReturnTypes.cascade(
+
ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+ SqlTypeTransforms.TO_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.family(SqlTypeFamily.TIMESTAMP),
SqlFunctionCategory.SYSTEM);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
index b884390..989d554 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/ProctimeSqlFunction.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.functions.sql;
import org.apache.flink.table.calcite.FlinkTypeFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
@@ -37,16 +34,15 @@ public class ProctimeSqlFunction extends SqlFunction {
super(
"PROCTIME",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.explicit(new
ProctimeRelProtoDataType()),
+ ReturnTypes.explicit(factory ->
+ ((FlinkTypeFactory)
factory).createProctimeIndicatorType(false)),
null,
OperandTypes.NILADIC,
SqlFunctionCategory.TIMEDATE);
}
- private static class ProctimeRelProtoDataType implements
RelProtoDataType {
- @Override
- public RelDataType apply(RelDataTypeFactory factory) {
- return ((FlinkTypeFactory)
factory).createProctimeIndicatorType();
- }
+ @Override
+ public boolean isDeterministic() {
+ return false;
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 4f6f831..ce40ae2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -128,8 +128,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
extends JavaTypeFactoryImp
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
val timestampType = t.asInstanceOf[TimestampType]
timestampType.getKind match {
- case TimestampKind.PROCTIME => createProctimeIndicatorType()
- case TimestampKind.ROWTIME => createRowtimeIndicatorType()
+ case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
+ case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
}
case _ =>
@@ -148,27 +148,25 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
extends JavaTypeFactoryImp
/**
* Creates a indicator type for processing-time, but with similar
properties as SQL timestamp.
*/
- def createProctimeIndicatorType(): RelDataType = {
- val originalType = createFieldTypeFromLogicalType(new TimestampType(3))
- canonize(
- new TimeIndicatorRelDataType(
- getTypeSystem,
- originalType.asInstanceOf[BasicSqlType],
- isEventTime = false)
- )
+ def createProctimeIndicatorType(isNullable: Boolean): RelDataType = {
+ val originalType = createFieldTypeFromLogicalType(new
TimestampType(isNullable, 3))
+ canonize(new TimeIndicatorRelDataType(
+ getTypeSystem,
+ originalType.asInstanceOf[BasicSqlType],
+ isNullable,
+ isEventTime = false))
}
/**
* Creates a indicator type for event-time, but with similar properties as
SQL timestamp.
*/
- def createRowtimeIndicatorType(): RelDataType = {
- val originalType = createFieldTypeFromLogicalType(new TimestampType(3))
- canonize(
- new TimeIndicatorRelDataType(
- getTypeSystem,
- originalType.asInstanceOf[BasicSqlType],
- isEventTime = true)
- )
+ def createRowtimeIndicatorType(isNullable: Boolean): RelDataType = {
+ val originalType = createFieldTypeFromLogicalType(new
TimestampType(isNullable, 3))
+ canonize(new TimeIndicatorRelDataType(
+ getTypeSystem,
+ originalType.asInstanceOf[BasicSqlType],
+ isNullable,
+ isEventTime = true))
}
/**
@@ -276,8 +274,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
extends JavaTypeFactoryImp
case generic: GenericRelDataType =>
new GenericRelDataType(generic.genericType, isNullable, typeSystem)
- case timeIndicator: TimeIndicatorRelDataType =>
- timeIndicator
+ case it: TimeIndicatorRelDataType =>
+ new TimeIndicatorRelDataType(it.typeSystem, it.originalType,
isNullable, it.isEventTime)
case _ =>
super.createTypeWithNullability(relDataType, isNullable)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
index 2ac9e34..1db813c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
@@ -44,7 +44,7 @@ abstract class WatermarkAssigner(
val newFieldList = inputRowType.getFieldList.map { f =>
rowtimeFieldIndex match {
case Some(index) if f.getIndex == index =>
- val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
+ val rowtimeIndicatorType =
typeFactory.createRowtimeIndicatorType(f.getType.isNullable)
new RelDataTypeFieldImpl(f.getName, f.getIndex, rowtimeIndicatorType)
case _ => f
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index c6b4bcc..0a4689c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -28,20 +28,17 @@ import java.lang
* as a basic SQL type.
*/
class TimeIndicatorRelDataType(
- typeSystem: RelDataTypeSystem,
- originalType: BasicSqlType,
+ val typeSystem: RelDataTypeSystem,
+ val originalType: BasicSqlType,
+ val nullable: Boolean,
val isEventTime: Boolean)
extends BasicSqlType(
typeSystem,
originalType.getSqlTypeName,
originalType.getPrecision) {
- override def equals(other: Any): Boolean = other match {
- case that: TimeIndicatorRelDataType =>
- super.equals(that) &&
- isEventTime == that.isEventTime
- case _ => false
- }
+ this.isNullable = nullable
+ computeDigest()
override def hashCode(): Int = {
super.hashCode() + 42 // we change the hash code to differentiate from
regular timestamps
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index d906d06..e281fb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -79,8 +79,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
id=[$4], name=[$5], age=[$
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], select=[a, b, c, id, name, age])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
+- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
]]>
</Resource>
@@ -108,9 +108,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
id=[$4], name=[$5], age=[$
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, CAST(10) AS age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], where=[=(age, 10)], select=[a, b, c, id, name])
- +- Calc(select=[a, b, c], where=[>(c, 1000)])
+Calc(select=[a, b, c, proctime, id, name, CAST(10) AS age])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], where=[=(age, 10)], select=[a, b, c, proctime, id,
name])
+ +- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
+- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
]]>
</Resource>
@@ -134,10 +134,9 @@ LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3],
name=[$4], age=[$5])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], select=[a, b, id, name, age])
- +- Calc(select=[a, b], where=[>(c, 1000)])
- +- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], select=[a, b, proctime, id, name, age])
++- Calc(select=[a, b, PROCTIME() AS proctime], where=[>(c, 1000)])
+ +- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -158,8 +157,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
id=[$4], name=[$5], age=[$
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id, name, age])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)],
joinType=[LeftOuterJoin], async=[false], on=[a=id], select=[a, b, c, id, name,
age])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[LeftOuterJoin],
async=[false], on=[a=id], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
+- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
]]>
</Resource>
@@ -186,8 +185,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
id=[$4])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME() AS proctime, id])
-+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], where=[], select=[a, b, c, id])
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin],
async=[false], on=[a=id], where=[], select=[a, b, c, proctime, id])
++- Calc(select=[a, b, c, PROCTIME() AS proctime])
+- BoundedStreamScan(table=[[default_catalog, default_database, T0]],
fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 3191737..381c804 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -147,7 +147,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1],
EXPR$2=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[CAST(w$end) AS EXPR$0, long, EXPR$2])
+Calc(select=[w$end AS EXPR$0, long, EXPR$2])
+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[w$rowtime AS $f0, long, int])
@@ -213,7 +213,7 @@ LogicalProject(rowtime=[$0], proctime=[$3], s=[$4])
<Resource name="planAfter">
<![CDATA[
Calc(select=[rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, f0 AS s])
-+- Correlate(invocation=[tableFunc(CAST($cor0.rowtime):TIMESTAMP(3) NOT NULL,
PROCTIME_MATERIALIZE($cor0.proctime), _UTF-16LE'')],
correlate=[table(tableFunc(CAST($cor0.rowtime),PROCTIME_MATERIALIZE($cor0.proctime),_UTF-16LE''))],
select=[rowtime,long,int,proctime,f0], rowType=[RecordType(TIME
ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, TIME ATTRIBUTE(PROCTIME)
proctime, VARCHAR(2147483647) f0)], joinType=[INNER])
++- Correlate(invocation=[tableFunc(CAST($cor0.rowtime):TIMESTAMP(3),
PROCTIME_MATERIALIZE($cor0.proctime), _UTF-16LE'')],
correlate=[table(tableFunc(CAST($cor0.rowtime),PROCTIME_MATERIALIZE($cor0.proctime),_UTF-16LE''))],
select=[rowtime,long,int,proctime,f0], rowType=[RecordType(TIME
ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, TIME ATTRIBUTE(PROCTIME)
proctime, VARCHAR(2147483647) f0)], joinType=[INNER])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[rowtime, long, int, proctime])
]]>
</Resource>
@@ -285,7 +285,7 @@ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1],
EXPR$2=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[CAST(w$end) AS EXPR$0, long, EXPR$2])
+Calc(select=[w$end AS EXPR$0, long, EXPR$2])
+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]],
fields=[rowtime, long, int])
@@ -311,7 +311,7 @@ LogicalProject(EXPR$0=[$2], long=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), CAST(w$end)),
1:BIGINT)])
+Calc(select=[EXPR$0, long], where=[=(EXTRACT(FLAG(QUARTER), w$end), 1:BIGINT)])
+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, MIN(rowtime0)
AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- Calc(select=[long, rowtime, CAST(rowtime) AS rowtime0])
@@ -340,7 +340,7 @@ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0],
EXPR$2=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[CAST(w$end) AS rowtime, long, EXPR$2])
+Calc(select=[w$end AS rowtime, long, EXPR$2])
+- GroupWindowAggregate(groupBy=[long], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[long, SUM(int) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[long]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]],
fields=[rowtime, long, int])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
index 1a599a8..ce0243c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
@@ -108,7 +108,7 @@ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)],
EXPR$2=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[name, CAST(w$end) AS EXPR$1, EXPR$2])
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
+- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow],
properties=[w$start, w$end, w$rowtime, w$proctime], select=[name, AVG(val) AS
EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[name]])
+- Calc(select=[name, rowtime, val], where=[>(val, 100)])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
index ce1da72..5847055 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -40,7 +40,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3],
EXPR$3=[$4], EXPR$4=[TUMBL
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0,
/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2,
1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2),
0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)),
CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3,
CAST(w$start) AS EXPR$4, CAST(w$end) AS EXPR$5])
+Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0,
/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2,
1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2),
0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)),
CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start
AS EXPR$4, w$end AS EXPR$5])
+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c)
AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, *(CAST(c), CAST(c)) AS $f2])
@@ -67,7 +67,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0),
60000:INTERVAL MINUTE)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, +(CAST(w$end), 60000:INTERVAL MINUTE) AS EXPR$1])
+Calc(select=[EXPR$0, +(w$end, 60000:INTERVAL MINUTE) AS EXPR$1])
+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, start('w$) AS
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime])
@@ -97,7 +97,7 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[HOP_START($0)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, CAST(w$start) AS EXPR$1], where=[AND(>($f1, 0),
=(EXTRACT(FLAG(QUARTER), CAST(w$start)), 1:BIGINT))])
+Calc(select=[EXPR$0, w$start AS EXPR$1], where=[AND(>($f1, 0),
=(EXTRACT(FLAG(QUARTER), w$start), 1:BIGINT))])
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 60000,
900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*)
AS EXPR$0, SUM(a) AS $f1, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, a])
@@ -126,7 +126,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2],
EXPR$2=[HOP_START($0)], EXPR$3=[HOP_END($
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, proctime, 3600000,
900000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$0,
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[proctime, c, a])
@@ -203,12 +203,12 @@ LogicalUnion(all=[true])
<Resource name="planAfter">
<![CDATA[
Union(all=[true], union=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
-:- Calc(select=[CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, EXPR$2,
EXPR$3])
+:- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000,
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*)
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[single], reuse_id=[1])
: +- Calc(select=[rowtime, c])
: +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
-+- Calc(select=[CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, EXPR$2,
EXPR$3])
++- Calc(select=[w$start AS EXPR$0, w$end AS EXPR$1, EXPR$2, EXPR$3])
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 86400000,
60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*)
AS EXPR$2, SUM(c) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Reused(reference_id=[1])
]]>
@@ -253,15 +253,15 @@ LogicalProject(hs1=[$0], he1=[$1], c1=[$2], s1=[$3],
hs2=[$4], he2=[$5], c2=[$6]
</Resource>
<Resource name="planAfter">
<![CDATA[
-Join(joinType=[InnerJoin], where=[=(he1, he2)], select=[hs1, he1, c1, s1, hs2,
he2, c2, s2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[InnerJoin], where=[=(he1, he2)], select=[hs1, he1, c1, s1, hs2,
he2, c2, s2], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[he1]])
-: +- Calc(select=[CAST(w$start) AS hs1, CAST(w$end) AS he1, c1, s1],
where=[IS NOT NULL(s1)])
+: +- Calc(select=[w$start AS hs1, w$end AS he1, c1, s1], where=[IS NOT
NULL(s1)])
: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime,
3600000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[COUNT(*) AS c1, SUM(c) AS s1, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
: +- Exchange(distribution=[single], reuse_id=[1])
: +- Calc(select=[rowtime, c])
: +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+- Exchange(distribution=[hash[he2]])
- +- Calc(select=[CAST(w$start) AS hs2, CAST(w$end) AS he2, c2, s2])
+ +- Calc(select=[w$start AS hs2, w$end AS he2, c2, s2])
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime,
86400000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[COUNT(*) AS c2, SUM(c) AS s2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Reused(reference_id=[1])
]]>
@@ -296,7 +296,7 @@ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)],
EXPR$1=[TUMBLE_END($0)], a=[$1])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[w$rowtime AS EXPR$0, CAST(w$end) AS EXPR$1, a])
+Calc(select=[w$rowtime AS EXPR$0, w$end AS EXPR$1, a])
+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS a, start('w$) AS w$start,
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[w$rowtime AS $f0, a])
@@ -361,7 +361,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2],
EXPR$2=[SESSION_START($0)], EXPR$3=[SESSI
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
+- GroupWindowAggregate(window=[SessionGroupWindow('w$, proctime, 900000)],
properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$0,
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[proctime, c, a])
@@ -395,7 +395,7 @@ LogicalProject(EXPR$0=[$3])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start,
weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, d, ping_start]])
- +- Calc(select=[b, d, CAST(w$start) AS ping_start, c, a])
+ +- Calc(select=[b, d, w$start AS ping_start, c, a])
+- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime,
w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
@@ -430,7 +430,7 @@ LogicalProject(EXPR$0=[$2])
Calc(select=[EXPR$0])
+- GroupAggregate(groupBy=[b, ping_start], select=[b, ping_start,
weightedAvg(c, a) AS EXPR$0])
+- Exchange(distribution=[hash[b, ping_start]])
- +- Calc(select=[b, CAST(w$start) AS ping_start, c, a])
+ +- Calc(select=[b, w$start AS ping_start, c, a])
+- GroupWindowAggregate(groupBy=[a, b, c],
window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime,
w$proctime], select=[a, b, c, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+- Exchange(distribution=[hash[a, b, c]])
+- Calc(select=[a, b, c, rowtime])
@@ -459,7 +459,7 @@ LogicalProject(EXPR$0=[$1], wAvg=[$2],
EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[EXPR$0, wAvg, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
+- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS
wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
proctime('w$) AS w$proctime])
+- Exchange(distribution=[single])
+- Calc(select=[rowtime, c, a])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 017e1a1..8bc5cc2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -56,11 +56,22 @@ class LookupJoinTest extends TableTestBase {
)
// only support left or inner join
+ // Calcite does not allow FOR SYSTEM_TIME AS OF non-nullable left table
field to Right Join.
+ // There is a exception:
+ // java.lang.AssertionError
+ // at SqlToRelConverter.getCorrelationUse(SqlToRelConverter.java:2517)
+ // at SqlToRelConverter.createJoin(SqlToRelConverter.java:2426)
+ // at SqlToRelConverter.convertFrom(SqlToRelConverter.java:2071)
+ // at SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
+ // at SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
+ // at
SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3100)
+ // at SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
+ // at
org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:125)
expectExceptionThrown(
"SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id",
- "Unsupported join type for semi-join RIGHT",
- classOf[IllegalArgumentException]
+ null,
+ classOf[AssertionError]
)
// only support join on raw key of right table
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
index 287662b..396d148 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataTypeTest.scala
@@ -33,10 +33,10 @@ class TimeIndicatorRelDataTypeTest {
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
assertEquals(
"TIME ATTRIBUTE(PROCTIME) NOT NULL",
- typeFactory.createProctimeIndicatorType().getFullTypeString)
+ typeFactory.createProctimeIndicatorType(false).getFullTypeString)
assertEquals(
"TIME ATTRIBUTE(ROWTIME) NOT NULL",
- typeFactory.createRowtimeIndicatorType().getFullTypeString)
+ typeFactory.createRowtimeIndicatorType(false).getFullTypeString)
}
}