AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476683415
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -92,306 +90,173 @@
super(toNotifyOnCheckpoint);
this.taskName = taskName;
- hasInflightBuffers = Arrays.stream(inputGates)
+ this.inputGates = inputGates;
+ storeNewBuffers = Arrays.stream(inputGates)
.flatMap(gate -> gate.getChannelInfos().stream())
.collect(Collectors.toMap(Function.identity(), info ->
false));
- threadSafeUnaligner = new
ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates);
+ numOpenChannels = storeNewBuffers.size();
+ this.checkpointCoordinator = checkpointCoordinator;
}
- /**
- * We still need to trigger checkpoint via {@link
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
- * while reading the first barrier from one channel, because this might
happen
- * earlier than the previous async trigger via mailbox by netty thread.
- *
- * <p>Note this is also suitable for the trigger case of local input
channel.
- */
@Override
- public void processBarrier(CheckpointBarrier receivedBarrier,
InputChannelInfo channelInfo) throws IOException {
- long barrierId = receivedBarrier.getId();
- if (currentConsumedCheckpointId > barrierId ||
(currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
+ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo
channelInfo) throws IOException {
+ long barrierId = barrier.getId();
+ if (currentCheckpointId > barrierId || (currentCheckpointId ==
barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
return;
}
- if (currentConsumedCheckpointId < barrierId) {
- currentConsumedCheckpointId = barrierId;
- numBarrierConsumed = 0;
- hasInflightBuffers.entrySet().forEach(hasInflightBuffer
-> hasInflightBuffer.setValue(true));
+ if (currentCheckpointId < barrierId) {
+ handleNewCheckpoint(barrier);
+ notifyCheckpoint(barrier, 0);
}
- if (currentConsumedCheckpointId == barrierId) {
- hasInflightBuffers.put(channelInfo, false);
- numBarrierConsumed++;
+ if (currentCheckpointId == barrierId) {
+ if (storeNewBuffers.put(channelInfo, false)) {
+ LOG.debug("{}: Received barrier from channel {}
@ {}.", taskName, channelInfo, barrierId);
+
+
inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx())
+ .spillInflightBuffers(barrierId,
checkpointCoordinator.getChannelStateWriter());
+
+ if (++numBarriersReceived == numOpenChannels) {
+
allBarriersReceivedFuture.complete(null);
+ }
+ }
}
- threadSafeUnaligner.notifyBarrierReceived(receivedBarrier,
channelInfo);
}
@Override
public void abortPendingCheckpoint(long checkpointId,
CheckpointException exception) throws IOException {
- threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId,
exception);
+ tryAbortPendingCheckpoint(checkpointId, exception);
- if (checkpointId > currentConsumedCheckpointId) {
- resetPendingCheckpoint(checkpointId);
+ if (checkpointId > currentCheckpointId) {
+ resetPendingCheckpoint();
}
}
@Override
public void processCancellationBarrier(CancelCheckpointMarker
cancelBarrier) throws IOException {
final long cancelledId = cancelBarrier.getCheckpointId();
- boolean shouldAbort =
threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
+ boolean shouldAbort = setCancelledCheckpointId(cancelledId);
if (shouldAbort) {
notifyAbort(
cancelledId,
new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
}
- if (cancelledId >= currentConsumedCheckpointId) {
- resetPendingCheckpoint(cancelledId);
- currentConsumedCheckpointId = cancelledId;
+ if (cancelledId >= currentCheckpointId) {
+ resetPendingCheckpoint();
+ currentCheckpointId = cancelledId;
}
}
@Override
public void processEndOfPartition() throws IOException {
- threadSafeUnaligner.onChannelClosed();
- resetPendingCheckpoint(-1L);
+ numOpenChannels--;
+
+ resetPendingCheckpoint();
+ notifyAbort(
+ currentCheckpointId,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
}
- private void resetPendingCheckpoint(long checkpointId) {
- if (isCheckpointPending()) {
- LOG.warn("{}: Received barrier or EndOfPartition(-1) {}
before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
- taskName,
- checkpointId,
- currentConsumedCheckpointId);
+ private void resetPendingCheckpoint() {
+ LOG.warn("{}: Received barrier or EndOfPartition(-1) before
completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ taskName,
+ currentCheckpointId);
- hasInflightBuffers.entrySet().forEach(hasInflightBuffer
-> hasInflightBuffer.setValue(false));
- numBarrierConsumed = 0;
- }
+ storeNewBuffers.entrySet().forEach(storeNewBuffer ->
storeNewBuffer.setValue(false));
+ numBarriersReceived = 0;
}
@Override
public long getLatestCheckpointId() {
- return currentConsumedCheckpointId;
+ return currentCheckpointId;
}
@Override
public String toString() {
- return String.format("%s: last checkpoint: %d", taskName,
currentConsumedCheckpointId);
+ return String.format("%s: last checkpoint: %d", taskName,
currentCheckpointId);
}
@Override
public void close() throws IOException {
super.close();
- threadSafeUnaligner.close();
- }
-
- @Override
- public boolean hasInflightData(long checkpointId, InputChannelInfo
channelInfo) {
- if (checkpointId < currentConsumedCheckpointId) {
- return false;
- }
- if (checkpointId > currentConsumedCheckpointId) {
- return true;
- }
- return hasInflightBuffers.get(channelInfo);
- }
-
- @Override
- public CompletableFuture<Void> getAllBarriersReceivedFuture(long
checkpointId) {
- return
threadSafeUnaligner.getAllBarriersReceivedFuture(checkpointId);
- }
-
- @Override
- public Optional<BufferReceivedListener> getBufferReceivedListener() {
- return Optional.of(threadSafeUnaligner);
+ allBarriersReceivedFuture.cancel(false);
}
@Override
protected boolean isCheckpointPending() {
- return numBarrierConsumed > 0;
- }
-
- @VisibleForTesting
- int getNumOpenChannels() {
- return threadSafeUnaligner.getNumOpenChannels();
- }
-
- @VisibleForTesting
- ThreadSafeUnaligner getThreadSafeUnaligner() {
- return threadSafeUnaligner;
+ return numBarriersReceived > 0;
}
- private void notifyCheckpoint(CheckpointBarrier barrier) throws
IOException {
- // ignore the previous triggered checkpoint by netty thread if
it was already canceled or aborted before.
- if (barrier.getId() >=
threadSafeUnaligner.getCurrentCheckpointId()) {
- super.notifyCheckpoint(barrier, 0);
+ @Override
+ public void processBuffer(Buffer buffer, InputChannelInfo channelInfo) {
+ if (storeNewBuffers.get(channelInfo)) {
+
checkpointCoordinator.getChannelStateWriter().addInputData(
+ currentCheckpointId,
+ channelInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ ofElement(buffer.retainBuffer(),
Buffer::recycleBuffer));
Review comment:
Let me sketch the most general data flow:
* We have a couple of buffers in our input channel.
* A checkpoint is triggered by another channel. *
* All buffers that are now pulled by `CheckpointedInputGate` from the first
channel get persisted by the function above.
* Then the barrier comes in. *
* It overtakes all buffers and is now at the head. *
* `CheckpointedInputGate` gets a priority notification and polls the barrier.
* Upon dispatching, it calls `Unaligner`, which additionally spills all
overtaken buffers.
* Further buffers are not persisted.
All steps marked with a trailing * are performed in a different thread (another
task thread or the Netty thread).
As far as I can tell, this is equivalent to the current behavior, but maybe I
missed something.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]