This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4e088dec0f2 [FLINK-38866][flink-table-planner] Passing timestamp
precision to window operators to respect BinaryRowData non-compact timestamp
form
4e088dec0f2 is described below
commit 4e088dec0f24e845809606ddacc2391dadfe6c73
Author: Matt Cuento <[email protected]>
AuthorDate: Tue Mar 3 05:24:54 2026 -0800
[FLINK-38866][flink-table-planner] Passing timestamp precision to window
operators to respect BinaryRowData non-compact timestamp form
---
.../exec/common/CommonExecWindowTableFunction.java | 8 +-
.../exec/stream/StreamExecWindowTableFunction.java | 4 +
...ndowTableFunctionEventTimeBatchRestoreTest.java | 3 +-
.../common/WindowTableFunctionTestPrograms.java | 43 ++
...window-table-function-tumble-tvf-union-all.json | 447 +++++++++++++++++++++
.../AlignedWindowTableFunctionOperator.java | 5 +-
.../UnalignedWindowTableFunctionOperator.java | 5 +-
.../operator/WindowTableFunctionOperatorBase.java | 4 +
.../AlignedWindowTableFunctionOperatorTest.java | 2 +-
.../UnalignedWindowTableFunctionOperatorTest.java | 1 +
.../WindowTableFunctionOperatorTestBase.java | 2 +
11 files changed, 517 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
index df205561781..dc4d47d4a09 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
@@ -40,6 +40,7 @@ import
org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableF
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -110,8 +111,13 @@ public abstract class CommonExecWindowTableFunction
extends ExecNodeBase<RowData
TimeWindowUtil.getShiftTimeZone(
windowingStrategy.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+ final int timestampPrecision =
+
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());
return new AlignedWindowTableFunctionOperator(
- windowAssigner, windowingStrategy.getTimeAttributeIndex(),
shiftTimeZone);
+ windowAssigner,
+ windowingStrategy.getTimeAttributeIndex(),
+ timestampPrecision,
+ shiftTimeZone);
}
protected abstract Transformation<RowData> translateWithUnalignedWindow(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
index 28fd8c128ee..f2bd3cf0a2e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
@@ -46,6 +46,7 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -152,12 +153,15 @@ public class StreamExecWindowTableFunction extends
CommonExecWindowTableFunction
TimeWindowUtil.getShiftTimeZone(
windowingStrategy.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+ final int timestampPrecision =
+
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());
return new UnalignedWindowTableFunctionOperator(
windowAssigner,
windowAssigner.getWindowSerializer(new ExecutionConfig()),
new RowDataSerializer(inputRowType),
windowingStrategy.getTimeAttributeIndex(),
+ timestampPrecision,
shiftTimeZone);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
index 9d3651f02e6..e0e3afaad6a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
@@ -42,6 +42,7 @@ public class WindowTableFunctionEventTimeBatchRestoreTest
extends BatchRestoreTe
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET,
-
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET);
+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET,
+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
index a4db730e7b5..0fb0548e906 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
@@ -115,6 +115,49 @@ public class WindowTableFunctionTestPrograms {
+ " %s\n"
+ " GROUP BY window_start, window_end";
+ public static final String QUERY_TVF_UNION_ALL_VALUES =
+ "INSERT INTO sink_t SELECT\n"
+ + " * FROM (\n"
+ + " WITH values_table AS (\n"
+ + " SELECT cast('2024-01-01 10:00:00' AS
TIMESTAMP_LTZ) AS event_time\n"
+ + " UNION ALL\n"
+ + " SELECT cast('2024-01-01 10:05:00' AS
TIMESTAMP_LTZ) AS event_time\n"
+ + " UNION ALL\n"
+ + " SELECT cast('2024-01-01 10:10:00' AS
TIMESTAMP_LTZ) AS event_time\n"
+ + " ) SELECT\n"
+ + " window_start,\n"
+ + " window_end\n"
+ + " FROM TABLE(\n"
+ + " HOP(\n"
+ + " TABLE values_table,\n"
+ + " DESCRIPTOR(event_time),\n"
+ + " INTERVAL '1' MINUTES,\n"
+ + " INTERVAL '2' MINUTES)\n"
+ + " ) GROUP BY\n"
+ + " window_start,\n"
+ + " window_end\n"
+ + ")";
+
+ public static final TableTestProgram
WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL =
+ TableTestProgram.of(
+ "window-table-function-tumble-tvf-union-all",
+ "validates window with BinaryRowData using
non-compact timestamp precision")
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "window_start TIMESTAMP(3)",
"window_end TIMESTAMP(3)")
+ .consumedBeforeRestore(
+ "+I[2024-01-01T09:59,
2024-01-01T10:01]",
+ "+I[2024-01-01T10:00,
2024-01-01T10:02]",
+ "+I[2024-01-01T10:04,
2024-01-01T10:06]",
+ "+I[2024-01-01T10:05,
2024-01-01T10:07]",
+ "+I[2024-01-01T10:09,
2024-01-01T10:11]",
+ "+I[2024-01-01T10:10,
2024-01-01T10:12]")
+ .build())
+ .setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, "UTC")
+ .runSql(QUERY_TVF_UNION_ALL_VALUES)
+ .build();
+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF =
TableTestProgram.of(
"window-table-function-tumble-tvf",
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
new file mode 100644
index 00000000000..6becdda7ff6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
@@ -0,0 +1,447 @@
+{
+ "flinkVersion" : "2.3",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-values_1",
+ "tuples" : [ [ {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ] ],
+ "outputType" : "ROW<`ZERO` INT NOT NULL>",
+ "description" : "Values(tuples=[[{ 0 }]], values=[ZERO])"
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : "2024-01-01 10:00:00",
+ "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL>",
+ "description" : "Calc(select=[2024-01-01 10:00:00 AS event_time])"
+ }, {
+ "id" : 3,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : "2024-01-01 10:05:00",
+ "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL>",
+ "description" : "Calc(select=[2024-01-01 10:05:00 AS event_time])"
+ }, {
+ "id" : 4,
+ "type" : "batch-exec-union_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL>",
+ "description" : "Union(all=[true], union=[event_time])"
+ }, {
+ "id" : 5,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : "2024-01-01 10:10:00",
+ "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL>",
+ "description" : "Calc(select=[2024-01-01 10:10:00 AS event_time])"
+ }, {
+ "id" : 6,
+ "type" : "batch-exec-union_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL>",
+ "description" : "Union(all=[true], union=[event_time])"
+ }, {
+ "id" : 7,
+ "type" : "batch-exec-window-table-function_1",
+ "windowing" : {
+ "strategy" : "TimeAttribute",
+ "window" : {
+ "type" : "HoppingWindow",
+ "size" : "PT2M",
+ "slide" : "PT1M"
+ },
+ "timeAttributeType" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL",
+ "timeAttributeIndex" : 0,
+ "isRowtime" : false
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT
NULL, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL,
`window_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>",
+ "description" : "WindowTableFunction(window=[HOP(time_col=[event_time],
size=[2 min], slide=[1 min])])"
+ }, {
+ "id" : 8,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Calc(select=[window_start, window_end])"
+ }, {
+ "id" : 9,
+ "type" : "batch-exec-sort_1",
+ "configuration" : {
+ "table.exec.resource.sort.memory" : "128 mb",
+ "table.exec.sort.async-merge-enabled" : "true",
+ "table.exec.sort.max-num-file-handles" : "128",
+ "table.exec.spill-compression.block-size" : "64 kb",
+ "table.exec.spill-compression.enabled" : "true"
+ },
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ }, {
+ "index" : 1,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Sort(orderBy=[window_start ASC, window_end ASC])"
+ }, {
+ "id" : 15,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "KEEP_INPUT_AS_IS",
+ "inputDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "isStrict" : true
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Exchange(distribution=[forward])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 10,
+ "type" : "batch-exec-sort-aggregate_1",
+ "grouping" : [ 0, 1 ],
+ "auxGrouping" : [ ],
+ "aggCalls" : [ ],
+ "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL,
`window_end` TIMESTAMP(3) NOT NULL>",
+ "isMerge" : false,
+ "isFinal" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "LocalSortAggregate(groupBy=[window_start, window_end],
select=[window_start, window_end])"
+ }, {
+ "id" : 11,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1 ]
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Exchange(distribution=[hash[window_start, window_end]])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 12,
+ "type" : "batch-exec-sort_1",
+ "configuration" : {
+ "table.exec.resource.sort.memory" : "128 mb",
+ "table.exec.sort.async-merge-enabled" : "true",
+ "table.exec.sort.max-num-file-handles" : "128",
+ "table.exec.spill-compression.block-size" : "64 kb",
+ "table.exec.spill-compression.enabled" : "true"
+ },
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ }, {
+ "index" : 1,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Sort(orderBy=[window_start ASC, window_end ASC])"
+ }, {
+ "id" : 16,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "KEEP_INPUT_AS_IS",
+ "inputDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1 ]
+ },
+ "isStrict" : true
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Exchange(distribution=[forward])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 13,
+ "type" : "batch-exec-sort-aggregate_1",
+ "grouping" : [ 0, 1 ],
+ "auxGrouping" : [ ],
+ "aggCalls" : [ ],
+ "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL,
`window_end` TIMESTAMP(3) NOT NULL>",
+ "isMerge" : true,
+ "isFinal" : true,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "SortAggregate(isMerge=[true], groupBy=[window_start,
window_end], select=[window_start, window_end])"
+ }, {
+ "id" : 14,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "window_start",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "window_end",
+ "dataType" : "TIMESTAMP(3)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[window_start, window_end])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 1,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 1,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 13,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
index 87867d5f4b2..a69d49e7b7d 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
@@ -46,8 +46,9 @@ public class AlignedWindowTableFunctionOperator extends
WindowTableFunctionOpera
public AlignedWindowTableFunctionOperator(
GroupWindowAssigner<TimeWindow> windowAssigner,
int rowtimeIndex,
+ int timestampPrecision,
ZoneId shiftTimeZone) {
- super(windowAssigner, rowtimeIndex, shiftTimeZone);
+ super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone);
}
@Override
@@ -60,7 +61,7 @@ public class AlignedWindowTableFunctionOperator extends
WindowTableFunctionOpera
numNullRowTimeRecordsDropped.inc();
return;
}
- timestamp = inputRow.getTimestamp(rowtimeIndex,
3).getMillisecond();
+ timestamp = inputRow.getTimestamp(rowtimeIndex,
timestampPrecision).getMillisecond();
} else {
timestamp = getProcessingTimeService().getCurrentProcessingTime();
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
index 02b30e8f603..fdaca555771 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
@@ -120,8 +120,9 @@ public class UnalignedWindowTableFunctionOperator extends
WindowTableFunctionOpe
TypeSerializer<TimeWindow> windowSerializer,
TypeSerializer<RowData> inputSerializer,
int rowtimeIndex,
+ int timestampPrecision,
ZoneId shiftTimeZone) {
- super(windowAssigner, rowtimeIndex, shiftTimeZone);
+ super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone);
this.trigger = createTrigger(windowAssigner);
this.windowSerializer = checkNotNull(windowSerializer);
this.inputSerializer = checkNotNull(inputSerializer);
@@ -202,7 +203,7 @@ public class UnalignedWindowTableFunctionOperator extends
WindowTableFunctionOpe
numNullRowTimeRecordsDropped.inc();
return;
}
- timestamp = inputRow.getTimestamp(rowtimeIndex,
3).getMillisecond();
+ timestamp = inputRow.getTimestamp(rowtimeIndex,
timestampPrecision).getMillisecond();
} else {
timestamp = getProcessingTimeService().getCurrentProcessingTime();
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
index 0e78ad72bb3..e76ff4bdec1 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
@@ -58,6 +58,8 @@ public abstract class WindowTableFunctionOperatorBase extends
TableStreamOperato
protected final GroupWindowAssigner<TimeWindow> windowAssigner;
+ protected final int timestampPrecision;
+
/** This is used for emitting elements with a given timestamp. */
private transient TimestampedCollector<RowData> collector;
@@ -73,10 +75,12 @@ public abstract class WindowTableFunctionOperatorBase
extends TableStreamOperato
public WindowTableFunctionOperatorBase(
GroupWindowAssigner<TimeWindow> windowAssigner,
int rowtimeIndex,
+ int timestampPrecision,
ZoneId shiftTimeZone) {
this.shiftTimeZone = shiftTimeZone;
this.rowtimeIndex = rowtimeIndex;
this.windowAssigner = windowAssigner;
+ this.timestampPrecision = timestampPrecision;
checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
index 1f2238588dd..e9317a84eeb 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
@@ -340,7 +340,7 @@ class AlignedWindowTableFunctionOperatorTest extends
WindowTableFunctionOperator
GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId
shiftTimeZone) throws Exception {
AlignedWindowTableFunctionOperator operator =
new AlignedWindowTableFunctionOperator(
- windowAssigner, ROW_TIME_INDEX, shiftTimeZone);
+ windowAssigner, ROW_TIME_INDEX,
DEFAULT_TIMESTAMP_PRECISION, shiftTimeZone);
return new OneInputStreamOperatorTestHarness<>(operator,
INPUT_ROW_SER);
}
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
index 05e54aaca2d..cafe6e284f6 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
@@ -563,6 +563,7 @@ class UnalignedWindowTableFunctionOperatorTest extends
WindowTableFunctionOperat
windowAssigner.getWindowSerializer(new ExecutionConfig()),
new RowDataSerializer(INPUT_ROW_TYPE),
rowTimeIndex,
+ DEFAULT_TIMESTAMP_PRECISION,
shiftTimeZone);
}
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
index 06ede388ca9..0d015ac40e7 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
@@ -67,6 +67,8 @@ abstract class WindowTableFunctionOperatorTestBase {
protected static final RowDataSerializer INPUT_ROW_SER = new
RowDataSerializer(INPUT_ROW_TYPE);
protected static final int ROW_TIME_INDEX = 2;
+ protected static final int DEFAULT_TIMESTAMP_PRECISION = 3;
+
protected static final LogicalType[] OUTPUT_TYPES =
new LogicalType[] {
new VarCharType(Integer.MAX_VALUE),