This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new b050314a03 Flink: Fix watermark no pass in TaskResultAggregator (#13069)
b050314a03 is described below

commit b050314a035f7916667e8441de6f100eafcc9ddb
Author: GuoYu <511955...@qq.com>
AuthorDate: Sat May 17 03:01:00 2025 +0800

    Flink: Fix watermark no pass in TaskResultAggregator (#13069)
---
 .../maintenance/operator/TaskResultAggregator.java |  4 +-
 .../operator/TestTaskResultAggregator.java         | 45 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
index cceb043a26..f45d2542ce 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResultAggregator.java
@@ -86,7 +86,7 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
   }
 
   @Override
-  public void processWatermark(Watermark mark) {
+  public void processWatermark(Watermark mark) throws Exception {
     TaskResult response = new TaskResult(taskIndex, startTime, exceptions.isEmpty(), exceptions);
     output.collect(new StreamRecord<>(response));
     LOG.info(
@@ -97,5 +97,7 @@ public class TaskResultAggregator extends AbstractStreamOperator<TaskResult>
         response);
     exceptions.clear();
     startTime = 0L;
+
+    super.processWatermark(mark);
   }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
new file mode 100644
index 0000000000..b17a8f9e5d
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTaskResultAggregator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.junit.jupiter.api.Test;
+
+class TestTaskResultAggregator extends OperatorTestBase {
+
+  @Test
+  void testPassWatermark() throws Exception {
+    TaskResultAggregator taskResultAggregator =
+        new TaskResultAggregator("table-name", "task-name", 0);
+    try (TwoInputStreamOperatorTestHarness<Trigger, Exception, TaskResult> testHarness =
+        new TwoInputStreamOperatorTestHarness<>(taskResultAggregator)) {
+      testHarness.open();
+      testHarness.processWatermark1(new Watermark(EVENT_TIME));
+      testHarness.processWatermark2(new Watermark(EVENT_TIME));
+      ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+      assertThat(output).containsOnlyOnce(new Watermark(EVENT_TIME));
+    }
+  }
+}