This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 419284989b Flink: Fix npe in TaskResultAggregator when job recovery (#13086)
419284989b is described below

commit 419284989b4712e3d7e461d78eb3bf0cd247e87c
Author: GuoYu <511955...@qq.com>
AuthorDate: Fri May 23 21:43:34 2025 +0800

    Flink: Fix npe in TaskResultAggregator when job recovery (#13086)
---
 .../maintenance/operator/TaskResultAggregator.java | 25 ++++++-------
 .../maintenance/operator/OperatorTestBase.java     |  2 ++
 .../operator/TestTaskResultAggregator.java         | 41 +++++++++++++++++++---
 3 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
index f45d2542ce..bd8f709e37 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
@@ -61,7 +61,7 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
   private final String taskName;
   private final int taskIndex;
   private final List<Exception> exceptions;
-  private transient Long startTime;
+  private transient long startTime;
 
   public TaskResultAggregator(String tableName, String taskName, int taskIndex) {
     Preconditions.checkNotNull(tableName, "Table name should no be null");
@@ -71,7 +71,6 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
     this.taskName = taskName;
     this.taskIndex = taskIndex;
     this.exceptions = Lists.newArrayList();
-    this.startTime = 0L;
   }
 
   @Override
@@ -87,16 +86,18 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
 
   @Override
   public void processWatermark(Watermark mark) throws Exception {
-    TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions);
-    output.collect(new StreamRecord<>(response));
-    LOG.info(
-        "Aggregated result for table {}, task {}[{}] is {}",
-        tableName,
-        taskName,
-        taskIndex,
-        response);
-    exceptions.clear();
-    startTime = 0L;
+    if (startTime != 0L) {
+      TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions);
+      output.collect(new StreamRecord<>(response));
+      LOG.info(
+          "Aggregated result for table {}, task {}[{}] is {}",
+          tableName,
+          taskName,
+          taskIndex,
+          response);
+      exceptions.clear();
+      startTime = 0L;
+    }
 
     super.processWatermark(mark);
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 78f34f6c16..9a79b94aba 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.iceberg.DataFile;
@@ -83,6 +84,7 @@ public class OperatorTestBase {
 
   static final long EVENT_TIME = 10L;
   static final long EVENT_TIME_2 = 11L;
+  static final Watermark WATERMARK = new Watermark(EVENT_TIME);
   protected static final String DUMMY_TASK_NAME = "dummyTask";
   protected static final String DUMMY_TABLE_NAME = "dummyTable";
 
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
index b17a8f9e5d..51d901e923 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
@@ -20,8 +20,9 @@ package org.apache.iceberg.flink.maintenance.operator;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.iceberg.flink.maintenance.api.TaskResult;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -36,10 +37,42 @@ class TestTaskResultAggregator extends OperatorTestBase {
     try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
         new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
       testHarness.open();
-      testHarness.processWatermark1(new Watermark(EVENT_TIME));
-      testHarness.processWatermark2(new Watermark(EVENT_TIME));
+      testHarness.processBothWatermarks(WATERMARK);
       ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
-      assertThat(output).containsOnlyOnce(new Watermark(EVENT_TIME));
+      assertThat(output).containsOnlyOnce(WATERMARK);
+    }
+  }
+
+  @Test
+  void testProcessWatermarkWithoutElement() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+      testHarness.processBothWatermarks(WATERMARK);
+      List<TaskResult> taskResults = testHarness.extractOutputValues();
+      assertThat(taskResults).hasSize(0);
+    }
+  }
+
+  @Test
+  void testProcessWatermark() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+
+      testHarness.processElement1(new StreamRecord<>(Trigger.create(EVENT_TIME, 0)));
+      testHarness.processBothWatermarks(WATERMARK);
+      List<TaskResult> taskResults = testHarness.extractOutputValues();
+      assertThat(taskResults).hasSize(1);
+      TaskResult taskResult = taskResults.get(0);
+      assertThat(taskResult.taskIndex()).isEqualTo(0);
+      assertThat(taskResult.startEpoch()).isEqualTo(EVENT_TIME);
+      assertThat(taskResult.success()).isEqualTo(true);
+      assertThat(taskResult.exceptions()).hasSize(0);
     }
   }
 }

Reply via email to