This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0a3e711ffe8 [FLINK-30002][checkpoint] Change the alignmentTimeout to
alignedCheckpointTimeout
0a3e711ffe8 is described below
commit 0a3e711ffe8c433d3881b20261446d4dd026827f
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Nov 12 12:15:27 2022 +0800
[FLINK-30002][checkpoint] Change the alignmentTimeout to
alignedCheckpointTimeout
---
.../network/api/serialization/EventSerializer.java | 5 +-
.../AbstractAlignedBarrierHandlerState.java | 2 +-
...tractAlternatingAlignedBarrierHandlerState.java | 5 +-
.../AlternatingCollectingBarriers.java | 4 +-
.../AlternatingCollectingBarriersUnaligned.java | 2 +-
.../AlternatingWaitingForFirstBarrier.java | 2 +-
...AlternatingWaitingForFirstBarrierUnaligned.java | 2 +-
.../io/checkpointing/BarrierHandlerState.java | 3 +-
.../SingleCheckpointBarrierHandler.java | 2 +-
.../checkpointing/AlternatingCheckpointsTest.java | 139 +++++++++++----------
10 files changed, 91 insertions(+), 75 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 40ccf17c585..c2e5c6549dc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -320,12 +320,13 @@ public class EventSerializer {
}
final CheckpointOptions.AlignmentType alignmentType =
CheckpointOptions.AlignmentType.values()[buffer.get()];
- final long alignmentTimeout = buffer.getLong();
+ final long alignedCheckpointTimeout = buffer.getLong();
return new CheckpointBarrier(
id,
timestamp,
- new CheckpointOptions(snapshotType, locationRef,
alignmentType, alignmentTimeout));
+ new CheckpointOptions(
+ snapshotType, locationRef, alignmentType,
alignedCheckpointTimeout));
}
private static SavepointType decodeSavepointType(byte checkpointTypeCode,
ByteBuffer buffer)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
index 8f92d5f93a5..2fa77ab07de 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
@@ -36,7 +36,7 @@ abstract class AbstractAlignedBarrierHandlerState implements
BarrierHandlerState
}
@Override
- public final BarrierHandlerState alignmentTimeout(
+ public final BarrierHandlerState alignedCheckpointTimeout(
Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
throw new IllegalStateException(
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
index 97e148f3c14..3a89af88d14 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
@@ -51,7 +51,8 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState
implements BarrierH
boolean markChannelBlocked)
throws IOException, CheckpointException {
if (checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
- BarrierHandlerState unalignedState = alignmentTimeout(controller,
checkpointBarrier);
+ BarrierHandlerState unalignedState =
+ alignedCheckpointTimeout(controller, checkpointBarrier);
return unalignedState.barrierReceived(
controller, channelInfo, checkpointBarrier,
markChannelBlocked);
}
@@ -67,7 +68,7 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState
implements BarrierH
controller.triggerGlobalCheckpoint(checkpointBarrier);
return finishCheckpoint();
} else if (controller.isTimedOut(checkpointBarrier)) {
- return alignmentTimeout(controller, checkpointBarrier)
+ return alignedCheckpointTimeout(controller, checkpointBarrier)
.barrierReceived(
controller,
channelInfo,
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
index eacd9f58afb..8ca37055bc3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
@@ -38,7 +38,7 @@ final class AlternatingCollectingBarriers extends
AbstractAlternatingAlignedBarr
}
@Override
- public BarrierHandlerState alignmentTimeout(
+ public BarrierHandlerState alignedCheckpointTimeout(
Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
state.prioritizeAllAnnouncements();
@@ -71,7 +71,7 @@ final class AlternatingCollectingBarriers extends
AbstractAlternatingAlignedBarr
controller.triggerGlobalCheckpoint(pendingCheckpointBarrier);
return finishCheckpoint();
} else if (controller.isTimedOut(pendingCheckpointBarrier)) {
- return alignmentTimeout(controller, pendingCheckpointBarrier)
+ return alignedCheckpointTimeout(controller,
pendingCheckpointBarrier)
.endOfPartitionReceived(controller, channelInfo);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
index c9ccb87fa8e..eb4f15f7479 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
@@ -42,7 +42,7 @@ final class AlternatingCollectingBarriersUnaligned implements
BarrierHandlerStat
}
@Override
- public BarrierHandlerState alignmentTimeout(
+ public BarrierHandlerState alignedCheckpointTimeout(
Controller controller, CheckpointBarrier checkpointBarrier) {
// ignore already processing unaligned checkpoints
return this;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
index bd11296ae6e..c8b718a520f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
@@ -32,7 +32,7 @@ final class AlternatingWaitingForFirstBarrier
}
@Override
- public BarrierHandlerState alignmentTimeout(
+ public BarrierHandlerState alignedCheckpointTimeout(
Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException {
state.prioritizeAllAnnouncements();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
index e375e62dceb..af04f4f8107 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
@@ -40,7 +40,7 @@ final class AlternatingWaitingForFirstBarrierUnaligned
implements BarrierHandler
}
@Override
- public BarrierHandlerState alignmentTimeout(
+ public BarrierHandlerState alignedCheckpointTimeout(
Controller controller, CheckpointBarrier checkpointBarrier) {
// ignore already processing unaligned checkpoints
return this;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
index 65b68628a39..7ae41196519 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
@@ -40,7 +40,8 @@ import java.io.IOException;
* actions.
*/
interface BarrierHandlerState {
- BarrierHandlerState alignmentTimeout(Controller controller,
CheckpointBarrier checkpointBarrier)
+ BarrierHandlerState alignedCheckpointTimeout(
+ Controller controller, CheckpointBarrier checkpointBarrier)
throws IOException, CheckpointException;
BarrierHandlerState announcementReceived(
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index aaacd64eb63..c1bd9ad6c85 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -318,7 +318,7 @@ public class SingleCheckpointBarrierHandler extends
CheckpointBarrierHandler {
if (currentCheckpointId == barrierId
&&
!getAllBarriersReceivedFuture(barrierId).isDone()) {
currentState =
- currentState.alignmentTimeout(
+
currentState.alignedCheckpointTimeout(
context, announcedBarrier);
}
} catch (CheckpointException ex) {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index 2642bbcd73a..3d4a52a091d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -205,7 +205,7 @@ public class AlternatingCheckpointsTest {
public void testAlignedAfterTimedOut() throws Exception {
int numChannels = 1;
ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
- long alignmentTimeOut = 100L;
+ long alignedCheckpointTimeout = 100L;
try (CheckpointedInputGate gate =
new TestCheckpointedInputGateBuilder(
numChannels,
getTestBarrierHandlerFactory(target))
@@ -218,10 +218,12 @@ public class AlternatingCheckpointsTest {
1,
clock.relativeTimeMillis(),
alignedWithTimeout(
- CheckpointType.CHECKPOINT, getDefault(),
alignmentTimeOut));
+ CheckpointType.CHECKPOINT,
+ getDefault(),
+ alignedCheckpointTimeout));
((RemoteInputChannel)
gate.getChannel(0)).onBuffer(barrier1.retainBuffer(), 0, 0);
assertAnnouncement(gate);
- clock.advanceTime(alignmentTimeOut + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1,
TimeUnit.MILLISECONDS);
assertBarrier(gate);
assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -230,7 +232,9 @@ public class AlternatingCheckpointsTest {
2,
clock.relativeTimeMillis(),
alignedWithTimeout(
- CheckpointType.CHECKPOINT, getDefault(),
alignmentTimeOut));
+ CheckpointType.CHECKPOINT,
+ getDefault(),
+ alignedCheckpointTimeout));
((RemoteInputChannel)
gate.getChannel(0)).onBuffer(barrier2.retainBuffer(), 1, 0);
assertAnnouncement(gate);
assertBarrier(gate);
@@ -241,7 +245,9 @@ public class AlternatingCheckpointsTest {
contains(
unaligned(CheckpointType.CHECKPOINT, getDefault()),
alignedWithTimeout(
- CheckpointType.CHECKPOINT, getDefault(),
alignmentTimeOut)));
+ CheckpointType.CHECKPOINT,
+ getDefault(),
+ alignedCheckpointTimeout)));
}
}
@@ -303,9 +309,11 @@ public class AlternatingCheckpointsTest {
}
private void testTimeoutBarrierOnTwoChannels(
- ValidatingCheckpointHandler target, CheckpointedInputGate gate,
long alignmentTimeout)
+ ValidatingCheckpointHandler target,
+ CheckpointedInputGate gate,
+ long alignedCheckpointTimeout)
throws Exception {
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -315,7 +323,7 @@ public class AlternatingCheckpointsTest {
assertEquals(0, target.getTriggeredCheckpointCounter());
assertAnnouncement(gate);
- clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout * 2, TimeUnit.MILLISECONDS);
assertAnnouncement(gate);
assertBarrier(gate);
assertBarrier(gate);
@@ -348,8 +356,8 @@ public class AlternatingCheckpointsTest {
.withMailboxExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
for (int i = 0; i < numChannels; i++) {
(getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(),
0, 0);
@@ -361,7 +369,7 @@ public class AlternatingCheckpointsTest {
}
assertEquals(0, target.getTriggeredCheckpointCounter());
- clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
assertBarrier(gate);
assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -403,8 +411,8 @@ public class AlternatingCheckpointsTest {
.withMailboxExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(1, alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
// when: Execute the first checkpoint when announcement received first.
((TestInputChannel)
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
@@ -421,9 +429,9 @@ public class AlternatingCheckpointsTest {
assertEquals(1, target.getTriggeredCheckpointCounter());
// given: The time in the future.
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
- checkpointBarrier = withTimeout(2, alignmentTimeout);
+ checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
// when: Execute the second checkpoint when barrier from local channel
without announcement
// received first.
@@ -435,7 +443,7 @@ public class AlternatingCheckpointsTest {
// when: Receiving the barrier from second channel(with/without)
announcement after time
// more than alignment timeout.
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
(getChannel(gate, 1)).onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
assertAnnouncement(gate);
assertBarrier(gate);
@@ -458,8 +466,8 @@ public class AlternatingCheckpointsTest {
.withMixedChannels(0)
.withMailboxExecutor()
.build()) {
- long alignmentTimeout = 10;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 10;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
((TestInputChannel)
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
((TestInputChannel) gate.getChannel(0)).read(dataBuffer());
@@ -469,7 +477,8 @@ public class AlternatingCheckpointsTest {
getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1,
0);
assertEquals(0, target.getTriggeredCheckpointCounter());
- clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1,
TimeUnit.MILLISECONDS);
+ clock.advanceTimeWithoutRunningCallables(
+ alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
// the announcement should passively time out causing the barriers
to overtake the data
// buffers
assertAnnouncement(gate);
@@ -502,11 +511,11 @@ public class AlternatingCheckpointsTest {
.withMailboxExecutor()
.build();
- long alignmentTimeout = 100;
- performFirstCheckpoint(numChannels, target, gate, alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ performFirstCheckpoint(numChannels, target, gate,
alignedCheckpointTimeout);
assertEquals(1, target.getTriggeredCheckpointCounter());
- Buffer checkpointBarrier = withTimeout(2, alignmentTimeout);
+ Buffer checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
for (int i = 0; i < numChannels; i++) {
(getChannel(gate, i)).onBuffer(dataBuffer(), 1, 0);
@@ -519,7 +528,7 @@ public class AlternatingCheckpointsTest {
}
assertEquals(1, target.getTriggeredCheckpointCounter());
- clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
// the barrier should overtake the data buffers
assertBarrier(gate);
assertEquals(2, target.getTriggeredCheckpointCounter());
@@ -529,9 +538,9 @@ public class AlternatingCheckpointsTest {
int numChannels,
ValidatingCheckpointHandler target,
CheckpointedInputGate gate,
- long alignmentTimeout)
+ long alignedCheckpointTimeout)
throws IOException, InterruptedException {
- Buffer checkpointBarrier = withTimeout(1, alignmentTimeout);
+ Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
for (int i = 0; i < numChannels; i++) {
(getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(),
0, 0);
}
@@ -555,14 +564,15 @@ public class AlternatingCheckpointsTest {
.withMailboxExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
(getChannel(gate, 0)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
assertEquals(0, target.getTriggeredCheckpointCounter());
assertAnnouncement(gate);
assertBarrier(gate);
- clock.advanceTimeWithoutRunningCallables(alignmentTimeout * 4,
TimeUnit.MILLISECONDS);
+ clock.advanceTimeWithoutRunningCallables(
+ alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
(getChannel(gate, 1)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
assertAnnouncement(gate);
@@ -581,19 +591,19 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
send(checkpointBarrier, 0, gate);
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
assertThat(
target.getTriggeredCheckpointOptions(),
contains(unaligned(CheckpointType.CHECKPOINT, getDefault())));
}
@Test
- public void testAllChannelsUnblockedAfterAlignmentTimeout() throws
Exception {
+ public void testAllChannelsUnblockedAfteralignedCheckpointTimeout() throws
Exception {
int numberOfChannels = 2;
ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
CheckpointedInputGate gate =
@@ -603,13 +613,13 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 100;
+ long alignedCheckpointTimeout = 100;
CheckpointBarrier checkpointBarrier =
new CheckpointBarrier(
1,
clock.relativeTimeMillis(),
alignedWithTimeout(
- CheckpointType.CHECKPOINT, getDefault(),
alignmentTimeout));
+ CheckpointType.CHECKPOINT, getDefault(),
alignedCheckpointTimeout));
Buffer checkpointBarrierBuffer = toBuffer(checkpointBarrier, false);
// we set timer on announcement and test channels do not produce
announcements by themselves
@@ -618,7 +628,7 @@ public class AlternatingCheckpointsTest {
((TestInputChannel) gate.getChannel(0)).setBlocked(true);
send(checkpointBarrierBuffer, 0, gate);
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier,
0), true), 1, gate);
// emulate blocking channels on aligned barriers
((TestInputChannel) gate.getChannel(1)).setBlocked(true);
@@ -643,12 +653,12 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
send(checkpointBarrier, 0, gate);
send(checkpointBarrier, 1, gate);
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
assertThat(
target.getTriggeredCheckpointOptions(),
@@ -666,13 +676,13 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
send(checkpointBarrier, 0, gate);
send(toBuffer(new CancelCheckpointMarker(1L), true), 0, gate);
send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(0));
}
@@ -700,12 +710,12 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 100;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 100;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
send(checkpointBarrier, 0, gate);
gate.close();
- clockWithDelayedActions.advanceTime(alignmentTimeout + 1,
TimeUnit.MILLISECONDS);
+ clockWithDelayedActions.advanceTime(alignedCheckpointTimeout + 1,
TimeUnit.MILLISECONDS);
assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(0));
}
@@ -720,8 +730,8 @@ public class AlternatingCheckpointsTest {
.withRemoteChannels()
.withMailboxExecutor()
.build()) {
- long alignmentTimeout = 10;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 10;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -733,7 +743,7 @@ public class AlternatingCheckpointsTest {
assertAnnouncement(gate);
assertAnnouncement(gate);
// the announcement should time out causing the barriers to
overtake the data buffers
- clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+ clock.advanceTime(alignedCheckpointTimeout + 1,
TimeUnit.MILLISECONDS);
assertBarrier(gate);
assertBarrier(gate);
assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -757,8 +767,8 @@ public class AlternatingCheckpointsTest {
.withRemoteChannels()
.withMailboxExecutor()
.build()) {
- long alignmentTimeout = 10;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 10;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -768,7 +778,8 @@ public class AlternatingCheckpointsTest {
assertEquals(0, target.getTriggeredCheckpointCounter());
assertAnnouncement(gate);
- clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1,
TimeUnit.MILLISECONDS);
+ clock.advanceTimeWithoutRunningCallables(
+ alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
// the announcement should passively time out causing the barriers
to overtake the data
// buffers
assertAnnouncement(gate);
@@ -841,8 +852,8 @@ public class AlternatingCheckpointsTest {
.withRemoteChannels()
.withMailboxExecutor()
.build()) {
- long alignmentTimeout = 10;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 10;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -853,7 +864,8 @@ public class AlternatingCheckpointsTest {
// we simulate active time out firing after the passive one
assertData(gate);
assertData(gate);
- clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1,
TimeUnit.MILLISECONDS);
+ clock.advanceTimeWithoutRunningCallables(
+ alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
// the first barrier should passively time out causing the second
barrier to overtake
// the remaining data buffer
assertBarrier(gate);
@@ -922,7 +934,7 @@ public class AlternatingCheckpointsTest {
public void testTimeoutAlignmentAfterReceivedEndOfPartition() throws
Exception {
int numChannels = 3;
ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
- long alignmentTimeOut = 100L;
+ long alignedCheckpointTimeout = 100L;
try (CheckpointedInputGate gate =
new TestCheckpointedInputGateBuilder(
@@ -939,7 +951,7 @@ public class AlternatingCheckpointsTest {
alignedWithTimeout(
CheckpointType.CHECKPOINT,
getDefault(),
- alignmentTimeOut)),
+ alignedCheckpointTimeout)),
0,
0);
assertAnnouncement(gate);
@@ -947,7 +959,8 @@ public class AlternatingCheckpointsTest {
// Advances time but do not execute the registered callable which
would turns into
// unaligned checkpoint.
- clock.advanceTimeWithoutRunningCallables(alignmentTimeOut + 1,
TimeUnit.MILLISECONDS);
+ clock.advanceTimeWithoutRunningCallables(
+ alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
// The EndOfPartition should convert the checkpoint into unaligned.
getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
@@ -978,7 +991,7 @@ public class AlternatingCheckpointsTest {
public void testStartNewCheckpointViaAnnouncement() throws Exception {
int numChannels = 3;
ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
- long alignmentTimeOut = 10000L;
+ long alignedCheckpointTimeout = 10000L;
try (CheckpointedInputGate gate =
new TestCheckpointedInputGateBuilder(
@@ -994,7 +1007,7 @@ public class AlternatingCheckpointsTest {
alignedWithTimeout(
CheckpointType.CHECKPOINT,
getDefault(),
- alignmentTimeOut)),
+ alignedCheckpointTimeout)),
0,
0);
getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
@@ -1018,7 +1031,7 @@ public class AlternatingCheckpointsTest {
alignedWithTimeout(
CheckpointType.CHECKPOINT,
getDefault(),
- alignmentTimeOut)),
+ alignedCheckpointTimeout)),
0,
0);
assertAnnouncement(gate);
@@ -1296,16 +1309,16 @@ public class AlternatingCheckpointsTest {
.withSyncExecutor()
.build();
- long alignmentTimeout = 10000;
- Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+ long alignedCheckpointTimeout = 10000;
+ Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
send(checkpointBarrier, 0, gate);
clock.advanceTime(Duration.ofSeconds(1));
- send(withTimeout(2, alignmentTimeout), 0, gate);
+ send(withTimeout(2, alignedCheckpointTimeout), 0, gate);
clock.advanceTime(Duration.ofSeconds(1));
send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
clock.advanceTime(Duration.ofSeconds(1));
- send(withTimeout(2, alignmentTimeout), 1, gate);
+ send(withTimeout(2, alignedCheckpointTimeout), 1, gate);
clock.advanceTime(Duration.ofSeconds(1));
assertEquals(