pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476480862
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -133,14 +136,14 @@ public boolean isAvailable() {
* @param bufferAndBacklog
* current buffer and backlog including information about the next buffer
*/
- private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+ @Nullable
Review comment:
nit: add a javadoc explaining the returned value?
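The javadoc could look something like the sketch below. It uses a simplified stand-in so it compiles on its own. `DataType` and `ViewReaderSketch` are hypothetical. The method takes the next data type directly instead of a `BufferAndBacklog`, and its condition mirrors the one in the `getNextDataType` hunk of this diff.

```java
/** Hypothetical two-constant stand-in for Flink's Buffer.DataType. */
enum DataType {
    DATA_BUFFER,
    EVENT_BUFFER;

    boolean isEvent() {
        return this == EVENT_BUFFER;
    }
}

/** Simplified reader: receives the next data type directly instead of a BufferAndBacklog. */
final class ViewReaderSketch {
    private final int numCreditsAvailable;

    ViewReaderSketch(int numCreditsAvailable) {
        this.numCreditsAvailable = numCreditsAvailable;
    }

    // Flink's version is annotated with @Nullable; omitted here to keep the sketch dependency-free.
    /**
     * Returns the type of the next buffer if it may be sent right now.
     *
     * <p>Data buffers need at least one credit; events can be sent without credit.
     *
     * @param nextDataType type of the next queued buffer, or {@code null} if nothing is queued
     * @return {@code nextDataType} if it can be sent now; {@code null} if nothing is queued or
     *     the next buffer is a data buffer and no credit is available
     */
    DataType getNextDataType(DataType nextDataType) {
        if (numCreditsAvailable > 0 || (nextDataType != null && nextDataType.isEvent())) {
            return nextDataType;
        }
        return null;
    }
}
```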
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(),
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
- // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
- for (BufferConsumer buffer : buffers) {
- try (BufferConsumer bc = buffer.copy()) {
- if (bc.isBuffer()) {
- inflightBufferSnapshot.add(bc.build());
+ final int pos = buffers.getNumPriorityElements();
+ buffers.addPriorityElement(bufferConsumer);
+
+ boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
+ if (unalignedCheckpoint) {
+ final Iterator<BufferConsumer> iterator = buffers.iterator();
+ Iterators.advance(iterator, pos + 1);
+ while (iterator.hasNext()) {
+ BufferConsumer buffer = iterator.next();
+
+ if (buffer.isBuffer()) {
+ try (BufferConsumer bc = buffer.copy()) {
+ inflightBufferSnapshot.add(bc.build());
+ }
}
}
}
+ return;
+ }
Review comment:
Why do we need this change? In what scenarios do you expect more than one priority event in the output buffer? (If there is a reason that I'm forgetting about, please add it to the commit message.)
edit: (after reading the commit message a couple of times) Or are you just reusing a class here that you mostly intend to use later (on the inputs?)? If so, maybe the commit message needs some more explanation?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
}
@Override
- public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
+ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) {
synchronized (receivedBuffers) {
- checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
-
- final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
- for (Buffer buffer : receivedBuffers) {
- CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
- if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
- break;
+ final Integer numRecords = numRecordsOvertaken.remove(checkpointId);
Review comment:
Shouldn't we also remove obsolete values from this map (to prevent a potential memory leak)?
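Here is a minimal sketch of such a cleanup. It assumes the map could be a `NavigableMap<Long, Integer>` keyed by checkpoint id, since the actual type is not visible in the diff. The sketch reads the value for the requested checkpoint, then drops every entry up to and including it. That way counts for older checkpoints cannot pile up, for example ones aborted before `spillInflightBuffers(...)` ran. `OvertakenCounts` and `removeUpTo` are hypothetical names.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class OvertakenCounts {
    // Hypothetical: checkpoint id -> number of buffers overtaken by that checkpoint's barrier.
    private final NavigableMap<Long, Integer> numRecordsOvertaken = new TreeMap<>();

    void put(long checkpointId, int count) {
        numRecordsOvertaken.put(checkpointId, count);
    }

    /** Returns the count for checkpointId (or null) and drops it together with all older entries. */
    Integer removeUpTo(long checkpointId) {
        Integer count = numRecordsOvertaken.get(checkpointId);
        // headMap(k, true) is a live view of all keys <= k; clearing it removes them from the map.
        numRecordsOvertaken.headMap(checkpointId, true).clear();
        return count;
    }

    int size() {
        return numRecordsOvertaken.size();
    }
}
```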
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(),
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
- // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
- for (BufferConsumer buffer : buffers) {
- try (BufferConsumer bc = buffer.copy()) {
- if (bc.isBuffer()) {
- inflightBufferSnapshot.add(bc.build());
+ final int pos = buffers.getNumPriorityElements();
+ buffers.addPriorityElement(bufferConsumer);
+
+ boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
+ if (unalignedCheckpoint) {
+ final Iterator<BufferConsumer> iterator = buffers.iterator();
+ Iterators.advance(iterator, pos + 1);
+ while (iterator.hasNext()) {
+ BufferConsumer buffer = iterator.next();
+
+ if (buffer.isBuffer()) {
+ try (BufferConsumer bc = buffer.copy()) {
+ inflightBufferSnapshot.add(bc.build());
+ }
}
}
}
+ return;
+ }
+ buffers.add(bufferConsumer);
Review comment:
nit: for a moment it was a bit confusing to me which code paths do what. IMO it would be easier to follow if the shorter branch came first and the more complicated part had one level of nesting less:
```
if (!insertAsHead) {
    buffers.add(bufferConsumer);
    return;
}
// rest of the code
```
In that case it's more obvious that `!insertAsHead` is a trivial case and that it doesn't interact with the other branch at all.
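To make the suggested ordering concrete, here is a self-contained toy version of the restructured method. Everything in it is a hypothetical simplification:

- `PrioritizedDeque` is reduced to a list plus a count of priority elements at its head.
- Buffers are plain strings, and an `"event:"` prefix marks events.
- The unaligned-checkpoint check is passed in as a flag.
- A plain loop replaces Guava's `Iterators.advance`.

Only the control flow mirrors the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in: priority elements sit at the head, in insertion order. */
final class PrioritizedDeque<T> implements Iterable<T> {
    private final List<T> elements = new ArrayList<>();
    private int numPriorityElements;

    void add(T element) {
        elements.add(element);
    }

    void addPriorityElement(T element) {
        // Insert after the existing priority elements, then bump the count.
        elements.add(numPriorityElements++, element);
    }

    int getNumPriorityElements() {
        return numPriorityElements;
    }

    @Override
    public Iterator<T> iterator() {
        return elements.iterator();
    }
}

final class SubpartitionSketch {
    final PrioritizedDeque<String> buffers = new PrioritizedDeque<>();
    final List<String> inflightBufferSnapshot = new ArrayList<>();

    void handleAddingBarrier(String bufferConsumer, boolean insertAsHead, boolean unalignedCheckpoint) {
        // Trivial case first: a non-priority barrier is simply appended.
        if (!insertAsHead) {
            buffers.add(bufferConsumer);
            return;
        }

        final int pos = buffers.getNumPriorityElements();
        buffers.addPriorityElement(bufferConsumer);
        if (!unalignedCheckpoint) {
            return;
        }

        // Skip older priority elements and the one just added (pos + 1 elements).
        final Iterator<String> iterator = buffers.iterator();
        for (int i = 0; i <= pos && iterator.hasNext(); i++) {
            iterator.next();
        }
        // Snapshot every data buffer the new priority element has overtaken.
        while (iterator.hasNext()) {
            String buffer = iterator.next();
            if (!buffer.startsWith("event:")) {
                inflightBufferSnapshot.add(buffer);
            }
        }
    }
}
```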
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
}
@Override
- public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
+ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) {
synchronized (receivedBuffers) {
- checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
-
- final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
- for (Buffer buffer : receivedBuffers) {
- CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
- if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
- break;
+ final Integer numRecords = numRecordsOvertaken.remove(checkpointId);
Review comment:
Do I understand it correctly? Currently there is a fragile contract that the `numRecordsOvertaken` value won't change between `onBuffer(...)`, where we set it, and this `spillInflightBuffers(...)` call? In other words, it assumes that between enqueueing the priority event and `spillInflightBuffers(...)`, the task thread is not allowed to process any buffers?
Maybe it would be better to embed the `numRecordsOvertaken` value in the priority event that is processed by the task thread?
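Here is a rough sketch of that alternative. All names are hypothetical (`PriorityBarrier`, `ChannelSketch`). The producer thread records how many buffers the priority event overtook inside the event itself, under the same lock as the enqueue. Whoever later handles the event then reads the count from the event, not from a map that other code paths may have changed in the meantime. The sketch assumes at most one pending priority event per channel.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical priority event that carries its own overtaken-buffer count. */
final class PriorityBarrier {
    final long checkpointId;
    final int numBuffersOvertaken;

    PriorityBarrier(long checkpointId, int numBuffersOvertaken) {
        this.checkpointId = checkpointId;
        this.numBuffersOvertaken = numBuffersOvertaken;
    }
}

final class ChannelSketch {
    private final Deque<Object> receivedBuffers = new ArrayDeque<>();

    /** Producer thread: a regular buffer is appended. */
    void onBuffer(Object buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.addLast(buffer);
        }
    }

    /** Producer thread: the barrier overtakes everything queued (assumes no other pending priority event). */
    void onPriorityBarrier(long checkpointId) {
        synchronized (receivedBuffers) {
            // The count is captured atomically with the enqueue and travels with the event.
            receivedBuffers.addFirst(new PriorityBarrier(checkpointId, receivedBuffers.size()));
        }
    }

    /** Task thread: takes the next element; a PriorityBarrier already knows its count. */
    Object poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.pollFirst();
        }
    }
}
```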
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
}
@Override
- public Optional<BufferOrEvent> pollNext() throws Exception {
- while (true) {
Review comment:
Isn't this changing the semantics slightly? Am I right that the only case on the master branch that actually causes another iteration of this loop is
```
barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
```
and all of the other cases exit the loop? Do all of the cases now always exit?
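Here is a toy model of the difference being asked about. Strings stand in for `BufferOrEvent`, `"cancel"` stands in for a `CancelCheckpointMarker`, and this is not the Flink code. The "new" shape assumes the rewritten `pollNext()` returns every polled element, as the question suggests. In the old shape, the cancellation marker is consumed inside the loop and the caller gets the next element. In the new shape, the caller gets the marker itself.

```java
import java.util.Deque;
import java.util.Optional;

final class PollSemantics {
    /** Old shape: a cancellation marker is handled internally and the loop polls again. */
    static Optional<String> pollNextOld(Deque<String> queue) {
        while (true) {
            String next = queue.pollFirst();
            if (next == null) {
                return Optional.empty();
            }
            if (next.equals("cancel")) {
                continue; // consumed here, never returned to the caller
            }
            return Optional.of(next);
        }
    }

    /** New shape (assumed): every polled element, including the marker, is returned once. */
    static Optional<String> pollNextNew(Deque<String> queue) {
        return Optional.ofNullable(queue.pollFirst());
    }
}
```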
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
}
@Override
- public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
+ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) {
synchronized (receivedBuffers) {
- checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
-
- final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
- for (Buffer buffer : receivedBuffers) {
- CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
- if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
- break;
+ final Integer numRecords = numRecordsOvertaken.remove(checkpointId);
Review comment:
`numRecordsOvertaken` -> `numBuffersOvertaken`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -770,34 +808,50 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) {
}));
}
- private void queueChannel(InputChannel channel) {
- int availableChannels;
+ private void queueChannel(InputChannel channel, boolean priority) {
Review comment:
Heh, there are quite a few more corner/edge cases now.
I wonder if there is some other way to express the priority events that would simplify the input gate code?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -159,10 +158,12 @@ public InputStatus emitNext(DataOutput<T> output) throws Exception {
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint mode
- if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
+ if (bufferOrEvent.get().isBuffer()) {
+ processBuffer(bufferOrEvent.get());
+ } else {
+ processEvent(bufferOrEvent.get());
Review comment:
Is this split into `processBuffer` and `processEvent` relevant here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -133,14 +136,14 @@ public boolean isAvailable() {
* @param bufferAndBacklog
* current buffer and backlog including information about the next buffer
*/
- private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+ @Nullable
+ private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {
// BEWARE: this must be in sync with #isAvailable()!
- if (numCreditsAvailable > 0) {
- return bufferAndBacklog.isDataAvailable();
- }
- else {
- return bufferAndBacklog.isEventAvailable();
+ final Buffer.DataType nextDataType = bufferAndBacklog.getNextDataType();
+ if (numCreditsAvailable > 0 || (nextDataType != null && nextDataType.isEvent())) {
+ return nextDataType;
}
+ return null;
Review comment:
Hmm, maybe add another enum type for this purpose? (I'm not sure, just brainstorming.)
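One reading of this suggestion is a sentinel constant, so the method never returns `null`. The `NONE` constant below is hypothetical and not part of Flink's `Buffer.DataType`. The other constants are simplified as well.

```java
/** Hypothetical enum with a sentinel instead of relying on null. */
enum NextDataType {
    NONE, // nothing can be sent right now
    DATA_BUFFER,
    EVENT_BUFFER;

    boolean isEvent() {
        return this == EVENT_BUFFER;
    }
}

final class CreditGate {
    /** Returns NONE instead of null when the next buffer cannot be sent yet. */
    static NextDataType getNextDataType(int numCreditsAvailable, NextDataType next) {
        if (next == NextDataType.NONE) {
            return NextDataType.NONE;
        }
        if (numCreditsAvailable > 0 || next.isEvent()) {
            return next;
        }
        return NextDataType.NONE;
    }
}
```

Callers can then `switch` over the result without a separate `null` check. The trade-off is that `NONE` becomes a value every other user of the enum must also handle.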
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -621,61 +626,84 @@ public boolean isFinished() {
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
- inputWithData.input));
+ inputWithData.input,
+ inputWithData.morePriorityEvents));
}
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
- Optional<InputChannel> inputChannel = getChannel(blocking);
- if (!inputChannel.isPresent()) {
+ Optional<InputChannel> inputChannelOpt = getChannel(blocking);
+ if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
// Do not query inputChannel under the lock, to avoid potential deadlocks coming from
// notifications.
- Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();
+ final InputChannel inputChannel = inputChannelOpt.get();
+ Optional<BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
synchronized (inputChannelsWithData) {
- if (result.isPresent() && result.get().moreAvailable()) {
+ if (!bufferAndAvailabilityOpt.isPresent()) {
+ if (inputChannelsWithData.isEmpty()) {
+ availabilityHelper.resetUnavailable();
+ }
+ continue;
Review comment:
Maybe if the `result` variable rename and the added `continue` branch had happened in an independent "refactor" commit, it would have saved me a couple of minutes while I was trying to understand the change :(
Then again, maybe not, as I can see how the changes are a bit interconnected.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
*/
public CheckpointedInputGate(
InputGate inputGate,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+ this.mailboxExecutor = mailboxExecutor;
+
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ /**
+ * Eagerly pulls and processes all priority events. Must be called from task thread.
+ *
+ * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+ */
+ private void processPriorityEvents() throws IOException, InterruptedException {
+ // check if the priority event is still not processed (could have been pulled before mail was being executed)
+ final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone();
+ if (hasPriorityEvents) {
+ // process as many priority events as possible
+ while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+ }
+ }
+
+ // re-enqueue mail to process priority events
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxExecutor) {
+ final CompletableFuture<?> priorityEventAvailableFuture = inputGate.getPriorityEventAvailableFuture();
+ priorityEventAvailableFuture.thenRun(() -> {
+ mailboxExecutor.execute(this::processPriorityEvents, "process priority even @ gate %s", inputGate);
+ });
Review comment:
1. Again, do I understand this correctly? Is this assuming that nobody polls anything between the completion of `getPriorityEventAvailableFuture` and the execution of `this::processPriorityEvents`? Isn't that a bit fragile?
2. What was the motivation for not passing the priority events to the `StreamTaskNetworkInput`?
3. What about processing priority events as part of `pollNext()`?
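Here is a sketch of option 3 under simplified, hypothetical assumptions. The gate keeps priority events in their own queue, and `pollNext()` handles all pending priority events inline before returning the next regular element. Handling happens on the same call path that polls data. So no separate mailbox action or future has to stay in sync with what the task thread has already pulled. `GateSketch` and its methods are illustrative names, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.function.Consumer;

final class GateSketch {
    private final Deque<String> priorityEvents = new ArrayDeque<>();
    private final Deque<String> regular = new ArrayDeque<>();
    private final Consumer<String> priorityHandler;

    GateSketch(Consumer<String> priorityHandler) {
        this.priorityHandler = priorityHandler;
    }

    synchronized void addPriorityEvent(String event) {
        priorityEvents.addLast(event);
    }

    synchronized void add(String element) {
        regular.addLast(element);
    }

    /** Handles every pending priority event first, then returns the next regular element. */
    Optional<String> pollNext() {
        String event;
        while ((event = pollPriority()) != null) {
            priorityHandler.accept(event); // invoked outside the lock
        }
        return Optional.ofNullable(pollRegular());
    }

    private synchronized String pollPriority() {
        return priorityEvents.pollFirst();
    }

    private synchronized String pollRegular() {
        return regular.pollFirst();
    }
}
```

The likely cost is latency. With this shape, a priority event is handled only the next time the task thread polls the gate. Presumably the mailbox-based approach in the diff exists so they can also be handled while input is not being polled.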
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
}
@Override
- public Optional<BufferOrEvent> pollNext() throws Exception {
- while (true) {
- Optional<BufferOrEvent> next = inputGate.pollNext();
+ public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
+ Optional<BufferOrEvent> next = inputGate.pollNext();
- if (!next.isPresent()) {
- return handleEmptyBuffer();
- }
+ if (!next.isPresent()) {
+ return handleEmptyBuffer();
+ }
- BufferOrEvent bufferOrEvent = next.get();
- checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
+ BufferOrEvent bufferOrEvent = next.get();
+ checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
- if (bufferOrEvent.isBuffer()) {
- return next;
- }
- else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
- barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
- return next;
- }
- else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
- }
- else {
- if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
- barrierHandler.processEndOfPartition();
- }
- return next;
- }
+ if (bufferOrEvent.isEvent()) {
+ handleEvent(bufferOrEvent);
+ } else {
+ barrierHandler.processBuffer(bufferOrEvent.getBuffer(), bufferOrEvent.getChannelInfo());
Review comment:
In this commit, is this just a no-op call for now?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
*/
public CheckpointedInputGate(
InputGate inputGate,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+ this.mailboxExecutor = mailboxExecutor;
+
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ /**
+ * Eagerly pulls and processes all priority events. Must be called from task thread.
+ *
+ * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+ */
+ private void processPriorityEvents() throws IOException, InterruptedException {
+ // check if the priority event is still not processed (could have been pulled before mail was being executed)
+ final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone();
+ if (hasPriorityEvents) {
+ // process as many priority events as possible
+ while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+ }
Review comment:
As I understand it, this assumes that this `pollNext()` cannot return anything other than a priority event?
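If that assumption is intended, one option is to make it explicit with a check. A violation then fails fast instead of silently consuming a regular buffer. The sketch is hypothetical: `Polled` stands in for `BufferOrEvent` and has `priority` and `morePriorityEvents` flags. `drain` mirrors the `while (pollNext()...)` loop in the diff.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class PriorityDrain {
    /** Hypothetical stand-in for BufferOrEvent. */
    static final class Polled {
        final boolean priority;
        final boolean morePriorityEvents;

        Polled(boolean priority, boolean morePriorityEvents) {
            this.priority = priority;
            this.morePriorityEvents = morePriorityEvents;
        }
    }

    /** Drains priority events, failing fast on anything else; returns how many were drained. */
    static int drain(Supplier<Optional<Polled>> pollNext) {
        int drained = 0;
        while (true) {
            Optional<Polled> next = pollNext.get();
            if (!next.isPresent()) {
                return drained;
            }
            if (!next.get().priority) {
                throw new IllegalStateException("Expected only priority events while draining");
            }
            drained++;
            if (!next.get().morePriorityEvents) {
                return drained;
            }
        }
    }
}
```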
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -92,306 +90,173 @@
super(toNotifyOnCheckpoint);
this.taskName = taskName;
- hasInflightBuffers = Arrays.stream(inputGates)
+ this.inputGates = inputGates;
+ storeNewBuffers = Arrays.stream(inputGates)
.flatMap(gate -> gate.getChannelInfos().stream())
.collect(Collectors.toMap(Function.identity(), info -> false));
- threadSafeUnaligner = new ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates);
+ numOpenChannels = storeNewBuffers.size();
+ this.checkpointCoordinator = checkpointCoordinator;
}
- /**
- * We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
- * while reading the first barrier from one channel, because this might happen
- * earlier than the previous async trigger via mailbox by netty thread.
- *
- * <p>Note this is also suitable for the trigger case of local input channel.
- */
@Override
- public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {
- long barrierId = receivedBarrier.getId();
- if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
+ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
+ long barrierId = barrier.getId();
+ if (currentCheckpointId > barrierId || (currentCheckpointId == barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
return;
}
- if (currentConsumedCheckpointId < barrierId) {
- currentConsumedCheckpointId = barrierId;
- numBarrierConsumed = 0;
- hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(true));
+ if (currentCheckpointId < barrierId) {
+ handleNewCheckpoint(barrier);
+ notifyCheckpoint(barrier, 0);
}
- if (currentConsumedCheckpointId == barrierId) {
- hasInflightBuffers.put(channelInfo, false);
- numBarrierConsumed++;
+ if (currentCheckpointId == barrierId) {
+ if (storeNewBuffers.put(channelInfo, false)) {
+ LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);
+
+ inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx())
+ .spillInflightBuffers(barrierId, checkpointCoordinator.getChannelStateWriter());
+
+ if (++numBarriersReceived == numOpenChannels) {
+ allBarriersReceivedFuture.complete(null);
+ }
+ }
}
- threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfo);
}
@Override
public void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
- threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, exception);
+ tryAbortPendingCheckpoint(checkpointId, exception);
- if (checkpointId > currentConsumedCheckpointId) {
- resetPendingCheckpoint(checkpointId);
+ if (checkpointId > currentCheckpointId) {
+ resetPendingCheckpoint();
}
}
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
final long cancelledId = cancelBarrier.getCheckpointId();
- boolean shouldAbort = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
+ boolean shouldAbort = setCancelledCheckpointId(cancelledId);
if (shouldAbort) {
notifyAbort(
cancelledId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
}
- if (cancelledId >= currentConsumedCheckpointId) {
- resetPendingCheckpoint(cancelledId);
- currentConsumedCheckpointId = cancelledId;
+ if (cancelledId >= currentCheckpointId) {
+ resetPendingCheckpoint();
+ currentCheckpointId = cancelledId;
}
}
@Override
public void processEndOfPartition() throws IOException {
- threadSafeUnaligner.onChannelClosed();
- resetPendingCheckpoint(-1L);
+ numOpenChannels--;
+
+ resetPendingCheckpoint();
+ notifyAbort(
+ currentCheckpointId,
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
}
- private void resetPendingCheckpoint(long checkpointId) {
- if (isCheckpointPending()) {
- LOG.warn("{}: Received barrier or EndOfPartition(-1) {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
- taskName,
- checkpointId,
- currentConsumedCheckpointId);
+ private void resetPendingCheckpoint() {
+ LOG.warn("{}: Received barrier or EndOfPartition(-1) before completing current checkpoint {}. " +
+ "Skipping current checkpoint.",
+ taskName,
+ currentCheckpointId);
- hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(false));
- numBarrierConsumed = 0;
- }
+ storeNewBuffers.entrySet().forEach(storeNewBuffer -> storeNewBuffer.setValue(false));
+ numBarriersReceived = 0;
}
@Override
public long getLatestCheckpointId() {
- return currentConsumedCheckpointId;
+ return currentCheckpointId;
}
@Override
public String toString() {
- return String.format("%s: last checkpoint: %d", taskName, currentConsumedCheckpointId);
+ return String.format("%s: last checkpoint: %d", taskName, currentCheckpointId);
}
@Override
public void close() throws IOException {
super.close();
- threadSafeUnaligner.close();
- }
-
- @Override
- public boolean hasInflightData(long checkpointId, InputChannelInfo channelInfo) {
- if (checkpointId < currentConsumedCheckpointId) {
- return false;
- }
- if (checkpointId > currentConsumedCheckpointId) {
- return true;
- }
- return hasInflightBuffers.get(channelInfo);
- }
-
- @Override
- public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
- return threadSafeUnaligner.getAllBarriersReceivedFuture(checkpointId);
- }
-
- @Override
- public Optional<BufferReceivedListener> getBufferReceivedListener() {
- return Optional.of(threadSafeUnaligner);
+ allBarriersReceivedFuture.cancel(false);
}
@Override
protected boolean isCheckpointPending() {
- return numBarrierConsumed > 0;
- }
-
- @VisibleForTesting
- int getNumOpenChannels() {
- return threadSafeUnaligner.getNumOpenChannels();
- }
-
- @VisibleForTesting
- ThreadSafeUnaligner getThreadSafeUnaligner() {
- return threadSafeUnaligner;
+ return numBarriersReceived > 0;
}
- private void notifyCheckpoint(CheckpointBarrier barrier) throws IOException {
- // ignore the previous triggered checkpoint by netty thread if it was already canceled or aborted before.
- if (barrier.getId() >= threadSafeUnaligner.getCurrentCheckpointId()) {
- super.notifyCheckpoint(barrier, 0);
+ @Override
+ public void processBuffer(Buffer buffer, InputChannelInfo channelInfo) {
+ if (storeNewBuffers.get(channelInfo)) {
+ checkpointCoordinator.getChannelStateWriter().addInputData(
+ currentCheckpointId,
+ channelInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
Review comment:
Here, we are persisting in-flight buffers only as they are being processed? Doesn't that mean the unaligned checkpoint will be completed only after we process all of the buffers, making unaligned checkpoints no faster than aligned ones?
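Here is a toy model of the concern; it is not Flink code, and it does not model which buffers Flink's `processBuffer` actually sees. Suppose in-flight buffers are persisted only as the task processes them. Then a channel's snapshot is complete only after everything queued before the barrier has been consumed, which is exactly what alignment waits for. Copying the queued buffers when the barrier overtakes them completes the snapshot immediately. `ChannelQueue` and its string buffers are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ChannelQueue {
    private final Deque<String> queued = new ArrayDeque<>();
    final List<String> persisted = new ArrayList<>();
    private int toPersistOnProcessing;

    void enqueue(String buffer) {
        queued.addLast(buffer);
    }

    /** Lazy: remember how many queued buffers belong to the checkpoint; persist them when processed. */
    void onBarrierLazy() {
        toPersistOnProcessing = queued.size();
    }

    /** Eager: persist copies of all overtaken buffers right away. */
    void onBarrierEager() {
        persisted.addAll(queued);
    }

    /** Task thread processes one buffer. */
    void processNext() {
        String buffer = queued.pollFirst();
        if (buffer != null && toPersistOnProcessing > 0) {
            persisted.add(buffer);
            toPersistOnProcessing--;
        }
    }

    boolean snapshotComplete(int expected) {
        return persisted.size() == expected;
    }
}
```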
----------------------------------------------------------------
This is an automated message from the Apache Git Service.