pnowojski commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540103645



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
##########
@@ -23,6 +23,7 @@
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
+import org.apache.flink.util.Preconditions;

Review comment:
       optional nit: I would prefer to statically import `checkState` for shorter 
code (we use it frequently enough that I think `Preconditions.checkState` 
is too verbose).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
##########
@@ -88,16 +88,16 @@ protected void maybePersist(Buffer buffer) {
        }

Review comment:
       The commit message is a bit misleading/outdated (`pendingBarrier` -> 
`lastSeenBarrier`).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -229,11 +229,8 @@ public void run() {
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
-               if (buffer.getDataType().hasPriority()) {
-                       channelStatePersister.checkForBarrier(buffer);
-               } else {
-                       channelStatePersister.maybePersist(buffer);
-               }
+               channelStatePersister.checkForBarrier(buffer);
+               channelStatePersister.maybePersist(buffer);

Review comment:
       Can you add a unit test that would show/explain the problem?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException {
                                "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                                toPrioritize,
                                numPriorityElementsBeforeRemoval);
+                       // set the priority flag (checked on poll)
+                       // don't convert the barrier itself (barrier controller might not have been switched yet)
+                       AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+                       toPrioritize.buffer.setReaderIndex(0);
+                       toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
                        firstPriorityEvent = addPriorityBuffer(toPrioritize);   // note that only position of the element is changed
                                                                                // converting the event itself would require switching the controller sooner
                }
                if (firstPriorityEvent) {
-                       notifyPriorityEvent(sequenceNumber);
+                       notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement

Review comment:
       nit: I don't understand this comment. Can you rephrase/elaborate a bit?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
##########
@@ -119,6 +119,6 @@ public boolean isCheckpoint() {
        }
 
        public CheckpointBarrier asUnaligned() {
-               return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
+               return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toUnaligned());

Review comment:
       part of a previous fixup?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
##########
@@ -120,6 +122,12 @@ public void obsoleteBarrierReceived(
                resumeConsumption(channelInfo);
        }
 
+       protected void resetPendingCheckpoint(long cancelledId) {
+               for (final CheckpointableInput input : inputs) {
+                       input.checkpointStopped(cancelledId);
+               }
+       }
+

Review comment:
       Ditto: please add a unit test.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -451,11 +451,17 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                                }
                                else {
                                        receivedBuffers.add(sequenceBuffer);
-                                       channelStatePersister.maybePersist(buffer);
                                        if (dataType.requiresAnnouncement()) {
                                                firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                                        }
                                }
+                               channelStatePersister.checkForBarrier(sequenceBuffer.buffer).ifPresent(id -> {
+                                       // checkpoint was not yet started by task thread,
+                                       // so remember the numbers of buffers to spill for the time when it will be started
+                                       lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                                       lastBarrierId = id;
+                               });
+                               channelStatePersister.maybePersist(buffer);

Review comment:
       Ditto about a unit test? (What is the scenario that works differently, 
and how is it supposed to work?)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
 
        private boolean canTimeout(CheckpointBarrier barrier) {
                return barrier.getCheckpointOptions().isTimeoutable() &&
-                       barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+                       barrier.getId() <= lastSeenBarrier &&
+                       barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+       }
+
+       private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+               return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
       I'm not entirely convinced that this is a better approach. 
   
   To me, using the global checkpoint time (in other words, the recently added 
checkpointStartDelay metric) seemed easier for the user to understand. If an 
aligned checkpoint barrier takes too long to reach a given task, we time it 
out to an unaligned checkpoint that overtakes in-flight data. That seems 
easier to comprehend and to explain than timing out the alignment on some 
subtask.
   
   Secondly, doesn't your proposed change fail to work for single-input tasks 
without active timeouts?
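   
   A minimal, self-contained sketch of the two timeout bases compared above. 
The class, method, and parameter names are hypothetical and not part of Flink; 
only the two elapsed-time comparisons mirror the removed and added lines in 
the diff, and the numbers in `main` are an assumed scenario.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Flink.
class AlignmentTimeoutSketch {

    // Global basis: wall-clock time elapsed since the checkpoint was triggered
    // (the barrier timestamp), mirroring the removed comparison in the diff.
    static boolean timedOutSinceCheckpointStart(
            long alignmentTimeoutMs, long barrierTimestampMs, long nowMs) {
        return alignmentTimeoutMs < nowMs - barrierTimestampMs;
    }

    // Local basis: monotonic time elapsed since the first barrier reached this
    // subtask, mirroring the added comparison in the diff.
    static boolean timedOutSinceFirstBarrier(
            long alignmentTimeoutMs, long firstBarrierArrivalNanos, long nowNanos) {
        return TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMs)
                < nowNanos - firstBarrierArrivalNanos;
    }

    public static void main(String[] args) {
        long timeoutMs = 10;
        // Assumed scenario: the barrier needs 50 ms to reach the subtask,
        // and the subtask has then been aligning for only 1 ms.
        long triggeredAtMs = 0;
        long nowMs = 51;
        long arrivedAtNanos = TimeUnit.MILLISECONDS.toNanos(50);
        long nowNanos = TimeUnit.MILLISECONDS.toNanos(51);

        System.out.println(timedOutSinceCheckpointStart(timeoutMs, triggeredAtMs, nowMs));
        System.out.println(timedOutSinceFirstBarrier(timeoutMs, arrivedAtNanos, nowNanos));
    }
}
```

   Under these assumed numbers the checkpoint-start basis reports a timeout 
while the first-barrier basis does not, because the time the barrier spent 
travelling to the subtask is not counted.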

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
                        lastSeenBarrier = barrier.getId();
                        firstBarrierArrivalTime = getArrivalTime(barrier);
                }
+               activeController = chooseController(barrier);

Review comment:
       ? 
   
   Why is it not enough to do this in `preProcessFirstBarrierOrAnnouncement`?
   
   (as in other places: a unit test would be helpful)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -146,28 +146,7 @@ private void switchToUnaligned(
 
        @Override
        public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-               Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
-               if (maybeTimeOut.isPresent() && activeController == alignedController) {
-                       switchToUnaligned(channelInfo, maybeTimeOut.get());
-                       checkState(activeController == unalignedController);
-                       checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-                       return maybeTimeOut;
-               }
-
-               barrier = maybeTimeOut.orElse(barrier);
-               if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-                       checkState(activeController == unalignedController);
-                       checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-                       return Optional.empty();
-               }
-               else {
-                       checkState(activeController == alignedController);
-                       Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
-                               channelInfo,
-                               barrier);
-                       checkState(triggerResult.isPresent());
-                       return triggerResult;
-               }

Review comment:
       Why have you removed this code? What was the problem?
   
   Was it subsumed by the `switchToUnaligned` call happening in the last 
`barrierReceived` call?



