This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f235c3c92d [GLUTEN-11412][FLINK] Refine watermark assigner (#11414)
f235c3c92d is described below

commit f235c3c92d604974a7c4a2d77943cd9dccbf7f1a
Author: lgbo <[email protected]>
AuthorDate: Tue Jan 20 15:02:56 2026 +0800

    [GLUTEN-11412][FLINK] Refine watermark assigner (#11414)
---
 .github/workflows/flink.yml                                  |  2 +-
 gluten-flink/docs/Flink.md                                   |  2 +-
 .../plan/nodes/exec/stream/StreamExecWatermarkAssigner.java  | 12 ++----------
 3 files changed, 4 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index e30540de0e..a6dce7ba69 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
           sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
           sudo .github/workflows/util/install-flink-resources.sh
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 59442932803ca06a34c1cdbc43a3b80348c4da7a
+          cd velox4j && git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 42fec4a7e0..6ef9c2ded5 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 59442932803ca06a34c1cdbc43a3b80348c4da7a
+git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index 3230bac8c2..1d07e93f28 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -23,12 +23,11 @@ import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
-import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
 import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.ProjectNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
 import io.github.zhztheplayer.velox4j.plan.WatermarkAssignerNode;
 
 import org.apache.flink.FlinkVersion;
@@ -140,17 +139,10 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
     io.github.zhztheplayer.velox4j.type.RowType outputType =
         (io.github.zhztheplayer.velox4j.type.RowType)
             LogicalTypeConverter.toVLType(getOutputType());
-    // This scan can be ignored, it's used only to make ProjectNode valid
-    PlanNode ignore =
-        new TableScanNode(
-            PlanNodeIdGenerator.newId(),
-            outputType,
-            new NexmarkTableHandle("connector-nexmark"),
-            List.of());
     ProjectNode project =
         new ProjectNode(
             PlanNodeIdGenerator.newId(),
-            List.of(ignore),
+            List.of(new EmptyNode(outputType)),
             List.of("TIMESTAMP"),
             List.of(watermarkExprs));
     PlanNode watermark =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to