This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dd8f4e2603309493300099396568ffc681e76e80 Author: fanrui <[email protected]> AuthorDate: Sat May 14 22:50:39 2022 +0800 [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers --- .../runtime/checkpoint/CheckpointOptions.java | 6 +- .../channel/ChannelStateWriteRequest.java | 61 ++++++- .../checkpoint/channel/ChannelStateWriter.java | 24 +++ .../checkpoint/channel/ChannelStateWriterImpl.java | 18 ++ .../io/network/api/writer/RecordWriter.java | 9 + .../network/api/writer/ResultPartitionWriter.java | 7 + .../partition/BoundedBlockingSubpartition.java | 11 ++ .../partition/BufferWritingResultPartition.java | 15 ++ .../network/partition/PipelinedSubpartition.java | 196 ++++++++++++++++++++- .../io/network/partition/ResultSubpartition.java | 5 + .../partition/SortMergeResultPartition.java | 11 ++ .../ChannelStateWriteRequestDispatcherTest.java | 19 ++ .../checkpoint/channel/MockChannelStateWriter.java | 19 ++ .../network/partition/InputChannelTestUtils.java | 2 + .../partition/MockResultPartitionWriter.java | 7 + .../partition/PipelinedSubpartitionTest.java | 143 +++++++++++++++ .../runtime/state/ChannelPersistenceITCase.java | 34 +++- .../streaming/runtime/io/RecordWriterOutput.java | 9 + ...tractAlternatingAlignedBarrierHandlerState.java | 1 + .../AlternatingCollectingBarriers.java | 1 + .../streaming/runtime/tasks/OperatorChain.java | 13 ++ .../flink/streaming/runtime/tasks/StreamTask.java | 22 ++- .../tasks/SubtaskCheckpointCoordinatorImpl.java | 111 ++++++++++-- .../MockSubtaskCheckpointCoordinatorBuilder.java | 3 +- .../tasks/SubtaskCheckpointCoordinatorTest.java | 61 ++++++- 25 files changed, 766 insertions(+), 42 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index bc5b1dd9006..eaa30a3f81c 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -186,6 +186,10 @@ public class CheckpointOptions implements Serializable { return alignmentType == AlignmentType.UNALIGNED; } + public boolean needsChannelState() { + return isUnalignedCheckpoint() || isTimeoutable(); + } + public CheckpointOptions withUnalignedSupported() { if (alignmentType == AlignmentType.FORCED_ALIGNED) { return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT @@ -196,7 +200,7 @@ public class CheckpointOptions implements Serializable { } public CheckpointOptions withUnalignedUnsupported() { - if (isUnalignedCheckpoint() || isTimeoutable()) { + if (needsChannelState()) { return forceAligned(checkpointType, targetLocation, alignedCheckpointTimeout); } return this; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java index d8d19232df8..91a96a1d886 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java @@ -24,6 +24,9 @@ import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingConsumer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -72,6 +75,48 @@ interface ChannelStateWriteRequest { (writer, buffer) -> writer.writeOutput(info, buffer)); } + static ChannelStateWriteRequest write( + long checkpointId, + ResultSubpartitionInfo info, + CompletableFuture<List<Buffer>> dataFuture) { + 
return buildFutureWriteRequest( + checkpointId, + "writeOutputFuture", + dataFuture, + (writer, buffer) -> writer.writeOutput(info, buffer)); + } + + static ChannelStateWriteRequest buildFutureWriteRequest( + long checkpointId, + String name, + CompletableFuture<List<Buffer>> dataFuture, + BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) { + return new CheckpointInProgressRequest( + name, + checkpointId, + writer -> { + List<Buffer> buffers; + try { + buffers = dataFuture.get(); + } catch (ExecutionException e) { + // If dataFuture fails, fail only the single related writer + writer.fail(e); + return; + } + for (Buffer buffer : buffers) { + checkBufferIsBuffer(buffer); + bufferConsumer.accept(writer, buffer); + } + }, + throwable -> { + try { + CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close(); + } catch (ExecutionException ignored) { + } + }, + false); + } + static ChannelStateWriteRequest buildWriteRequest( long checkpointId, String name, @@ -83,12 +128,7 @@ interface ChannelStateWriteRequest { writer -> { while (iterator.hasNext()) { Buffer buffer = iterator.next(); - try { - checkArgument(buffer.isBuffer()); - } catch (Exception e) { - buffer.recycleBuffer(); - throw e; - } + checkBufferIsBuffer(buffer); bufferConsumer.accept(writer, buffer); } }, @@ -96,6 +136,15 @@ interface ChannelStateWriteRequest { false); } + static void checkBufferIsBuffer(Buffer buffer) { + try { + checkArgument(buffer.isBuffer()); + } catch (Exception e) { + buffer.recycleBuffer(); + throw e; + } + } + static ChannelStateWriteRequest start( long checkpointId, ChannelStateWriteResult targetResult, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java index 43a81d114fc..74eef6c9336 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -27,6 +27,7 @@ import org.apache.flink.util.CloseableIterator; import java.io.Closeable; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; /** Writes channel state during checkpoint/savepoint. */ @@ -128,6 +129,22 @@ public interface ChannelStateWriter extends Closeable { long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException; + /** + * Add in-flight bufferFuture from the {@link + * org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. Must be + * called after {@link #start} and before {@link #finishOutput(long)}. Buffers are recycled + * after they are written or exception occurs. + * + * <p>The method will be called when the unaligned checkpoint is enabled and received an aligned + * barrier. + */ + void addOutputDataFuture( + long checkpointId, + ResultSubpartitionInfo info, + int startSeqNum, + CompletableFuture<List<Buffer>> data) + throws IllegalArgumentException; + /** * Finalize write of channel state data for the given checkpoint id. Must be called after {@link * #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added. @@ -178,6 +195,13 @@ public interface ChannelStateWriter extends Closeable { public void addOutputData( long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... 
data) {} + @Override + public void addOutputDataFuture( + long checkpointId, + ResultSubpartitionInfo info, + int startSeqNum, + CompletableFuture<List<Buffer>> data) {} + @Override public void finishInput(long checkpointId) {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index ef8f58b594a..40d09f1a21a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -171,6 +173,22 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { enqueue(write(checkpointId, info, data), false); } + @Override + public void addOutputDataFuture( + long checkpointId, + ResultSubpartitionInfo info, + int startSeqNum, + CompletableFuture<List<Buffer>> dataFuture) + throws IllegalArgumentException { + LOG.trace( + "{} adding output data future, checkpoint {}, channel: {}, startSeqNum: {}", + taskName, + checkpointId, + info, + startSeqNum); + enqueue(write(checkpointId, info, dataFuture), false); + } + @Override public void finishInput(long checkpointId) { LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 4be9941abdd..9d5f351ef89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; @@ -122,6 +123,14 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai } } + public void alignedBarrierTimeout(long checkpointId) throws IOException { + targetPartition.alignedBarrierTimeout(checkpointId); + } + + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + targetPartition.abortCheckpoint(checkpointId, cause); + } + @VisibleForTesting public static ByteBuffer serializeRecord( DataOutputSerializer serializer, IOReadableWritable record) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 081565fc83e..9a505722f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.api.writer; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.network.api.StopMode; @@ -67,6 +68,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid /** Writes the 
given {@link AbstractEvent} to all channels. */ void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException; + /** Timeout the aligned barrier to unaligned barrier. */ + void alignedBarrierTimeout(long checkpointId) throws IOException; + + /** Abort the checkpoint. */ + void abortCheckpoint(long checkpointId, CheckpointException cause); + /** * Notifies the downstream tasks that this {@code ResultPartitionWriter} have emitted all the * user records. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 0a5b0fcab73..c09975cc606 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -291,6 +292,16 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { return data.getSize(); } + @Override + public void alignedBarrierTimeout(long checkpointId) { + // Nothing to do. + } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + // Nothing to do. 
+ } + int getBuffersInBacklogUnsafe() { return numDataBuffersWritten; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index 4e82d5beda1..2422f3ad808 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; @@ -206,6 +207,20 @@ public abstract class BufferWritingResultPartition extends ResultPartition { } } + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.alignedBarrierTimeout(checkpointId); + } + } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.abortCheckpoint(checkpointId, cause); + } + } + @Override public void setMetricGroup(TaskIOMetricGroup metrics) { super.setMetricGroup(metrics); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 125e38fe34d..8be8a287836 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -41,8 +42,10 @@ import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import static java.util.Objects.requireNonNull; import static org.apache.flink.util.Preconditions.checkArgument; @@ -112,6 +115,17 @@ public class PipelinedSubpartition extends ResultSubpartition private int bufferSize = Integer.MAX_VALUE; + /** The channelState Future of unaligned checkpoint. */ + @GuardedBy("buffers") + private CompletableFuture<List<Buffer>> channelStateFuture; + + /** + * The checkpointId corresponding to channelStateFuture. It must always be updated together + * with {@link #channelStateFuture}. + */ + @GuardedBy("buffers") + private long channelStateCheckpointId; + /** * Whether this subpartition is blocked (e.g. by exactly once checkpoint) and is waiting for * resumption. 
@@ -205,6 +219,9 @@ public class PipelinedSubpartition extends ResultSubpartition assert Thread.holdsLock(buffers); if (bufferConsumer.getDataType().hasPriority()) { return processPriorityBuffer(bufferConsumer, partialRecordLength); + } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()) { + processTimeoutableCheckpointBarrier(bufferConsumer); } buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength)); return false; @@ -240,9 +257,151 @@ public class PipelinedSubpartition extends ResultSubpartition inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // Must only be called right after a priority event has been added to buffers. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void processTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + createChannelStateFuture(barrier.getId())); + } + + private CompletableFuture<List<Buffer>> createChannelStateFuture(long checkpointId) { + assert Thread.holdsLock(buffers); + if (channelStateFuture != null) { + completeChannelStateFuture( + null, + new IllegalStateException( + String.format( + "%s has uncompleted channelStateFuture of checkpointId=%s, but it received " + + "a new timeoutable checkpoint barrier of checkpointId=%s, it maybe " + + "a bug due to currently not supported concurrent unaligned checkpoint.", + this, channelStateCheckpointId, checkpointId))); + } + channelStateFuture = new 
CompletableFuture<>(); + channelStateCheckpointId = checkpointId; + return channelStateFuture; + } + + private void completeChannelStateFuture(List<Buffer> channelResult, Throwable e) { + assert Thread.holdsLock(buffers); + if (e != null) { + channelStateFuture.completeExceptionally(e); + } else { + channelStateFuture.complete(channelResult); + } + channelStateFuture = null; + } + + private boolean isChannelStateFutureAvailable(long checkpointId) { + assert Thread.holdsLock(buffers); + return channelStateFuture != null && channelStateCheckpointId == checkpointId; + } + + private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier( + BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed."); + checkState( + barrier.getCheckpointOptions().isTimeoutable() + && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()); + return barrier; + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; + synchronized (buffers) { + // The checkpoint barrier has sent to downstream, so nothing to do. + if (!isChannelStateFutureAvailable(checkpointId)) { + return; + } + + // 1. find inflightBuffers and timeout the aligned barrier to unaligned barrier + List<Buffer> inflightBuffers = new ArrayList<>(); + try { + if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) { + prioritySequenceNumber = sequenceNumber; + } + } catch (IOException e) { + inflightBuffers.forEach(Buffer::recycleBuffer); + completeChannelStateFuture(null, e); + throw e; + } + + // 2. complete the channelStateFuture + completeChannelStateFuture(inflightBuffers, null); + } + + // 3. notify downstream read barrier, it must be called outside the buffers_lock to avoid + // the deadlock. 
+ notifyPriorityEvent(prioritySequenceNumber); + } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + synchronized (buffers) { + if (isChannelStateFutureAvailable(checkpointId)) { + completeChannelStateFuture(null, cause); + } + } + } + + private boolean findInflightBuffersAndMakeBarrierToPriority( + long checkpointId, List<Buffer> inflightBuffers) throws IOException { + // 1. record the buffers before barrier as inflightBuffers + final int numPriorityElements = buffers.getNumPriorityElements(); + final Iterator<BufferConsumerWithPartialRecordLength> iterator = buffers.iterator(); + Iterators.advance(iterator, numPriorityElements); + + BufferConsumerWithPartialRecordLength element = null; + CheckpointBarrier barrier = null; + while (iterator.hasNext()) { + BufferConsumerWithPartialRecordLength next = iterator.next(); + BufferConsumer bufferConsumer = next.getBufferConsumer(); + + if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()) { + barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + // It may be the barrier of a previously aborted checkpoint; skip it. + if (barrier.getId() != checkpointId) { + continue; + } + element = next; + break; + } else if (bufferConsumer.isBuffer()) { + try (BufferConsumer bc = bufferConsumer.copy()) { + inflightBuffers.add(bc.build()); + } + } + } + + // 2. 
Make the barrier to be priority + checkNotNull( + element, "The checkpoint barrier=%d don't find in %s.", checkpointId, toString()); + makeBarrierToPriority(element, barrier); + + return needNotifyPriorityEvent(); + } + + private void makeBarrierToPriority( + BufferConsumerWithPartialRecordLength oldElement, CheckpointBarrier barrier) + throws IOException { + buffers.getAndRemove(oldElement::equals); + buffers.addPriorityElement( + new BufferConsumerWithPartialRecordLength( + EventSerializer.toBufferConsumer(barrier.asUnaligned(), true), 0)); } @Nullable @@ -280,6 +439,12 @@ public class PipelinedSubpartition extends ResultSubpartition } buffers.clear(); + if (channelStateFuture != null) { + IllegalStateException exception = + new IllegalStateException("The PipelinedSubpartition is released"); + completeChannelStateFuture(null, exception); + } + view = readView; readView = null; @@ -312,7 +477,10 @@ public class PipelinedSubpartition extends ResultSubpartition buffers.peek(); BufferConsumer bufferConsumer = bufferConsumerWithPartialRecordLength.getBufferConsumer(); - + if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()) { + completeTimeoutableCheckpointBarrier(bufferConsumer); + } buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength); checkState( @@ -376,6 +544,15 @@ public class PipelinedSubpartition extends ResultSubpartition } } + private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + if (!isChannelStateFutureAvailable(barrier.getId())) { + // It happens on a previously aborted checkpoint. 
+ return; + } + completeChannelStateFuture(Collections.emptyList(), null); + } + void resumeConsumption() { synchronized (buffers) { checkState(isBlocked, "Should be blocked by checkpoint."); @@ -627,4 +804,15 @@ public class PipelinedSubpartition extends ResultSubpartition BufferConsumerWithPartialRecordLength getNextBuffer() { return buffers.poll(); } + + /** for testing only. */ + @VisibleForTesting + CompletableFuture<List<Buffer>> getChannelStateFuture() { + return channelStateFuture; + } + + @VisibleForTesting + public long getChannelStateCheckpointId() { + return channelStateCheckpointId; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index ac99607331d..300d5a41061 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -61,6 +62,10 @@ public abstract class ResultSubpartition { parent.onConsumedSubpartition(getSubPartitionIndex()); } + public abstract void alignedBarrierTimeout(long checkpointId) throws IOException; + + public abstract void abortCheckpoint(long checkpointId, CheckpointException cause); + @VisibleForTesting public final int add(BufferConsumer bufferConsumer) throws IOException { return add(bufferConsumer, 0); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java index 190ca452c67..2f10b357877 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.api.EndOfData; @@ -270,6 +271,16 @@ public class SortMergeResultPartition extends ResultPartition { } } + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + // Nothing to do. + } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + // Nothing to do. 
+ } + private void broadcast(ByteBuffer record, DataType dataType) throws IOException { emit(record, 0, dataType, true); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java index 9e01c958259..f425d057c86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java @@ -32,6 +32,7 @@ import org.junit.runners.Parameterized.Parameters; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -59,11 +60,14 @@ public class ChannelStateWriteRequestDispatcherTest { new Object[] {empty(), asList(start(), completeIn(), completeOut())}, new Object[] {empty(), asList(start(), writeIn(), completeIn())}, new Object[] {empty(), asList(start(), writeOut(), completeOut())}, + new Object[] {empty(), asList(start(), writeOutFuture(), completeOut())}, new Object[] {empty(), asList(start(), completeIn(), writeOut())}, + new Object[] {empty(), asList(start(), completeIn(), writeOutFuture())}, new Object[] {empty(), asList(start(), completeOut(), writeIn())}, // invalid without start new Object[] {of(IllegalArgumentException.class), singletonList(writeIn())}, new Object[] {of(IllegalArgumentException.class), singletonList(writeOut())}, + new Object[] {of(IllegalArgumentException.class), singletonList(writeOutFuture())}, new Object[] {of(IllegalArgumentException.class), singletonList(completeIn())}, new Object[] {of(IllegalArgumentException.class), singletonList(completeOut())}, // invalid double complete @@ -80,6 +84,9 @@ public class ChannelStateWriteRequestDispatcherTest { 
new Object[] { of(IllegalStateException.class), asList(start(), completeOut(), writeOut()) }, + new Object[] { + of(IllegalStateException.class), asList(start(), completeOut(), writeOutFuture()) + }, // invalid double start new Object[] {of(IllegalStateException.class), asList(start(), start())} }; @@ -113,6 +120,18 @@ public class ChannelStateWriteRequestDispatcherTest { FreeingBufferRecycler.INSTANCE)); } + private static ChannelStateWriteRequest writeOutFuture() { + CompletableFuture<List<Buffer>> outFuture = new CompletableFuture<>(); + ChannelStateWriteRequest writeRequest = + write(CHECKPOINT_ID, new ResultSubpartitionInfo(1, 1), outFuture); + outFuture.complete( + singletonList( + new NetworkBuffer( + MemorySegmentFactory.allocateUnpooledSegment(1), + FreeingBufferRecycler.INSTANCE))); + return writeRequest; + } + private static CheckpointStartRequest start() { return new CheckpointStartRequest( CHECKPOINT_ID, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java index b8910c85439..c77208f3ff7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java @@ -21,6 +21,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.CloseableIterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + import static org.apache.flink.util.ExceptionUtils.rethrow; /** @@ -80,6 +83,22 @@ public class MockChannelStateWriter implements ChannelStateWriter { } } + @Override + public void addOutputDataFuture( + long checkpointId, + ResultSubpartitionInfo info, + int startSeqNum, + CompletableFuture<List<Buffer>> data) + throws 
IllegalArgumentException { + checkCheckpointId(checkpointId); + try { + for (final Buffer buffer : data.get()) { + buffer.recycleBuffer(); + } + } catch (Exception ignored) { + } + } + @Override public void finishInput(long checkpointId) { checkCheckpointId(checkpointId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index 9d336cbaf15..181aca1ed62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -236,6 +237,7 @@ public class InputChannelTestUtils { NoOpFileChannelManager.INSTANCE, true, bufferSize); + parent.setChannelStateWriter(ChannelStateWriter.NO_OP); ResultSubpartition subpartition = parent.getAllPartitions()[0]; for (BufferConsumer buffer : buffers) { subpartition.add(buffer); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java index 4a6be76a3ba..25ee8161ba4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -61,6 +62,12 @@ public class MockResultPartitionWriter implements ResultPartitionWriter { @Override public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {} + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException {} + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) {} + @Override public void notifyEndOfData(StopMode mode) throws IOException {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 7e2b8a5b245..295b49d75d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -20,8 +20,13 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -29,7 +34,9 @@ import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.CheckedSupplier; @@ -38,7 +45,10 @@ import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -347,6 +357,139 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertTrue(view.getFailureCause() instanceof CancelTaskException); } + @Test + public void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, null, true, 0, false); + + // test without data buffer + testConsumeQuicklyWithNDataBuffers(0, subpartition, 5L); + + // test with data buffer + testConsumeQuicklyWithNDataBuffers(1, subpartition, 6L); + testConsumeQuicklyWithNDataBuffers(2, subpartition, 7L); + } + + private void testConsumeQuicklyWithNDataBuffers( + int numberOfDataBuffers, 
PipelinedSubpartition subpartition, long checkpointId) + throws Exception { + // add data buffers and barrier + for (int i = 0; i < numberOfDataBuffers; i++) { + subpartition.add(createFilledFinishedBufferConsumer(4096)); + } + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(checkpointId, subpartition.getChannelStateCheckpointId()); + CompletableFuture<List<Buffer>> channelStateFuture = subpartition.getChannelStateFuture(); + assertSubpartitionChannelStateFuturesAndQueuedBuffers( + subpartition, channelStateFuture, false, numberOfDataBuffers + 1, false); + + // poll data buffers first + for (int i = 0; i < numberOfDataBuffers; i++) { + pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER); + } + pollBufferAndCheckType( + subpartition, Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER); + + assertSubpartitionChannelStateFuturesAndQueuedBuffers( + subpartition, channelStateFuture, true, 0, true); + assertTrue(channelStateFuture.get().isEmpty()); + subpartition.resumeConsumption(); + } + + @Test + public void testTimeoutAlignedToUnalignedBarrier() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, null, true, 0, false); + + // test without data buffer + testTimeoutWithNDataBuffers(0, subpartition, 7L); + + // test with data buffer + testTimeoutWithNDataBuffers(1, subpartition, 8L); + } + + private void testTimeoutWithNDataBuffers( + int numberOfDataBuffers, PipelinedSubpartition subpartition, long checkpointId) + throws Exception { + // put data buffers and barrier + List<Buffer> expectedBuffers = new ArrayList<>(); + for (int i = 0; i < numberOfDataBuffers; i++) { + BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(4096); + subpartition.add(bufferConsumer); + expectedBuffers.add(bufferConsumer.copy().build()); + } + 
subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + + assertEquals(checkpointId, subpartition.getChannelStateCheckpointId()); + CompletableFuture<List<Buffer>> channelStateFuture = subpartition.getChannelStateFuture(); + assertSubpartitionChannelStateFuturesAndQueuedBuffers( + subpartition, channelStateFuture, false, numberOfDataBuffers + 1, false); + + subpartition.alignedBarrierTimeout(checkpointId); + assertSubpartitionChannelStateFuturesAndQueuedBuffers( + subpartition, channelStateFuture, true, numberOfDataBuffers + 1, true); + + pollBufferAndCheckType(subpartition, Buffer.DataType.PRIORITIZED_EVENT_BUFFER); + for (int i = 0; i < numberOfDataBuffers; i++) { + pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER); + } + + assertEquals(expectedBuffers, channelStateFuture.get()); + } + + private void pollBufferAndCheckType( + PipelinedSubpartition subpartition, Buffer.DataType dataType) { + ResultSubpartition.BufferAndBacklog barrierBuffer = subpartition.pollBuffer(); + assertNotNull(barrierBuffer); + assertEquals(dataType, barrierBuffer.buffer().getDataType()); + } + + @Test + public void testConcurrentTimeoutableCheckpointBarrier() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + + subpartition.add(getTimeoutableBarrierBuffer(10L)); + assertEquals(10L, subpartition.getChannelStateCheckpointId()); + CompletableFuture<List<Buffer>> checkpointFuture10 = subpartition.getChannelStateFuture(); + assertNotNull(checkpointFuture10); + + try { + // It should fail due to currently does not support concurrent unaligned checkpoints. 
+ subpartition.add(getTimeoutableBarrierBuffer(11L)); + checkpointFuture10.get(); + fail("Should fail with an IllegalStateException."); + } catch (Throwable e) { + ExceptionUtils.assertThrowable(e, IllegalStateException.class); + } + } + + private BufferConsumer getTimeoutableBarrierBuffer(long checkpointId) throws IOException { + CheckpointOptions checkpointOptions = + CheckpointOptions.alignedWithTimeout( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault(), + 1000); + return EventSerializer.toBufferConsumer( + new CheckpointBarrier(checkpointId, System.currentTimeMillis(), checkpointOptions), + false); + } + + private void assertSubpartitionChannelStateFuturesAndQueuedBuffers( + PipelinedSubpartition subpartition, + CompletableFuture<List<Buffer>> channelStateFuture, + boolean channelStateFutureIsNull, + long numberOfQueuedBuffers, + boolean expectedFutureIsDone) { + assertEquals(channelStateFutureIsNull, subpartition.getChannelStateFuture() == null); + assertEquals(numberOfQueuedBuffers, subpartition.getNumberOfQueuedBuffers()); + if (channelStateFuture != null) { + assertEquals(expectedFutureIsDone, channelStateFuture.isDone()); + } + } + private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception { // Add a bufferConsumer diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java index f9cc4675f39..7306ad3a749 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java @@ -53,12 +53,15 @@ import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import 
java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN; @@ -86,6 +89,7 @@ public class ChannelPersistenceITCase { public void testReadWritten() throws Exception { byte[] inputChannelInfoData = randomBytes(1024); byte[] resultSubpartitionInfoData = randomBytes(1024); + byte[] resultSubpartitionInfoFutureData = randomBytes(1024); int partitionIndex = 0; SequentialChannelStateReader reader = @@ -97,9 +101,12 @@ public class ChannelPersistenceITCase { new InputChannelInfo(0, 0), inputChannelInfoData), singletonMap( new ResultSubpartitionInfo(partitionIndex, 0), - resultSubpartitionInfoData)))); + resultSubpartitionInfoData), + singletonMap( + new ResultSubpartitionInfo(partitionIndex, 1), + resultSubpartitionInfoFutureData)))); - NetworkBufferPool networkBufferPool = new NetworkBufferPool(4, 1024); + NetworkBufferPool networkBufferPool = new NetworkBufferPool(6, 1024); try { int numChannels = 1; InputGate gate = buildGate(networkBufferPool, numChannels); @@ -107,12 +114,13 @@ public class ChannelPersistenceITCase { assertArrayEquals( inputChannelInfoData, collectBytes(gate::pollNext, BufferOrEvent::getBuffer)); + int subpartitions = 2; BufferWritingResultPartition resultPartition = buildResultPartition( networkBufferPool, ResultPartitionType.PIPELINED, partitionIndex, - numChannels); + subpartitions); reader.readOutputData(new BufferWritingResultPartition[] {resultPartition}, false); ResultSubpartitionView view = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); @@ -121,6 +129,13 @@ public class ChannelPersistenceITCase { collectBytes( () -> Optional.ofNullable(view.getNextBuffer()), 
BufferAndBacklog::buffer)); + ResultSubpartitionView futureView = + resultPartition.createSubpartitionView(1, new NoOpBufferAvailablityListener()); + assertArrayEquals( + resultSubpartitionInfoFutureData, + collectBytes( + () -> Optional.ofNullable(futureView.getNextBuffer()), + BufferAndBacklog::buffer)); } finally { networkBufferPool.destroy(); } @@ -224,11 +239,14 @@ public class ChannelPersistenceITCase { private ChannelStateWriteResult write( long checkpointId, Map<InputChannelInfo, byte[]> icMap, - Map<ResultSubpartitionInfo, byte[]> rsMap) + Map<ResultSubpartitionInfo, byte[]> rsMap, + Map<ResultSubpartitionInfo, byte[]> rsFutureMap) throws Exception { - int maxStateSize = sizeOfBytes(icMap) + sizeOfBytes(rsMap) + Long.BYTES * 2; + int maxStateSize = + sizeOfBytes(icMap) + sizeOfBytes(rsMap) + sizeOfBytes(rsFutureMap) + Long.BYTES * 3; Map<InputChannelInfo, Buffer> icBuffers = wrapWithBuffers(icMap); Map<ResultSubpartitionInfo, Buffer> rsBuffers = wrapWithBuffers(rsMap); + Map<ResultSubpartitionInfo, Buffer> rsFutureBuffers = wrapWithBuffers(rsFutureMap); try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl("test", 0, getStreamFactoryFactory(maxStateSize))) { writer.open(); @@ -244,6 +262,12 @@ public class ChannelPersistenceITCase { ofElements(Buffer::recycleBuffer, e.getValue())); } writer.finishInput(checkpointId); + for (Map.Entry<ResultSubpartitionInfo, Buffer> e : rsFutureBuffers.entrySet()) { + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + writer.addOutputDataFuture( + checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, dataFuture); + dataFuture.complete(singletonList(e.getValue())); + } for (Map.Entry<ResultSubpartitionInfo, Buffer> e : rsBuffers.entrySet()) { writer.addOutputData( checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, e.getValue()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 8a63074815f..93acf23d30b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Gauge; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -158,6 +159,14 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str recordWriter.broadcastEvent(event, isPriorityEvent); } + public void alignedBarrierTimeout(long checkpointId) throws IOException { + recordWriter.alignedBarrierTimeout(checkpointId); + } + + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + recordWriter.abortCheckpoint(checkpointId, cause); + } + public void flush() throws IOException { recordWriter.flushAll(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java index f239eac23da..97e148f3c14 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java @@ -63,6 +63,7 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState 
implements BarrierH } if (controller.allBarriersReceived()) { + controller.initInputsCheckpoint(checkpointBarrier); controller.triggerGlobalCheckpoint(checkpointBarrier); return finishCheckpoint(); } else if (controller.isTimedOut(checkpointBarrier)) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java index da8a486fb3b..eacd9f58afb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java @@ -67,6 +67,7 @@ final class AlternatingCollectingBarriers extends AbstractAlternatingAlignedBarr + "collecting aligned barrier state"); if (controller.allBarriersReceived()) { + controller.initInputsCheckpoint(pendingCheckpointBarrier); controller.triggerGlobalCheckpoint(pendingCheckpointBarrier); return finishCheckpoint(); } else if (controller.isTimedOut(pendingCheckpointBarrier)) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 75325c13810..d5f94d42412 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointException; import 
org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateObjectCollection; @@ -335,6 +336,18 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> } } + public void alignedBarrierTimeout(long checkpointId) throws IOException { + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.alignedBarrierTimeout(checkpointId); + } + } + + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + for (RecordWriterOutput<?> streamOutput : streamOutputs) { + streamOutput.abortCheckpoint(checkpointId, cause); + } + } + /** * Execute {@link StreamOperator#close()} of each operator in the chain of this {@link * StreamTask}. Closing happens from <b>tail to head</b> operator in the chain. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 312f4f38eb3..f431fe90399 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -87,6 +87,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil; import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -440,6 +441,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> 
environment.setCheckpointStorageAccess(checkpointStorageAccess); + // if the clock is not already set, then assign a default TimeServiceProvider + if (timerService == null) { + this.timerService = createTimerService("Time Trigger for " + getName()); + } else { + this.timerService = timerService; + } + + this.systemTimerService = createTimerService("System Time Trigger for " + getName()); + this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl( checkpointStorageAccess, @@ -455,16 +465,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> .get( ExecutionCheckpointingOptions .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), - this::prepareInputSnapshot); + this::prepareInputSnapshot, + BarrierAlignmentUtil.createRegisterTimerCallback( + mainMailboxExecutor, systemTimerService)); - // if the clock is not already set, then assign a default TimeServiceProvider - if (timerService == null) { - this.timerService = createTimerService("Time Trigger for " + getName()); - } else { - this.timerService = timerService; - } - - this.systemTimerService = createTimerService("System Time Trigger for " + getName()); // Register to stop all timers and threads. Should be closed first. 
resourceCloser.registerCloseable(this::tryShutdownTimerService); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 7c96d49207f..76f413c4b49 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -38,9 +38,14 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable; +import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.DelayableTimer; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; @@ -49,6 +54,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -103,6 +109,19 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @GuardedBy("lock") private boolean closed; + private final DelayableTimer registerTimer; + + private final Clock clock; + + /** It always be called in Task Thread. 
*/ + private Cancellable alignmentTimer; + + /** + * It is the checkpointId corresponding to alignmentTimer. And It should be always updated with + * {@link #alignmentTimer}. + */ + private long alignmentCheckpointId; + SubtaskCheckpointCoordinatorImpl( CheckpointStorageWorkerView checkpointStorage, String taskName, @@ -115,7 +134,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { boolean enableCheckpointAfterTasksFinished, BiFunctionWithException< ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> - prepareInputSnapshot) + prepareInputSnapshot, + DelayableTimer registerTimer) throws IOException { this( checkpointStorage, @@ -128,7 +148,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { unalignedCheckpointEnabled, enableCheckpointAfterTasksFinished, prepareInputSnapshot, - DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS); + DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS, + registerTimer); } SubtaskCheckpointCoordinatorImpl( @@ -144,7 +165,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { BiFunctionWithException< ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, - int maxRecordAbortedCheckpoints) + int maxRecordAbortedCheckpoints, + DelayableTimer registerTimer) throws IOException { this( checkpointStorage, @@ -159,7 +181,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { unalignedCheckpointEnabled ? 
openChannelStateWriter(taskName, checkpointStorage, env) : ChannelStateWriter.NO_OP, - enableCheckpointAfterTasksFinished); + enableCheckpointAfterTasksFinished, + registerTimer); } @VisibleForTesting @@ -176,7 +199,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { prepareInputSnapshot, int maxRecordAbortedCheckpoints, ChannelStateWriter channelStateWriter, - boolean enableCheckpointAfterTasksFinished) + boolean enableCheckpointAfterTasksFinished, + DelayableTimer registerTimer) throws IOException { this.checkpointStorage = new CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage)); @@ -195,6 +219,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { closeableRegistry.registerCloseable(this); this.closed = false; this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished; + this.registerTimer = registerTimer; + this.clock = SystemClock.getInstance(); } private static ChannelStateWriter openChannelStateWriter( @@ -229,9 +255,24 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { // notify the coordinator that we decline this checkpoint env.declineCheckpoint(checkpointId, cause); - // notify all downstream operators that they should not wait for a barrier from us actionExecutor.runThrowing( - () -> operatorChain.broadcastEvent(new CancelCheckpointMarker(checkpointId))); + () -> { + if (checkpointId == alignmentCheckpointId) { + cancelAlignmentTimer(); + } + // notify all downstream operators that they should not wait for a barrier from + // us and abort checkpoint. 
+ operatorChain.abortCheckpoint(checkpointId, cause); + operatorChain.broadcastEvent(new CancelCheckpointMarker(checkpointId)); + }); + } + + private void cancelAlignmentTimer() { + if (alignmentTimer == null) { + return; + } + alignmentTimer.cancel(); + alignmentTimer = null; } @Override @@ -300,17 +341,26 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId()); // Step (2): Send the checkpoint barrier downstream - operatorChain.broadcastEvent( - new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), - options.isUnalignedCheckpoint()); - - // Step (3): Prepare to spill the in-flight buffers for input and output - if (options.isUnalignedCheckpoint()) { + LOG.debug( + "Task {} broadcastEvent at {}, triggerTime {}, passed time {}", + taskName, + System.currentTimeMillis(), + metadata.getTimestamp(), + System.currentTimeMillis() - metadata.getTimestamp()); + CheckpointBarrier checkpointBarrier = + new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options); + operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint()); + + // Step (3): Register alignment timer to timeout aligned barrier to unaligned barrier + registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier); + + // Step (4): Prepare to spill the in-flight buffers for input and output + if (options.needsChannelState()) { // output data already written while broadcasting event channelStateWriter.finishOutput(metadata.getCheckpointId()); } - // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact + // Step (5): Take the state snapshot. 
This should be largely asynchronous, to not impact // progress of the // streaming topology @@ -335,6 +385,33 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } } + private void registerAlignmentTimer( + long checkpointId, + OperatorChain<?, ?> operatorChain, + CheckpointBarrier checkpointBarrier) { + // The timer isn't triggered when the checkpoint completes quickly, so cancel timer here. + cancelAlignmentTimer(); + if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) { + return; + } + + long timerDelay = BarrierAlignmentUtil.getTimerDelay(clock, checkpointBarrier); + + alignmentTimer = + registerTimer.registerTask( + () -> { + try { + operatorChain.alignedBarrierTimeout(checkpointId); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + alignmentTimer = null; + return null; + }, + Duration.ofMillis(timerDelay)); + alignmentCheckpointId = checkpointId; + } + @Override public void notifyCheckpointComplete( long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) @@ -439,6 +516,10 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { channelStateWriter.start(id, checkpointOptions); prepareInflightDataSnapshot(id); + } else if (checkpointOptions.isTimeoutable()) { + // The output buffer may need to be snapshotted, so start the channelStateWriter here. + channelStateWriter.start(id, checkpointOptions); + channelStateWriter.finishInput(id); } } @@ -635,7 +716,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { long started = System.nanoTime(); ChannelStateWriteResult channelStateWriteResult = - checkpointOptions.isUnalignedCheckpoint() + checkpointOptions.needsChannelState() ? 
channelStateWriter.getAndRemoveWriteResult(checkpointId) : ChannelStateWriteResult.EMPTY; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java index 650edf5abe9..1855df80cbe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java @@ -112,7 +112,8 @@ public class MockSubtaskCheckpointCoordinatorBuilder { unalignedCheckpointEnabled, enableCheckpointAfterTasksFinished, prepareInputSnapshot, - maxRecordAbortedCheckpoints); + maxRecordAbortedCheckpoints, + (callable, duration) -> () -> {}); } private static class NonHandleAsyncException implements AsyncExceptionHandler { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index ef37551a02f..135799b8e44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -72,6 +72,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -79,11 +80,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import 
java.util.function.Supplier; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -511,6 +514,61 @@ public class SubtaskCheckpointCoordinatorTest { } } + @Test + public void testTimeoutableAlignedBarrierNotPriorityAndChannelStateResult() throws Exception { + long checkpointId = 66; + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + + try (SubtaskCheckpointCoordinator coordinator = + new MockSubtaskCheckpointCoordinatorBuilder() + .setUnalignedCheckpointEnabled(true) + .setEnvironment(mockEnvironment) + .build()) { + AtomicReference<Boolean> broadcastedPriorityEvent = new AtomicReference<>(null); + AtomicReference<ChannelStateWriter.ChannelStateWriteResult> channelStateResult = + new AtomicReference<>(null); + final OperatorChain<?, ?> operatorChain = + new RegularOperatorChain( + new NoOpStreamTask<>(new DummyEnvironment()), new NonRecordWriter<>()) { + @Override + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) + throws IOException { + super.broadcastEvent(event, isPriorityEvent); + broadcastedPriorityEvent.set(isPriorityEvent); + } + + @Override + public void snapshotState( + Map operatorSnapshotsInProgress, + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + Supplier isRunning, + ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, + CheckpointStreamFactory storage) { + channelStateResult.set(channelStateWriteResult); + } + }; + + CheckpointOptions checkpointOptions = + new CheckpointOptions( + CHECKPOINT, + CheckpointStorageLocationReference.getDefault(), + CheckpointOptions.AlignmentType.ALIGNED, + 200); + 
coordinator.initInputsCheckpoint(checkpointId, checkpointOptions); + coordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + checkpointOptions, + new CheckpointMetricsBuilder(), + operatorChain, + false, + () -> true); + + assertEquals(false, broadcastedPriorityEvent.get()); + assertNotNull(channelStateResult.get()); + } + } + private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception { return new RegularOperatorChain<>( new MockStreamTaskBuilder(mockEnvironment).build(), new NonRecordWriter<>()); @@ -686,6 +744,7 @@ public class SubtaskCheckpointCoordinatorTest { (unused1, unused2) -> CompletableFuture.completedFuture(null), 0, channelStateWriter, - true); + true, + (callable, duration) -> () -> {}); } }
