This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb043471604ee9564876aebf85e415181f217a17 Author: Arvid Heise <[email protected]> AuthorDate: Fri Oct 18 15:00:19 2024 +0200 [FLINK-36455] Sinks retry synchronously Sinks so far retried asynchronously to increase commit throughput in case of temporary issues. However, the contract of notifyCheckpointComplete states that checkpoints must be side-effect free, meaning all transactions have to be committed on return of the RPC call. This commit retries a fixed number of times and then fails in notifyCheckpointComplete. Note that sync retries significantly simplify the committable handling. This commit starts a few simplifications; the next commit cleans up more. (cherry picked from commit bc0f241b86799a39d7ce08e5902e47c71bdaf68f) --- .../generated/common_miscellaneous_section.html | 6 +++ .../apache/flink/configuration/SinkOptions.java | 42 +++++++++++++++ .../connector/sink2/GlobalCommitterOperator.java | 27 ++++------ .../runtime/operators/sink/CommitterOperator.java | 60 +++++++++++---------- .../committables/CheckpointCommittableManager.java | 14 ++--- .../CheckpointCommittableManagerImpl.java | 62 +++++++++------------- .../sink/committables/CommittableCollector.java | 9 ++-- .../committables/SubtaskCommittableManager.java | 9 +++- .../sink2/GlobalCommitterOperatorTest.java | 1 + .../operators/sink/CommitterOperatorTestBase.java | 35 ++++++++++-- .../CheckpointCommittableManagerImplTest.java | 18 +++---- .../CommittableCollectorSerializerTest.java | 10 ++-- .../committables/CommittableCollectorTest.java | 6 +-- .../operators/sink/deprecated/TestSinkV2.java | 6 +-- 14 files changed, 184 insertions(+), 121 deletions(-) diff --git a/docs/layouts/shortcodes/generated/common_miscellaneous_section.html b/docs/layouts/shortcodes/generated/common_miscellaneous_section.html index 6ac31199dc7..26c6b3677df 100644 --- a/docs/layouts/shortcodes/generated/common_miscellaneous_section.html +++ b/docs/layouts/shortcodes/generated/common_miscellaneous_section.html @@ -26,5 
+26,11 @@ <td>String</td> <td>Directories for temporary files, separated by",", "|", or the system's java.io.File.pathSeparator.</td> </tr> + <tr> + <td><h5>sink.committer.retries</h5></td> + <td style="word-wrap: break-word;">10</td> + <td>Integer</td> + <td>The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.</td> + </tr> </tbody> </table> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java new file mode 100644 index 00000000000..2cf1431d939 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Configuration options for sinks. 
*/ +@PublicEvolving +public class SinkOptions { + /** + * The number of retries on a committable (e.g., transaction) before Flink application fails and + * potentially restarts. + */ + @Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS) + public static final ConfigOption<Integer> COMMITTER_RETRIES = + key("sink.committer.retries") + .intType() + .defaultValue(10) + .withDescription( + "The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts."); + + private SinkOptions() {} +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java index b3195d286d6..9310a92504e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink.GlobalCommitter; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; @@ -49,7 +50,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.Preconditions.checkNotNull; import static 
org.apache.flink.util.Preconditions.checkState; @@ -118,6 +118,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO private long lastCompletedCheckpointId = -1; private SimpleVersionedSerializer<CommT> committableSerializer; private SinkCommitterMetricGroup metricGroup; + private int maxRetries; @Nullable private GlobalCommitter<CommT, GlobalCommT> globalCommitter; @Nullable private SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer; @@ -148,6 +149,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO globalCommitter = gc.getGlobalCommitter(); globalCommittableSerializer = gc.getGlobalCommittableSerializer(); } + maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES); } @Override @@ -215,23 +217,14 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO } lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointIdOrEOI); - // this is true for the last commit and we need to make sure that all committables are - // indeed committed as this function will never be invoked again - boolean waitForAllCommitted = - lastCompletedCheckpointId == EOI - && committableCollector - .getEndOfInputCommittable() - .map(CheckpointCommittableManager::hasGloballyReceivedAll) - .orElse(false); - do { - for (CheckpointCommittableManager<CommT> committable : - committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) { - if (committable.hasGloballyReceivedAll()) { - committable.commit(committer); - } + for (CheckpointCommittableManager<CommT> checkpointManager : + committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) { + if (!checkpointManager.hasGloballyReceivedAll()) { + return; } - committableCollector.compact(); - } while (waitForAllCommitted && !committableCollector.isFinished()); + checkpointManager.commit(committer, maxRetries); + committableCollector.remove(checkpointManager); + } } @Override 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index d155f5dd509..10ae86cf10d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -22,12 +22,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -65,7 +67,6 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage implements OneInputStreamOperator<CommittableMessage<CommT>, CommittableMessage<CommT>>, BoundedOneInput { - private static final long RETRY_DELAY = 1000; private final SimpleVersionedSerializer<CommT> committableSerializer; private final FunctionWithException<CommitterInitContext, Committer<CommT>, IOException> committerSupplier; @@ -76,6 +77,7 @@ class 
CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage private Committer<CommT> committer; private CommittableCollector<CommT> committableCollector; private long lastCompletedCheckpointId = -1; + private int maxRetries; private boolean endInput = false; @@ -111,6 +113,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage super.setup(containingTask, config, output); metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup()); committableCollector = CommittableCollector.of(metricGroup); + maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES); } @Override @@ -161,41 +164,42 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage private void commitAndEmitCheckpoints() throws IOException, InterruptedException { long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId; - do { - for (CheckpointCommittableManager<CommT> manager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { - commitAndEmit(manager); - } - // !committableCollector.isFinished() indicates that we should retry - // Retry should be done here if this is a final checkpoint (indicated by endInput) - // WARN: this is an endless retry, may make the job stuck while finishing - } while (!committableCollector.isFinished() && endInput); - - if (!committableCollector.isFinished()) { - // if not endInput, we can schedule retrying later - retryWithDelay(); + for (CheckpointCommittableManager<CommT> checkpointManager : + committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + // ensure that all committables of the first checkpoint are fully committed before + // attempting the next committable + commitAndEmit(checkpointManager); + committableCollector.remove(checkpointManager); } - committableCollector.compact(); } private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager) throws IOException, InterruptedException { - 
Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer); - if (emitDownstream && committableManager.isFinished()) { - int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); - int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); - output.collect( - new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks))); - for (CommittableWithLineage<CommT> committable : committed) { - output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId))); - } + committableManager.commit(committer, maxRetries); + if (emitDownstream) { + emit(committableManager); } } - private void retryWithDelay() { - processingTimeService.registerTimer( - processingTimeService.getCurrentProcessingTime() + RETRY_DELAY, - ts -> commitAndEmitCheckpoints()); + private void emit(CheckpointCommittableManager<CommT> committableManager) { + int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); + long checkpointId = committableManager.getCheckpointId(); + Collection<CommT> committables = committableManager.getSuccessfulCommittables(); + output.collect( + new StreamRecord<>( + new CommittableSummary<>( + subtaskId, + numberOfSubtasks, + checkpointId, + committables.size(), + 0, + 0))); + for (CommT committable : committables) { + output.collect( + new StreamRecord<>( + new CommittableWithLineage<>(committable, checkpointId, subtaskId))); + } } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java index ada7f8640f0..4e34fbbe698 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java @@ -20,8 +20,6 @@ package org.apache.flink.streaming.runtime.operators.sink.committables; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import java.io.IOException; import java.util.Collection; @@ -50,12 +48,6 @@ public interface CheckpointCommittableManager<CommT> { /** Returns the number of upstream subtasks belonging to the checkpoint. */ int getNumberOfSubtasks(); - /** - * Returns a summary of the current commit progress for the emitting subtask identified by the - * parameters. - */ - CommittableSummary<CommT> getSummary(int emittingSubtaskId, int emittingNumberOfSubtasks); - boolean isFinished(); /** @@ -69,8 +61,10 @@ public interface CheckpointCommittableManager<CommT> { * checkpoint have been received. 
* * @param committer used to commit to the external system - * @return successfully committed committables with meta information + * @param maxRetries */ - Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer) + void commit(Committer<CommT> committer, int maxRetries) throws IOException, InterruptedException; + + Collection<CommT> getSuccessfulCommittables(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index bb6cceead47..00c3b7f65d4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -126,24 +126,6 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa return checkNotNull(committables, "Unknown subtask for %s", subtaskId); } - @Override - public CommittableSummary<CommT> getSummary( - int emittingSubtaskId, int emittingNumberOfSubtasks) { - return new CommittableSummary<>( - emittingSubtaskId, - emittingNumberOfSubtasks, - checkpointId, - subtasksCommittableManagers.values().stream() - .mapToInt(SubtaskCommittableManager::getNumCommittables) - .sum(), - subtasksCommittableManagers.values().stream() - .mapToInt(SubtaskCommittableManager::getNumPending) - .sum(), - subtasksCommittableManagers.values().stream() - .mapToInt(SubtaskCommittableManager::getNumFailed) - .sum()); - } - @Override public boolean isFinished() { return subtasksCommittableManagers.values().stream() @@ -158,20 +140,34 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa } @Override - public Collection<CommittableWithLineage<CommT>> 
commit(Committer<CommT> committer) + public void commit(Committer<CommT> committer, int maxRetries) throws IOException, InterruptedException { - Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(true); - requests.forEach(CommitRequestImpl::setSelected); - committer.commit(new ArrayList<>(requests)); - requests.forEach(CommitRequestImpl::setCommittedIfNoError); - Collection<CommittableWithLineage<CommT>> committed = drainFinished(); - metricGroup.setCurrentPendingCommittablesGauge(() -> getPendingRequests(false).size()); - return committed; + Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(); + for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; retry++) { + requests.forEach(CommitRequestImpl::setSelected); + committer.commit(new ArrayList<>(requests)); + requests.forEach(CommitRequestImpl::setCommittedIfNoError); + requests = requests.stream().filter(r -> !r.isFinished()).collect(Collectors.toList()); + metricGroup.setCurrentPendingCommittablesGauge(requests::size); + } + if (!requests.isEmpty()) { + throw new IOException( + String.format( + "Failed to commit %s committables after %s retries: %s", + requests.size(), maxRetries, requests)); + } } - Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean assertFull) { + @Override + public Collection<CommT> getSuccessfulCommittables() { return subtasksCommittableManagers.values().stream() - .peek(subtask -> assertReceivedAll(assertFull, subtask)) + .flatMap(SubtaskCommittableManager::getSuccessfulCommittables) + .collect(Collectors.toList()); + } + + Collection<CommitRequestImpl<CommT>> getPendingRequests() { + return subtasksCommittableManagers.values().stream() + .peek(this::assertReceivedAll) .flatMap(SubtaskCommittableManager::getPendingRequests) .collect(Collectors.toList()); } @@ -192,20 +188,14 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa * <p>This assertion will fail in case of bugs in the writer or in the pre-commit 
topology if * present. */ - private void assertReceivedAll(boolean assertFull, SubtaskCommittableManager<CommT> subtask) { + private void assertReceivedAll(SubtaskCommittableManager<CommT> subtask) { Preconditions.checkArgument( - !assertFull || subtask.hasReceivedAll(), + subtask.hasReceivedAll(), "Trying to commit incomplete batch of committables subtask=%s, manager=%s", subtask.getSubtaskId(), this); } - Collection<CommittableWithLineage<CommT>> drainFinished() { - return subtasksCommittableManagers.values().stream() - .flatMap(subtask -> subtask.drainCommitted().stream()) - .collect(Collectors.toList()); - } - CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) { checkArgument(other.checkpointId == checkpointId); for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry : diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index 37a7b72f7b3..098de7f186e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -132,7 +133,7 @@ public class CommittableCollector<CommT> { */ public Collection<? 
extends CheckpointCommittableManager<CommT>> getCheckpointCommittablesUpTo( long checkpointId) { - return checkpointCommittables.headMap(checkpointId, true).values(); + return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values()); } /** @@ -208,9 +209,9 @@ public class CommittableCollector<CommT> { return checkNotNull(committables, "Unknown checkpoint for %s", committable); } - /** Removes all metadata about checkpoints of which all committables are fully committed. */ - public void compact() { - checkpointCommittables.values().removeIf(CheckpointCommittableManagerImpl::isFinished); + /** Removes the manager for a specific checkpoint and all it's metadata. */ + public void remove(CheckpointCommittableManager<CommT> manager) { + checkpointCommittables.remove(manager.getCheckpointId()); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index 185d6e0fe78..3128a2d083c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState.COMMITTED; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -138,6 +139,12 @@ class SubtaskCommittableManager<CommT> { return requests.stream().filter(c -> !c.isFinished()); } + Stream<CommT> getSuccessfulCommittables() { + return 
getRequests().stream() + .filter(c -> c.getState() == COMMITTED) + .map(CommitRequestImpl::getCommittable); + } + /** * Iterates through all currently registered {@link #requests} and returns all {@link * CommittableWithLineage} that could be successfully committed. @@ -184,7 +191,7 @@ class SubtaskCommittableManager<CommT> { return checkpointId; } - Deque<CommitRequestImpl<CommT>> getRequests() { + Collection<CommitRequestImpl<CommT>> getRequests() { return requests; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java index 865a24175ae..641a651e2e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java @@ -63,6 +63,7 @@ class GlobalCommitterOperatorTest { if (commitOnInput) { assertThat(committer.committed).containsExactly(1, 2); } else { + // 3PC behavior assertThat(committer.committed).isEmpty(); testHarness.notifyOfCompletedCheckpoint(cid + 1); assertThat(committer.committed).containsExactly(1, 2); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index e63ec66c41b..756ea0c8022 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.SupportsCommitter; +import 
org.apache.flink.configuration.SinkOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; @@ -26,6 +27,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.assertj.core.api.AbstractThrowableAssert; import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -201,10 +203,6 @@ abstract class CommitterOperatorTestBase { testHarness.processElement(new StreamRecord<>(second)); final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); - - // Trigger first checkpoint but committer needs retry - testHarness.notifyOfCompletedCheckpoint(0); - assertThat(testHarness.getOutput()).isEmpty(); testHarness.close(); @@ -244,6 +242,35 @@ abstract class CommitterOperatorTestBase { restored.close(); } + @ParameterizedTest + @ValueSource(ints = {0, 1}) + void testNumberOfRetries(int numRetries) throws Exception { + try (OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { + testHarness + .getStreamConfig() + .getConfiguration() + .set(SinkOptions.COMMITTER_RETRIES, numRetries); + testHarness.open(); + + long ckdId = 1L; + testHarness.processElement( + new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0, 0))); + testHarness.processElement( + new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); + AbstractThrowableAssert<?, ? 
extends Throwable> throwableAssert = + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); + if (numRetries == 0) { + throwableAssert.hasMessageContaining("Failed to commit 1 committables"); + } else { + throwableAssert.doesNotThrowAnyException(); + } + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index fcc2f559043..56804ce9f2b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -23,7 +23,6 @@ import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -41,6 +40,7 @@ class CheckpointCommittableManagerImplTest { private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup(); + private static final int MAX_RETRIES = 1; @Test void testAddSummary() { @@ -74,19 +74,20 @@ class CheckpointCommittableManagerImplTest { final Committer<Integer> committer = new NoOpCommitter(); // Only commit fully received committables - assertThatCode(() -> checkpointCommittables.commit(committer)) + 
assertThatCode(() -> checkpointCommittables.commit(committer, MAX_RETRIES)) .hasMessageContaining("Trying to commit incomplete batch of committables"); // Even on retry - assertThatCode(() -> checkpointCommittables.commit(committer)) + assertThatCode(() -> checkpointCommittables.commit(committer, MAX_RETRIES)) .hasMessageContaining("Trying to commit incomplete batch of committables"); // Add missing committable checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2)); // Commit all committables - assertThat(checkpointCommittables.commit(committer)) + assertThatCode(() -> checkpointCommittables.commit(committer, MAX_RETRIES)) + .doesNotThrowAnyException(); + assertThat(checkpointCommittables.getSuccessfulCommittables()) .hasSize(3) - .extracting(CommittableWithLineage::getCommittable) .containsExactlyInAnyOrder(3, 4, 5); } @@ -118,10 +119,9 @@ class CheckpointCommittableManagerImplTest { CheckpointCommittableManagerImpl<Integer> copy = original.copy(); assertThat(copy.getCheckpointId()).isEqualTo(checkpointId); - SinkV2Assertions.assertThat(copy.getSummary(subtaskId, numberOfSubtasks)) - .hasNumberOfSubtasks(numberOfSubtasks) - .hasSubtaskId(subtaskId) - .hasCheckpointId(checkpointId); + assertThat(copy) + .returns(numberOfSubtasks, CheckpointCommittableManagerImpl::getNumberOfSubtasks) + .returns(checkpointId, CheckpointCommittableManagerImpl::getCheckpointId); } private static class NoOpCommitter implements Committer<Integer> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java index 0b4bddd36a4..c90be1cd03c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; @@ -205,11 +204,10 @@ class CommittableCollectorSerializerTest { checkpointCommittableManager.getSubtaskCommittableManager( subtaskId); - SinkV2Assertions.assertThat( - checkpointCommittableManager.getSummary( - subtaskId, numberOfSubtasks)) - .hasSubtaskId(subtaskId) - .hasNumberOfSubtasks(numberOfSubtasks); + assertThat(checkpointCommittableManager) + .returns( + numberOfSubtasks, + CheckpointCommittableManagerImpl::getNumberOfSubtasks); assertPendingRequests( subtaskCommittableManager, expectedPendingRequestCount); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index b8a58aa6c65..6e55adcc0c5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.operators.sink.committables; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import 
org.apache.flink.streaming.api.connector.sink2.CommittableSummary; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.junit.jupiter.api.Test; @@ -59,7 +58,8 @@ class CommittableCollectorTest { Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable = committableCollector.getEndOfInputCommittable(); assertThat(endOfInputCommittable).isPresent(); - SinkV2Assertions.assertThat(endOfInputCommittable.get().getSummary(1, 1)) - .hasCheckpointId(EOI); + assertThat(endOfInputCommittable) + .get() + .returns(EOI, CheckpointCommittableManager::getCheckpointId); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java index e240de21066..dfaaf17d884 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java @@ -406,17 +406,17 @@ public class TestSinkV2<InputT> implements Sink<InputT> { /** A {@link Committer} that always re-commits the committables data it received. */ public static class RetryOnceCommitter extends DefaultCommitter { - private final Set<CommitRequest<String>> seen = new LinkedHashSet<>(); + private final Set<String> seen = new LinkedHashSet<>(); @Override public void commit(Collection<CommitRequest<String>> committables) { committables.forEach( c -> { - if (seen.remove(c)) { + if (seen.remove(c.getCommittable())) { checkNotNull(committedData); committedData.add(c); } else { - seen.add(c); + seen.add(c.getCommittable()); c.retryLater(); } });
