This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 59ae964d77 refine watermark process (#11477)
59ae964d77 is described below

commit 59ae964d7760f1242cff11db6d2cc3e410411d09
Author: lgbo <[email protected]>
AuthorDate: Wed Jan 28 14:17:37 2026 +0800

    refine watermark process (#11477)
---
 .github/workflows/flink.yml                             |  2 +-
 gluten-flink/docs/Flink.md                              |  2 +-
 .../table/runtime/operators/GlutenOneInputOperator.java | 17 +++++++++++++++++
 .../table/runtime/operators/GlutenTwoInputOperator.java | 12 ++++++++----
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index a6dce7ba69..cd758ae614 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
           sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
           sudo .github/workflows/util/install-flink-resources.sh
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
+          cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 6ef9c2ded5..0ac30debc4 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
+git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index ccc3cde3c0..df0b8b921f 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -191,6 +191,23 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
     }
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    task.notifyWatermark(mark.getTimestamp());
+    // Process any pending elements to ensure watermark-triggered operations complete.
+    processElementInternal();
+  }
+
+  @Override
+  public void processWatermark1(Watermark mark) throws Exception {
+    throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
+  }
+
+  @Override
+  public void processWatermark2(Watermark mark) throws Exception {
+    throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
+  }
+
   @Override
   public void close() throws Exception {
     if (inputQueue != null) {
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 923e8d8d0d..6d4765af97 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -171,17 +171,21 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
     }
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    task.notifyWatermark(mark.getTimestamp());
+    processElementInternal();
+  }
+
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
-    // TODO: implement it;
-    task.notifyWatermark(mark.getTimestamp(), 1);
+    task.notifyWatermark(mark.getTimestamp(), 0);
     processElementInternal();
   }
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    // TODO: implement it;
-    task.notifyWatermark(mark.getTimestamp(), 2);
+    task.notifyWatermark(mark.getTimestamp(), 1);
     processElementInternal();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to