This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 59ae964d77 refine watermark process (#11477)
59ae964d77 is described below
commit 59ae964d7760f1242cff11db6d2cc3e410411d09
Author: lgbo <[email protected]>
AuthorDate: Wed Jan 28 14:17:37 2026 +0800
refine watermark process (#11477)
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../table/runtime/operators/GlutenOneInputOperator.java | 17 +++++++++++++++++
.../table/runtime/operators/GlutenTwoInputOperator.java | 12 ++++++++----
4 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index a6dce7ba69..cd758ae614 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -61,7 +61,7 @@ jobs:
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
+ cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 6ef9c2ded5..0ac30debc4 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard cd790c794d198a1b045b15efa8d8d71ecf5c338f
+git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index ccc3cde3c0..df0b8b921f 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -191,6 +191,23 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ task.notifyWatermark(mark.getTimestamp());
+ // Process any pending elements to ensure watermark-triggered operations complete.
+ processElementInternal();
+ }
+
+ @Override
+ public void processWatermark1(Watermark mark) throws Exception {
+ throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
+ }
+
+ @Override
+ public void processWatermark2(Watermark mark) throws Exception {
+ throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
+ }
+
@Override
public void close() throws Exception {
if (inputQueue != null) {
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 923e8d8d0d..6d4765af97 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -171,17 +171,21 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
}
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ task.notifyWatermark(mark.getTimestamp());
+ processElementInternal();
+ }
+
@Override
public void processWatermark1(Watermark mark) throws Exception {
- // TODO: implement it;
- task.notifyWatermark(mark.getTimestamp(), 1);
+ task.notifyWatermark(mark.getTimestamp(), 0);
processElementInternal();
}
@Override
public void processWatermark2(Watermark mark) throws Exception {
- // TODO: implement it;
- task.notifyWatermark(mark.getTimestamp(), 2);
+ task.notifyWatermark(mark.getTimestamp(), 1);
processElementInternal();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]