rkhachatryan commented on a change in pull request #12617:
URL: https://github.com/apache/flink/pull/12617#discussion_r439069816
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -253,7 +249,7 @@ public void checkpointState(
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(),
metadata.getTimestamp(), options),
- unalignedCheckpointEnabled);
+ options.isUnalignedCheckpoint());
Review comment:
I like the idea of using barrier options instead of local task
configuration.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -732,7 +732,7 @@ private void snapshotTaskState(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference(),
isExactlyOnceMode,
- unalignedCheckpointsEnabled);
+ props.getCheckpointType() == CheckpointType.CHECKPOINT
&& unalignedCheckpointsEnabled);
Review comment:
:+1:
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
##########
@@ -119,10 +122,40 @@ public void testNotifyCheckpointComplete() throws
Exception {
}
}
+ @Test
+ public void testSavepointNotResultingInPriorityEvents() throws
Exception {
+ MockEnvironment mockEnvironment =
MockEnvironment.builder().build();
+
+ SubtaskCheckpointCoordinator coordinator = new
MockSubtaskCheckpointCoordinatorBuilder()
+ .setUnalignedCheckpointEnabled(true)
+ .setEnvironment(mockEnvironment)
+ .build();
+
+ AtomicReference<Boolean> broadcastedPriorityEvent = new
AtomicReference<>(null);
+ final OperatorChain<?, ?> operatorChain = new OperatorChain(
+ new
MockStreamTaskBuilder(mockEnvironment).build(),
+ new NonRecordWriter<>()) {
+ @Override
+ public void broadcastEvent(AbstractEvent event, boolean
isPriorityEvent) throws IOException {
+ super.broadcastEvent(event, isPriorityEvent);
+ broadcastedPriorityEvent.set(isPriorityEvent);
+ }
+ };
+
+ coordinator.checkpointState(
+ new CheckpointMetaData(0, 0),
+ new CheckpointOptions(SAVEPOINT,
CheckpointStorageLocationReference.getDefault()),
+ new CheckpointMetrics(),
+ operatorChain,
+ () -> false);
+
+ assertEquals(false, broadcastedPriorityEvent.get());
+ }
+
@Test
public void testSkipChannelStateForSavepoints() throws Exception {
SubtaskCheckpointCoordinator coordinator = new
MockSubtaskCheckpointCoordinatorBuilder()
- .setUnalignedCheckpointEnabled(false)
+ .setUnalignedCheckpointEnabled(true)
Review comment:
:+1:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org