This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b050314a03 Flink: Fix watermark no pass in TaskResultAggregator 
(#13069)
b050314a03 is described below

commit b050314a035f7916667e8441de6f100eafcc9ddb
Author: GuoYu <511955...@qq.com>
AuthorDate: Sat May 17 03:01:00 2025 +0800

    Flink: Fix watermark no pass in TaskResultAggregator (#13069)
---
 .../maintenance/operator/TaskResultAggregator.java |  4 +-
 .../operator/TestTaskResultAggregator.java         | 45 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
index cceb043a26..f45d2542ce 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
@@ -86,7 +86,7 @@ public class TaskResultAggregator extends 
AbstractStreamOperator<TaskResult>
   }
 
   @Override
-  public void processWatermark(Watermark mark) {
+  public void processWatermark(Watermark mark) throws Exception {
     TaskResult response = new TaskResult(taskIndex, startTime, 
exceptions.isEmpty(), exceptions);
     output.collect(new StreamRecord<>(response));
     LOG.info(
@@ -97,5 +97,7 @@ public class TaskResultAggregator extends 
AbstractStreamOperator<TaskResult>
         response);
     exceptions.clear();
     startTime = 0L;
+
+    super.processWatermark(mark);
   }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
new file mode 100644
index 0000000000..b17a8f9e5d
--- /dev/null
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.junit.jupiter.api.Test;
+
+class TestTaskResultAggregator extends OperatorTestBase {
+
+  @Test
+  void testPassWatermark() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> 
testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+      testHarness.processWatermark1(new Watermark(EVENT_TIME));
+      testHarness.processWatermark2(new Watermark(EVENT_TIME));
+      ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+      assertThat(output).containsOnlyOnce(new Watermark(EVENT_TIME));
+    }
+  }
+}

Reply via email to