dawidwys commented on a change in pull request #16227:
URL: https://github.com/apache/flink/pull/16227#discussion_r659689492
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -783,9 +816,17 @@ public void configure(ReadableConfig configuration) {
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
.ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
- configuration
- .getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT)
- .ifPresent(this::setAlignmentTimeout);
+
+ Optional<Duration> alignedCheckpointTimeout =
+
configuration.getOptional(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
+ if (alignedCheckpointTimeout.isPresent()) {
Review comment:
IIRC this whole block is unnecessary. It is already implemented inside
of the Configuration if you use the `deprecatedKeys`.
It should be enough to do:
```
Duration alignedCheckpointTimeout =
configuration.get(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
```
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -353,6 +353,30 @@ public void testTimeoutAlignmentOnFirstBarrier() throws
Exception {
assertEquals(1, target.getTriggeredCheckpointCounter());
}
+ @Test
+ public void testTimeoutAlignmentBeforeFirstBarrier() throws Exception {
+ // given: Local channels.
+ int numChannels = 2;
+ ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+ CheckpointedInputGate gate =
+ new TestCheckpointedInputGateBuilder(
+ numChannels,
getTestBarrierHandlerFactory(target))
+ .withTestChannels()
+ .withMailboxExecutor()
+ .build();
+
+ long alignedCheckpointTimeout = 100;
+ // when: Aligned checkpoint timeout expired before the first barrier
received.
+ Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
+
+ ((TestInputChannel)
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
+
+ // then: The UC is triggered as soon as the first barrier will be
received.
Review comment:
nit:
```suggestion
// then: The UC is triggered as soon as the first barrier is
received.
```
##########
File path: docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
##########
@@ -101,10 +101,10 @@
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:DashboardConfiguration:Features",
"properties" : {
- "web-submit" : {
Review comment:
Out of curiosity. Do you know where does the reordering comes from? Is
the file autogenerated? If you don't know don't bother looking for an answer.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
##########
@@ -207,6 +208,7 @@ public int hashCode() {
checkpointRetentionPolicy,
isExactlyOnce,
isUnalignedCheckpointsEnabled,
+ alignedCheckpointTimeout,
Review comment:
Good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]