This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

The following commit(s) were added to refs/heads/main by this push:
     new 419284989b Flink: Fix npe in TaskResultAggregator when job recovery (#13086)
419284989b is described below

commit 419284989b4712e3d7e461d78eb3bf0cd247e87c
Author: GuoYu <511955...@qq.com>
AuthorDate: Fri May 23 21:43:34 2025 +0800

    Flink: Fix npe in TaskResultAggregator when job recovery (#13086)
---
 .../maintenance/operator/TaskResultAggregator.java | 25 ++++++-------
 .../maintenance/operator/OperatorTestBase.java     |  2 ++
 .../operator/TestTaskResultAggregator.java         | 41 +++++++++++++++++++---
 3 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
index f45d2542ce..bd8f709e37 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
@@ -61,7 +61,7 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
   private final String taskName;
   private final int taskIndex;
   private final List<Exception> exceptions;
-  private transient Long startTime;
+  private transient long startTime;
 
   public TaskResultAggregator(String tableName, String taskName, int taskIndex) {
     Preconditions.checkNotNull(tableName, "Table name should no be null");
@@ -71,7 +71,6 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
     this.taskName = taskName;
     this.taskIndex = taskIndex;
    this.exceptions = Lists.newArrayList();
-    this.startTime = 0L;
   }
 
   @Override
@@ -87,16 +86,18 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
 
   @Override
   public void processWatermark(Watermark mark) throws Exception {
-    TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions);
-    output.collect(new StreamRecord<>(response));
-    LOG.info(
-        "Aggregated result for table {}, task {}[{}] is {}",
-        tableName,
-        taskName,
-        taskIndex,
-        response);
-    exceptions.clear();
-    startTime = 0L;
+    if (startTime != 0L) {
+      TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions);
+      output.collect(new StreamRecord<>(response));
+      LOG.info(
+          "Aggregated result for table {}, task {}[{}] is {}",
+          tableName,
+          taskName,
+          taskIndex,
+          response);
+      exceptions.clear();
+      startTime = 0L;
+    }
 
     super.processWatermark(mark);
   }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 78f34f6c16..9a79b94aba 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.iceberg.DataFile;
@@ -83,6 +84,7 @@ public class OperatorTestBase {
 
   static final long EVENT_TIME = 10L;
   static final long EVENT_TIME_2 = 11L;
+  static final Watermark WATERMARK = new Watermark(EVENT_TIME);
 
   protected static final String DUMMY_TASK_NAME = "dummyTask";
   protected static final String DUMMY_TABLE_NAME = "dummyTable";
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
index b17a8f9e5d..51d901e923 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
@@ -20,8 +20,9 @@ package org.apache.iceberg.flink.maintenance.operator;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.iceberg.flink.maintenance.api.TaskResult;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
@@ -36,10 +37,42 @@ class TestTaskResultAggregator extends OperatorTestBase {
     try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
         new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
       testHarness.open();
-      testHarness.processWatermark1(new Watermark(EVENT_TIME));
-      testHarness.processWatermark2(new Watermark(EVENT_TIME));
+      testHarness.processBothWatermarks(WATERMARK);
       ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
-      assertThat(output).containsOnlyOnce(new Watermark(EVENT_TIME));
+      assertThat(output).containsOnlyOnce(WATERMARK);
+    }
+  }
+
+  @Test
+  void testProcessWatermarkWithoutElement() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+      testHarness.processBothWatermarks(WATERMARK);
+      List<TaskResult> taskResults = testHarness.extractOutputValues();
+      assertThat(taskResults).hasSize(0);
+    }
+  }
+
+  @Test
+  void testProcessWatermark() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+
+      testHarness.processElement1(new StreamRecord<>(Trigger.create(EVENT_TIME, 0)));
+      testHarness.processBothWatermarks(WATERMARK);
+      List<TaskResult> taskResults = testHarness.extractOutputValues();
+      assertThat(taskResults).hasSize(1);
+      TaskResult taskResult = taskResults.get(0);
+      assertThat(taskResult.taskIndex()).isEqualTo(0);
+      assertThat(taskResult.startEpoch()).isEqualTo(EVENT_TIME);
+      assertThat(taskResult.success()).isEqualTo(true);
+      assertThat(taskResult.exceptions()).hasSize(0);
     }
   }
 }
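
Editorial note (not part of the commit): the sketch below is a minimal, self-contained illustration of the failure mode the commit title describes, using hypothetical names (UnboxingNpeSketch, describe, startEpoch). A transient boxed Long field is skipped by Java serialization, so after job recovery it comes back as null, and auto-unboxing it into a primitive long parameter, which the TaskResult startTime argument appears to be, throws a NullPointerException. The fix above switches the field to a primitive long, which simply defaults to 0 after recovery, and guards processWatermark with startTime != 0L so no result is emitted for a watermark that was not preceded by a trigger element; the two new tests exercise exactly those cases.

// Illustration only, hypothetical names; not Iceberg or Flink API.
// Shows how unboxing a null transient Long into a primitive long throws an NPE.
public class UnboxingNpeSketch {

  // Transient fields are skipped by Java serialization, so after a restore the
  // boxed value is back at its default, which is null for a Long.
  private transient Long startTime; // null, as after job recovery

  // Stand-in for a call that takes a primitive long, roughly the shape of
  // TaskResult(taskIndex, startTime, ...).
  private static String describe(int taskIndex, long startEpoch) {
    return "task " + taskIndex + " started at " + startEpoch;
  }

  void emitResult() {
    // Auto-unboxing the null Long here is what raises the NullPointerException.
    System.out.println(describe(0, startTime));
  }

  public static void main(String[] args) {
    try {
      new UnboxingNpeSketch().emitResult();
    } catch (NullPointerException e) {
      System.out.println("NPE from unboxing a null Long: " + e);
    }
  }
}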