This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f235c3c92d [GLUTEN-11412][FLINK] Refine watermark assigner (#11414)
f235c3c92d is described below
commit f235c3c92d604974a7c4a2d77943cd9dccbf7f1a
Author: lgbo <[email protected]>
AuthorDate: Tue Jan 20 15:02:56 2026 +0800
[GLUTEN-11412][FLINK] Refine watermark assigner (#11414)
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../plan/nodes/exec/stream/StreamExecWatermarkAssigner.java | 12 ++----------
3 files changed, 4 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index e30540de0e..a6dce7ba69 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard 59442932803ca06a34c1cdbc43a3b80348c4da7a
+ cd velox4j && git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 42fec4a7e0..6ef9c2ded5 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 59442932803ca06a34c1cdbc43a3b80348c4da7a
+git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index 3230bac8c2..1d07e93f28 100644
--- a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -23,12 +23,11 @@ import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
-import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.ProjectNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import io.github.zhztheplayer.velox4j.plan.WatermarkAssignerNode;
import org.apache.flink.FlinkVersion;
@@ -140,17 +139,10 @@ public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
- // This scan can be ignored, it's used only to make ProjectNode valid
- PlanNode ignore =
- new TableScanNode(
- PlanNodeIdGenerator.newId(),
- outputType,
- new NexmarkTableHandle("connector-nexmark"),
- List.of());
ProjectNode project =
new ProjectNode(
PlanNodeIdGenerator.newId(),
- List.of(ignore),
+ List.of(new EmptyNode(outputType)),
List.of("TIMESTAMP"),
List.of(watermarkExprs));
PlanNode watermark =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]