This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c2d9e69 [FLINK-21733][table-planner-blink] WatermarkAssigner
incorrectly recomputing the rowtime index which may cause
ArrayIndexOutOfBoundsException (#15150)
c2d9e69 is described below
commit c2d9e69450e0a1bf14aa866ecdd8f5c38072923d
Author: lincoln lee <[email protected]>
AuthorDate: Fri Mar 12 15:20:22 2021 +0800
[FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly
recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException
(#15150)
---
.../plan/nodes/calcite/WatermarkAssigner.scala | 6 +-----
.../plan/stream/sql/SourceWatermarkTest.xml | 18 +++++++++++++++++
.../plan/stream/sql/SourceWatermarkTest.scala | 23 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 5 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
index 99756bf..459981a 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
@@ -65,11 +65,7 @@ abstract class WatermarkAssigner(
}
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
- val rowtimeFieldName =
inputRel.getRowType.getFieldNames.get(rowtimeFieldIndex)
- val newInputRel = inputs.get(0)
- // the input fields maybe reordered, re-computed the rowtime index
- val newIndex =
newInputRel.getRowType.getFieldNames.indexOf(rowtimeFieldName)
- copy(traitSet, newInputRel, newIndex, watermarkExpr)
+ copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
}
/**
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index d2d5632..93b8d8b 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -130,4 +130,22 @@ Calc(select=[a, b], where=[(b > 10)])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProjectTransposeWatermarkAssigner">
+ <Resource name="sql">
+ <![CDATA[SELECT a, b, ts FROM t1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], ts=[$5])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($5, 10000:INTERVAL
SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], t=[$4], ts=[$4])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, t1, project=[a, b,
t], watermark=[-($2, 10000:INTERVAL SECOND)]]], fields=[a, b, t])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
index 54d657d..cc0c24c 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
@@ -137,4 +137,27 @@ class SourceWatermarkTest extends TableTestBase {
def testWatermarkWithMetadata(): Unit = {
util.verifyExecPlan("SELECT a, b FROM MyTable")
}
+
+ @Test
+ def testProjectTransposeWatermarkAssigner(): Unit = {
+ val sourceDDL =
+ s"""
+ |CREATE TEMPORARY TABLE `t1` (
+ | `a` VARCHAR,
+ | `b` VARCHAR,
+ | `c` VARCHAR,
+ | `d` INT,
+ | `t` TIMESTAMP(3),
+ | `ts` AS `t`,
+ | WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
+ |) WITH (
+ | 'connector' = 'values',
+ | 'enable-watermark-push-down' = 'true',
+ | 'bounded' = 'false',
+ | 'disable-lookup' = 'true'
+ |)
+ """.stripMargin
+ util.tableEnv.executeSql(sourceDDL)
+ util.verifyExecPlan("SELECT a, b, ts FROM t1")
+ }
}