rkhachatryan commented on a change in pull request #12617:
URL: https://github.com/apache/flink/pull/12617#discussion_r439069816



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -253,7 +249,7 @@ public void checkpointState(
                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastEvent(
                        new CheckpointBarrier(metadata.getCheckpointId(), 
metadata.getTimestamp(), options),
-                       unalignedCheckpointEnabled);
+                       options.isUnalignedCheckpoint());

Review comment:
       I like the idea of using barrier options instead of local task configuration.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -732,7 +732,7 @@ private void snapshotTaskState(
                        props.getCheckpointType(),
                        checkpointStorageLocation.getLocationReference(),
                        isExactlyOnceMode,
-                       unalignedCheckpointsEnabled);
+                       props.getCheckpointType() == CheckpointType.CHECKPOINT 
&& unalignedCheckpointsEnabled);

Review comment:
       :+1: 

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
##########
@@ -119,10 +122,40 @@ public void testNotifyCheckpointComplete() throws 
Exception {
                }
        }
 
+       @Test
+       public void testSavepointNotResultingInPriorityEvents() throws 
Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+
+               SubtaskCheckpointCoordinator coordinator = new 
MockSubtaskCheckpointCoordinatorBuilder()
+                               .setUnalignedCheckpointEnabled(true)
+                               .setEnvironment(mockEnvironment)
+                               .build();
+
+               AtomicReference<Boolean> broadcastedPriorityEvent = new 
AtomicReference<>(null);
+               final OperatorChain<?, ?> operatorChain = new OperatorChain(
+                               new 
MockStreamTaskBuilder(mockEnvironment).build(),
+                               new NonRecordWriter<>()) {
+                       @Override
+                       public void broadcastEvent(AbstractEvent event, boolean 
isPriorityEvent) throws IOException {
+                               super.broadcastEvent(event, isPriorityEvent);
+                               broadcastedPriorityEvent.set(isPriorityEvent);
+                       }
+               };
+
+               coordinator.checkpointState(
+                               new CheckpointMetaData(0, 0),
+                               new CheckpointOptions(SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()),
+                               new CheckpointMetrics(),
+                               operatorChain,
+                               () -> false);
+
+               assertEquals(false, broadcastedPriorityEvent.get());
+       }
+
        @Test
        public void testSkipChannelStateForSavepoints() throws Exception {
                SubtaskCheckpointCoordinator coordinator = new 
MockSubtaskCheckpointCoordinatorBuilder()
-                       .setUnalignedCheckpointEnabled(false)
+                       .setUnalignedCheckpointEnabled(true)

Review comment:
       :+1: 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to