This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e088dec0f2 [FLINK-38866][flink-table-planner] Passing timestamp 
precision to window operators to respect BinaryRowData non-compact timestamp 
form
4e088dec0f2 is described below

commit 4e088dec0f24e845809606ddacc2391dadfe6c73
Author: Matt Cuento <[email protected]>
AuthorDate: Tue Mar 3 05:24:54 2026 -0800

    [FLINK-38866][flink-table-planner] Passing timestamp precision to window 
operators to respect BinaryRowData non-compact timestamp form
---
 .../exec/common/CommonExecWindowTableFunction.java |   8 +-
 .../exec/stream/StreamExecWindowTableFunction.java |   4 +
 ...ndowTableFunctionEventTimeBatchRestoreTest.java |   3 +-
 .../common/WindowTableFunctionTestPrograms.java    |  43 ++
 ...window-table-function-tumble-tvf-union-all.json | 447 +++++++++++++++++++++
 .../AlignedWindowTableFunctionOperator.java        |   5 +-
 .../UnalignedWindowTableFunctionOperator.java      |   5 +-
 .../operator/WindowTableFunctionOperatorBase.java  |   4 +
 .../AlignedWindowTableFunctionOperatorTest.java    |   2 +-
 .../UnalignedWindowTableFunctionOperatorTest.java  |   1 +
 .../WindowTableFunctionOperatorTestBase.java       |   2 +
 11 files changed, 517 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
index df205561781..dc4d47d4a09 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableF
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -110,8 +111,13 @@ public abstract class CommonExecWindowTableFunction 
extends ExecNodeBase<RowData
                 TimeWindowUtil.getShiftTimeZone(
                         windowingStrategy.getTimeAttributeType(),
                         TableConfigUtils.getLocalTimeZone(config));
+        final int timestampPrecision =
+                
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());
         return new AlignedWindowTableFunctionOperator(
-                windowAssigner, windowingStrategy.getTimeAttributeIndex(), 
shiftTimeZone);
+                windowAssigner,
+                windowingStrategy.getTimeAttributeIndex(),
+                timestampPrecision,
+                shiftTimeZone);
     }
 
     protected abstract Transformation<RowData> translateWithUnalignedWindow(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
index 28fd8c128ee..f2bd3cf0a2e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -152,12 +153,15 @@ public class StreamExecWindowTableFunction extends 
CommonExecWindowTableFunction
                 TimeWindowUtil.getShiftTimeZone(
                         windowingStrategy.getTimeAttributeType(),
                         TableConfigUtils.getLocalTimeZone(config));
+        final int timestampPrecision =
+                
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());
 
         return new UnalignedWindowTableFunctionOperator(
                 windowAssigner,
                 windowAssigner.getWindowSerializer(new ExecutionConfig()),
                 new RowDataSerializer(inputRowType),
                 windowingStrategy.getTimeAttributeIndex(),
+                timestampPrecision,
                 shiftTimeZone);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
index 9d3651f02e6..e0e3afaad6a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/WindowTableFunctionEventTimeBatchRestoreTest.java
@@ -42,6 +42,7 @@ public class WindowTableFunctionEventTimeBatchRestoreTest 
extends BatchRestoreTe
                 
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF,
                 
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG,
                 
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET,
-                
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET);
+                
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET,
+                
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
index a4db730e7b5..0fb0548e906 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/WindowTableFunctionTestPrograms.java
@@ -115,6 +115,49 @@ public class WindowTableFunctionTestPrograms {
                     + "     %s\n"
                     + " GROUP BY window_start, window_end";
 
+    public static final String QUERY_TVF_UNION_ALL_VALUES =
+            "INSERT INTO sink_t SELECT\n"
+                    + "    * FROM (\n"
+                    + "    WITH values_table AS (\n"
+                    + "        SELECT cast('2024-01-01 10:00:00' AS 
TIMESTAMP_LTZ) AS event_time\n"
+                    + "        UNION ALL\n"
+                    + "        SELECT cast('2024-01-01 10:05:00' AS 
TIMESTAMP_LTZ) AS event_time\n"
+                    + "        UNION ALL\n"
+                    + "        SELECT cast('2024-01-01 10:10:00' AS 
TIMESTAMP_LTZ) AS event_time\n"
+                    + "    ) SELECT\n"
+                    + "        window_start,\n"
+                    + "        window_end\n"
+                    + "    FROM TABLE(\n"
+                    + "        HOP(\n"
+                    + "            TABLE values_table,\n"
+                    + "            DESCRIPTOR(event_time),\n"
+                    + "            INTERVAL '1' MINUTES,\n"
+                    + "            INTERVAL '2' MINUTES)\n"
+                    + "    ) GROUP BY\n"
+                    + "          window_start,\n"
+                    + "          window_end\n"
+                    + ")";
+
+    public static final TableTestProgram 
WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL =
+            TableTestProgram.of(
+                            "window-table-function-tumble-tvf-union-all",
+                            "validates window with BinaryRowData using 
non-compact timestamp precision")
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "window_start TIMESTAMP(3)", 
"window_end TIMESTAMP(3)")
+                                    .consumedBeforeRestore(
+                                            "+I[2024-01-01T09:59, 
2024-01-01T10:01]",
+                                            "+I[2024-01-01T10:00, 
2024-01-01T10:02]",
+                                            "+I[2024-01-01T10:04, 
2024-01-01T10:06]",
+                                            "+I[2024-01-01T10:05, 
2024-01-01T10:07]",
+                                            "+I[2024-01-01T10:09, 
2024-01-01T10:11]",
+                                            "+I[2024-01-01T10:10, 
2024-01-01T10:12]")
+                                    .build())
+                    .setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, "UTC")
+                    .runSql(QUERY_TVF_UNION_ALL_VALUES)
+                    .build();
+
     public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF =
             TableTestProgram.of(
                             "window-table-function-tumble-tvf",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
new file mode 100644
index 00000000000..6becdda7ff6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-window-table-function_1/window-table-function-tumble-tvf-union-all/plan/window-table-function-tumble-tvf-union-all.json
@@ -0,0 +1,447 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-values_1",
+    "tuples" : [ [ {
+      "kind" : "LITERAL",
+      "value" : 0,
+      "type" : "INT NOT NULL"
+    } ] ],
+    "outputType" : "ROW<`ZERO` INT NOT NULL>",
+    "description" : "Values(tuples=[[{ 0 }]], values=[ZERO])"
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : "2024-01-01 10:00:00",
+      "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL>",
+    "description" : "Calc(select=[2024-01-01 10:00:00 AS event_time])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : "2024-01-01 10:05:00",
+      "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL>",
+    "description" : "Calc(select=[2024-01-01 10:05:00 AS event_time])"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-union_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL>",
+    "description" : "Union(all=[true], union=[event_time])"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : "2024-01-01 10:10:00",
+      "type" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL>",
+    "description" : "Calc(select=[2024-01-01 10:10:00 AS event_time])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-union_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL>",
+    "description" : "Union(all=[true], union=[event_time])"
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-window-table-function_1",
+    "windowing" : {
+      "strategy" : "TimeAttribute",
+      "window" : {
+        "type" : "HoppingWindow",
+        "size" : "PT2M",
+        "slide" : "PT1M"
+      },
+      "timeAttributeType" : "TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL",
+      "timeAttributeIndex" : 0,
+      "isRowtime" : false
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`event_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT 
NULL, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, 
`window_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL>",
+    "description" : "WindowTableFunction(window=[HOP(time_col=[event_time], 
size=[2 min], slide=[1 min])])"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Calc(select=[window_start, window_end])"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Sort(orderBy=[window_start ASC, window_end ASC])"
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "KEEP_INPUT_AS_IS",
+        "inputDistribution" : {
+          "type" : "UNKNOWN"
+        },
+        "isStrict" : true
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Exchange(distribution=[forward])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-sort-aggregate_1",
+    "grouping" : [ 0, 1 ],
+    "auxGrouping" : [ ],
+    "aggCalls" : [ ],
+    "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, 
`window_end` TIMESTAMP(3) NOT NULL>",
+    "isMerge" : false,
+    "isFinal" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "LocalSortAggregate(groupBy=[window_start, window_end], 
select=[window_start, window_end])"
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Exchange(distribution=[hash[window_start, window_end]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Sort(orderBy=[window_start ASC, window_end ASC])"
+  }, {
+    "id" : 16,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "KEEP_INPUT_AS_IS",
+        "inputDistribution" : {
+          "type" : "HASH",
+          "keys" : [ 0, 1 ]
+        },
+        "isStrict" : true
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Exchange(distribution=[forward])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 13,
+    "type" : "batch-exec-sort-aggregate_1",
+    "grouping" : [ 0, 1 ],
+    "auxGrouping" : [ ],
+    "aggCalls" : [ ],
+    "aggInputRowType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, 
`window_end` TIMESTAMP(3) NOT NULL>",
+    "isMerge" : true,
+    "isFinal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "SortAggregate(isMerge=[true], groupBy=[window_start, 
window_end], select=[window_start, window_end])"
+  }, {
+    "id" : 14,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "window_start",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "window_end",
+              "dataType" : "TIMESTAMP(3)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[window_start, window_end])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 1,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 1,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
index 87867d5f4b2..a69d49e7b7d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
@@ -46,8 +46,9 @@ public class AlignedWindowTableFunctionOperator extends 
WindowTableFunctionOpera
     public AlignedWindowTableFunctionOperator(
             GroupWindowAssigner<TimeWindow> windowAssigner,
             int rowtimeIndex,
+            int timestampPrecision,
             ZoneId shiftTimeZone) {
-        super(windowAssigner, rowtimeIndex, shiftTimeZone);
+        super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone);
     }
 
     @Override
@@ -60,7 +61,7 @@ public class AlignedWindowTableFunctionOperator extends 
WindowTableFunctionOpera
                 numNullRowTimeRecordsDropped.inc();
                 return;
             }
-            timestamp = inputRow.getTimestamp(rowtimeIndex, 
3).getMillisecond();
+            timestamp = inputRow.getTimestamp(rowtimeIndex, 
timestampPrecision).getMillisecond();
         } else {
             timestamp = getProcessingTimeService().getCurrentProcessingTime();
         }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
index 02b30e8f603..fdaca555771 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
@@ -120,8 +120,9 @@ public class UnalignedWindowTableFunctionOperator extends 
WindowTableFunctionOpe
             TypeSerializer<TimeWindow> windowSerializer,
             TypeSerializer<RowData> inputSerializer,
             int rowtimeIndex,
+            int timestampPrecision,
             ZoneId shiftTimeZone) {
-        super(windowAssigner, rowtimeIndex, shiftTimeZone);
+        super(windowAssigner, rowtimeIndex, timestampPrecision, shiftTimeZone);
         this.trigger = createTrigger(windowAssigner);
         this.windowSerializer = checkNotNull(windowSerializer);
         this.inputSerializer = checkNotNull(inputSerializer);
@@ -202,7 +203,7 @@ public class UnalignedWindowTableFunctionOperator extends 
WindowTableFunctionOpe
                 numNullRowTimeRecordsDropped.inc();
                 return;
             }
-            timestamp = inputRow.getTimestamp(rowtimeIndex, 
3).getMillisecond();
+            timestamp = inputRow.getTimestamp(rowtimeIndex, 
timestampPrecision).getMillisecond();
         } else {
             timestamp = getProcessingTimeService().getCurrentProcessingTime();
         }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
index 0e78ad72bb3..e76ff4bdec1 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
@@ -58,6 +58,8 @@ public abstract class WindowTableFunctionOperatorBase extends 
TableStreamOperato
 
     protected final GroupWindowAssigner<TimeWindow> windowAssigner;
 
+    protected final int timestampPrecision;
+
     /** This is used for emitting elements with a given timestamp. */
     private transient TimestampedCollector<RowData> collector;
 
@@ -73,10 +75,12 @@ public abstract class WindowTableFunctionOperatorBase 
extends TableStreamOperato
     public WindowTableFunctionOperatorBase(
             GroupWindowAssigner<TimeWindow> windowAssigner,
             int rowtimeIndex,
+            int timestampPrecision,
             ZoneId shiftTimeZone) {
         this.shiftTimeZone = shiftTimeZone;
         this.rowtimeIndex = rowtimeIndex;
         this.windowAssigner = windowAssigner;
+        this.timestampPrecision = timestampPrecision;
         checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
index 1f2238588dd..e9317a84eeb 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
@@ -340,7 +340,7 @@ class AlignedWindowTableFunctionOperatorTest extends 
WindowTableFunctionOperator
             GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId 
shiftTimeZone) throws Exception {
         AlignedWindowTableFunctionOperator operator =
                 new AlignedWindowTableFunctionOperator(
-                        windowAssigner, ROW_TIME_INDEX, shiftTimeZone);
+                        windowAssigner, ROW_TIME_INDEX, 
DEFAULT_TIMESTAMP_PRECISION, shiftTimeZone);
         return new OneInputStreamOperatorTestHarness<>(operator, 
INPUT_ROW_SER);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
index 05e54aaca2d..cafe6e284f6 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
@@ -563,6 +563,7 @@ class UnalignedWindowTableFunctionOperatorTest extends 
WindowTableFunctionOperat
                 windowAssigner.getWindowSerializer(new ExecutionConfig()),
                 new RowDataSerializer(INPUT_ROW_TYPE),
                 rowTimeIndex,
+                DEFAULT_TIMESTAMP_PRECISION,
                 shiftTimeZone);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
index 06ede388ca9..0d015ac40e7 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
@@ -67,6 +67,8 @@ abstract class WindowTableFunctionOperatorTestBase {
     protected static final RowDataSerializer INPUT_ROW_SER = new 
RowDataSerializer(INPUT_ROW_TYPE);
     protected static final int ROW_TIME_INDEX = 2;
 
+    protected static final int DEFAULT_TIMESTAMP_PRECISION = 3;
+
     protected static final LogicalType[] OUTPUT_TYPES =
             new LogicalType[] {
                 new VarCharType(Integer.MAX_VALUE),

Reply via email to