This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a73e94542f8992b5f6dd268c18f0c8d3bdb6c6a Author: Arvid Heise <[email protected]> AuthorDate: Thu Mar 17 23:08:46 2022 +0100 [FLINK-25256][streaming] Externally induced sources replay barriers received over RPC instead of inventing them out of thin air. This change preserves the CheckpointOptions and properly integrates user-triggered snapshots and workflows with more than one source. The externally induced source now merely delays the barrier instead of being able to insert one on a whim, which would never work in the aforementioned setups. --- .../io/StreamTaskExternallyInducedSourceInput.java | 24 +++ .../runtime/tasks/SourceOperatorStreamTask.java | 187 +++++++++++++++++---- .../tasks/SourceOperatorStreamTaskTest.java | 79 ++++++++- 3 files changed, 254 insertions(+), 36 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java index fff008c..ca0462e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskExternallyInducedSourceInput.java @@ -21,12 +21,14 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.api.connector.source.ExternallyInducedSourceReader; import org.apache.flink.streaming.api.operators.SourceOperator; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; /** A subclass of {@link StreamTaskSourceInput} for {@link ExternallyInducedSourceReader}.
*/ public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceInput<T> { private final Consumer<Long> checkpointTriggeringHook; private final ExternallyInducedSourceReader<T, ?> sourceReader; + private CompletableFuture<?> blockFuture; @SuppressWarnings("unchecked") public StreamTaskExternallyInducedSourceInput( @@ -39,12 +41,34 @@ public class StreamTaskExternallyInducedSourceInput<T> extends StreamTaskSourceI this.sourceReader = (ExternallyInducedSourceReader<T, ?>) operator.getSourceReader(); } + public void blockUntil(CompletableFuture<?> blockFuture) { + this.blockFuture = blockFuture; + // assume that the future is completed in mailbox thread + blockFuture.whenComplete((v, e) -> unblock()); + } + + private void unblock() { + this.blockFuture = null; + } + @Override public DataInputStatus emitNext(DataOutput<T> output) throws Exception { + if (blockFuture != null) { + return DataInputStatus.NOTHING_AVAILABLE; + } + DataInputStatus status = super.emitNext(output); if (status == DataInputStatus.NOTHING_AVAILABLE) { sourceReader.shouldTriggerCheckpoint().ifPresent(checkpointTriggeringHook); } return status; } + + @Override + public CompletableFuture<?> getAvailableFuture() { + if (blockFuture != null) { + return blockFuture; + } + return super.getAvailableFuture(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index efc594b..3d28055 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -24,14 +24,12 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import 
org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; -import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -48,7 +46,12 @@ import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,7 +60,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> { private AsyncDataOutputToOutput<T> output; - private boolean isExternallyInducedSource; + /** + * Contains information about all checkpoints where RPC from checkpoint coordinator arrives + * before the source reader triggers it. (Common case) + */ + private SortedMap<Long, UntriggeredCheckpoint> untriggeredCheckpoints = new TreeMap<>(); + /** + * Contains the checkpoints that are triggered by the source but the RPC from checkpoint + * coordinator has yet to arrive. This may happen if the barrier is inserted as an event into + * the data plane by the source coordinator and the (distributed) source reader reads that event + * before receiving Flink's checkpoint RPC. 
(Rare case) + */ + private SortedSet<Long> triggeredCheckpoints = new TreeSet<>(); + /** + * Blocks input until the RPC call has been received that corresponds to the triggered + * checkpoint. This future must only be accessed and completed in the mailbox thread. + */ + private CompletableFuture<Void> waitForRPC = FutureUtils.completedVoidFuture(); + /** Only set for externally induced sources. See also {@link #isExternallyInducedSource()}. */ + private StreamTaskExternallyInducedSourceInput<T> externallyInducedSourceInput; public SourceOperatorStreamTask(Environment env) throws Exception { super(env); @@ -79,14 +100,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, if (operatorChain.isTaskDeployedAsFinished()) { input = new StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0); } else if (sourceReader instanceof ExternallyInducedSourceReader) { - isExternallyInducedSource = true; - - input = + externallyInducedSourceInput = new StreamTaskExternallyInducedSourceInput<>( sourceOperator, this::triggerCheckpointForExternallyInducedSource, 0, 0); + + input = externallyInducedSourceInput; } else { input = new StreamTaskSourceInput<>(sourceOperator, 0, 0); } @@ -112,20 +133,53 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, @Override public CompletableFuture<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { - if (!isExternallyInducedSource) { - if (isSynchronous(checkpointOptions.getCheckpointType())) { - return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions); - } else { - return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); - } - } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) { - // see FLINK-25256 - throw new IllegalStateException( - "Using externally induced sources, we can not enforce taking a full checkpoint." 
- + "If you are restoring from a snapshot in NO_CLAIM mode, please use" - + " either CLAIM or LEGACY mode."); + if (!isExternallyInducedSource()) { + return triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions); + } + CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>(); + // immediately move RPC to mailbox so we don't need to synchronize fields + mainMailboxExecutor.execute( + () -> + triggerCheckpointOnExternallyInducedSource( + checkpointMetaData, checkpointOptions, triggerFuture), + "SourceOperatorStreamTask#triggerCheckpointAsync(%s, %s)", + checkpointMetaData, + checkpointOptions); + return triggerFuture; + } + + private boolean isExternallyInducedSource() { + return externallyInducedSourceInput != null; + } + + private void triggerCheckpointOnExternallyInducedSource( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CompletableFuture<Boolean> triggerFuture) { + assert (mailboxProcessor.isMailboxThread()); + if (!triggeredCheckpoints.remove(checkpointMetaData.getCheckpointId())) { + // common case: RPC is received before source reader triggers checkpoint + // store metadata and options for later + untriggeredCheckpoints.put( + checkpointMetaData.getCheckpointId(), + new UntriggeredCheckpoint(checkpointMetaData, checkpointOptions)); + triggerFuture.complete(isRunning()); + } else { + // trigger already received (rare case) + FutureUtils.forward( + triggerCheckpointNowAsync(checkpointMetaData, checkpointOptions), + triggerFuture); + + cleanupOldCheckpoints(checkpointMetaData.getCheckpointId()); + } + } + + private CompletableFuture<Boolean> triggerCheckpointNowAsync( + CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { + if (isSynchronous(checkpointOptions.getCheckpointType())) { + return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions); } else { - return CompletableFuture.completedFuture(isRunning()); + return 
super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); } } @@ -159,22 +213,76 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, output.emitWatermark(Watermark.MAX_WATERMARK); } + @Override + protected void declineCheckpoint(long checkpointId) { + cleanupCheckpoint(checkpointId); + super.declineCheckpoint(checkpointId); + } + + @Override + public Future<Void> notifyCheckpointAbortAsync( + long checkpointId, long latestCompletedCheckpointId) { + mainMailboxExecutor.execute( + () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); + return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId); + } + + @Override + public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) { + mainMailboxExecutor.execute( + () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); + return super.notifyCheckpointSubsumedAsync(checkpointId); + } + // -------------------------- private void triggerCheckpointForExternallyInducedSource(long checkpointId) { - final CheckpointOptions checkpointOptions = - CheckpointOptions.forConfig( - CheckpointType.CHECKPOINT, - CheckpointStorageLocationReference.getDefault(), - configuration.isExactlyOnceCheckpointMode(), - configuration.isUnalignedCheckpointsEnabled(), - configuration.getAlignedCheckpointTimeout().toMillis()); - final long timestamp = System.currentTimeMillis(); + UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId); + if (untriggeredCheckpoint != null) { + // common case: RPC before external sources induces it + triggerCheckpointNowAsync( + untriggeredCheckpoint.getMetadata(), + untriggeredCheckpoint.getCheckpointOptions()); + cleanupOldCheckpoints(checkpointId); + } else { + // rare case: external source induced first + triggeredCheckpoints.add(checkpointId); + if (waitForRPC.isDone()) { + waitForRPC = new CompletableFuture<>(); + 
externallyInducedSourceInput.blockUntil(waitForRPC); + } + } + } + + /** + * Cleanup any orphaned checkpoint before the given currently triggered checkpoint. These + * checkpoint may occur when the checkpoint is cancelled but the RPC is lost. Note, to be safe, + * checkpoint X is only removed when both RPC and trigger for a checkpoint Y>X is received. + */ + private void cleanupOldCheckpoints(long checkpointId) { + assert (mailboxProcessor.isMailboxThread()); + triggeredCheckpoints.headSet(checkpointId).clear(); + untriggeredCheckpoints.headMap(checkpointId).clear(); + + maybeResumeProcessing(); + } + + /** Resumes processing if it was blocked before or else is a no-op. */ + private void maybeResumeProcessing() { + assert (mailboxProcessor.isMailboxThread()); - final CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(checkpointId, timestamp, timestamp); + if (triggeredCheckpoints.isEmpty()) { + waitForRPC.complete(null); + } + } + + /** Remove temporary data about a canceled checkpoint. 
*/ + private void cleanupCheckpoint(long checkpointId) { + assert (mailboxProcessor.isMailboxThread()); + triggeredCheckpoints.remove(checkpointId); + untriggeredCheckpoints.remove(checkpointId); - super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions); + maybeResumeProcessing(); } // --------------------------- @@ -225,4 +333,23 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, output.emitWatermarkStatus(watermarkStatus); } } + + private static class UntriggeredCheckpoint { + private final CheckpointMetaData metadata; + private final CheckpointOptions checkpointOptions; + + private UntriggeredCheckpoint( + CheckpointMetaData metadata, CheckpointOptions checkpointOptions) { + this.metadata = metadata; + this.checkpointOptions = checkpointOptions; + } + + public CheckpointMetaData getMetadata() { + return metadata; + } + + public CheckpointOptions getCheckpointOptions() { + return checkpointOptions; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 0ea212a..d3b9f64 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -38,6 +38,7 @@ import org.apache.flink.core.io.InputStatus; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.SavepointType; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -61,6 +62,8 @@ import 
org.apache.flink.util.SerializedValue; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.io.Serializable; @@ -74,6 +77,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals; import static org.assertj.core.api.Assertions.assertThat; @@ -86,6 +90,10 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { private static final OperatorID OPERATOR_ID = new OperatorID(); private static final int NUM_RECORDS = 10; + public static final CheckpointStorageLocationReference SAVEPOINT_LOCATION = + new CheckpointStorageLocationReference("Savepoint".getBytes()); + public static final CheckpointStorageLocationReference CHECKPOINT_LOCATION = + new CheckpointStorageLocationReference("Checkpoint".getBytes()); @Test void testMetrics() throws Exception { @@ -157,8 +165,35 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { } } - @Test - void testExternallyInducedSource() throws Exception { + static Stream<?> provideExternallyInducedParameters() { + return Stream.of( + CheckpointOptions.alignedNoTimeout( + SavepointType.savepoint(SavepointFormatType.CANONICAL), + SAVEPOINT_LOCATION), + CheckpointOptions.alignedNoTimeout( + SavepointType.terminate(SavepointFormatType.CANONICAL), + SAVEPOINT_LOCATION), + CheckpointOptions.alignedNoTimeout( + SavepointType.suspend(SavepointFormatType.CANONICAL), + SAVEPOINT_LOCATION), + CheckpointOptions.alignedNoTimeout( + CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION), + CheckpointOptions.alignedWithTimeout( + CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION, 123L), + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, 
CHECKPOINT_LOCATION), + CheckpointOptions.notExactlyOnce( + CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION)) + .flatMap( + options -> + Stream.of( + new Object[] {options, true}, + new Object[] {options, false})); + } + + @ParameterizedTest + @MethodSource("provideExternallyInducedParameters") + void testExternallyInducedSource(CheckpointOptions checkpointOptions, boolean rpcFirst) + throws Exception { final int numEventsBeforeCheckpoint = 10; final int totalNumEvents = 20; TestingExternallyInducedSourceReader testingReader = @@ -170,15 +205,47 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { ((SourceOperator) testHarness.getStreamTask().mainOperator) .getSourceReader(); - testHarness.processAll(); + CheckpointMetaData checkpointMetaData = + new CheckpointMetaData(TestingExternallyInducedSourceReader.CHECKPOINT_ID, 2); + if (rpcFirst) { + testHarness.streamTask.triggerCheckpointAsync( + checkpointMetaData, checkpointOptions); + testHarness.processAll(); + } else { + do { + testHarness.processSingleStep(); + } while (!runtimeTestingReader.shouldTriggerCheckpoint().isPresent()); + // stream task should block when trigger received but no RPC + assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isFalse(); + CompletableFuture<Boolean> triggerCheckpointAsync = + testHarness.streamTask.triggerCheckpointAsync( + checkpointMetaData, checkpointOptions); + // process mails until checkpoint has been processed + while (!triggerCheckpointAsync.isDone()) { + testHarness.processSingleStep(); + } + // stream task should be unblocked now + assertThat(testHarness.streamTask.inputProcessor.isAvailable()).isTrue(); + testHarness.processAll(); + } - assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents); + int expectedEvents = + checkpointOptions.getCheckpointType().isSavepoint() + && ((SavepointType) checkpointOptions.getCheckpointType()) + .isSynchronous() + ? 
numEventsBeforeCheckpoint + : totalNumEvents; + assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(expectedEvents); assertThat(runtimeTestingReader.checkpointed).isTrue(); assertThat(runtimeTestingReader.checkpointedId) .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID); assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint); Assertions.assertThat(testHarness.getOutput()) - .contains(new CheckpointBarrier(2, 2, checkpointOptions)); + .contains( + new CheckpointBarrier( + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getTimestamp(), + checkpointOptions)); } } @@ -262,7 +329,7 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase { new CheckpointMetaData(2, 2), CheckpointOptions.alignedNoTimeout( SavepointType.terminate(SavepointFormatType.CANONICAL), - CheckpointStorageLocationReference.getDefault())); + SAVEPOINT_LOCATION)); checkpointCompleted.whenComplete( (ignored, exception) -> testHarness.streamTask.notifyCheckpointCompleteAsync(2));
