This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a3e711ffe8 [FLINK-30002][checkpoint] Change the alignmentTimeout to 
alignedCheckpointTimeout
0a3e711ffe8 is described below

commit 0a3e711ffe8c433d3881b20261446d4dd026827f
Author: 1996fanrui <[email protected]>
AuthorDate: Sat Nov 12 12:15:27 2022 +0800

    [FLINK-30002][checkpoint] Change the alignmentTimeout to 
alignedCheckpointTimeout
---
 .../network/api/serialization/EventSerializer.java |   5 +-
 .../AbstractAlignedBarrierHandlerState.java        |   2 +-
 ...tractAlternatingAlignedBarrierHandlerState.java |   5 +-
 .../AlternatingCollectingBarriers.java             |   4 +-
 .../AlternatingCollectingBarriersUnaligned.java    |   2 +-
 .../AlternatingWaitingForFirstBarrier.java         |   2 +-
 ...AlternatingWaitingForFirstBarrierUnaligned.java |   2 +-
 .../io/checkpointing/BarrierHandlerState.java      |   3 +-
 .../SingleCheckpointBarrierHandler.java            |   2 +-
 .../checkpointing/AlternatingCheckpointsTest.java  | 139 +++++++++++----------
 10 files changed, 91 insertions(+), 75 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 40ccf17c585..c2e5c6549dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -320,12 +320,13 @@ public class EventSerializer {
         }
         final CheckpointOptions.AlignmentType alignmentType =
                 CheckpointOptions.AlignmentType.values()[buffer.get()];
-        final long alignmentTimeout = buffer.getLong();
+        final long alignedCheckpointTimeout = buffer.getLong();
 
         return new CheckpointBarrier(
                 id,
                 timestamp,
-                new CheckpointOptions(snapshotType, locationRef, 
alignmentType, alignmentTimeout));
+                new CheckpointOptions(
+                        snapshotType, locationRef, alignmentType, 
alignedCheckpointTimeout));
     }
 
     private static SavepointType decodeSavepointType(byte checkpointTypeCode, 
ByteBuffer buffer)
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
index 8f92d5f93a5..2fa77ab07de 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlignedBarrierHandlerState.java
@@ -36,7 +36,7 @@ abstract class AbstractAlignedBarrierHandlerState implements 
BarrierHandlerState
     }
 
     @Override
-    public final BarrierHandlerState alignmentTimeout(
+    public final BarrierHandlerState alignedCheckpointTimeout(
             Controller controller, CheckpointBarrier checkpointBarrier)
             throws IOException, CheckpointException {
         throw new IllegalStateException(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
index 97e148f3c14..3a89af88d14 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
@@ -51,7 +51,8 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState 
implements BarrierH
             boolean markChannelBlocked)
             throws IOException, CheckpointException {
         if (checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-            BarrierHandlerState unalignedState = alignmentTimeout(controller, 
checkpointBarrier);
+            BarrierHandlerState unalignedState =
+                    alignedCheckpointTimeout(controller, checkpointBarrier);
             return unalignedState.barrierReceived(
                     controller, channelInfo, checkpointBarrier, 
markChannelBlocked);
         }
@@ -67,7 +68,7 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState 
implements BarrierH
             controller.triggerGlobalCheckpoint(checkpointBarrier);
             return finishCheckpoint();
         } else if (controller.isTimedOut(checkpointBarrier)) {
-            return alignmentTimeout(controller, checkpointBarrier)
+            return alignedCheckpointTimeout(controller, checkpointBarrier)
                     .barrierReceived(
                             controller,
                             channelInfo,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
index eacd9f58afb..8ca37055bc3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
@@ -38,7 +38,7 @@ final class AlternatingCollectingBarriers extends 
AbstractAlternatingAlignedBarr
     }
 
     @Override
-    public BarrierHandlerState alignmentTimeout(
+    public BarrierHandlerState alignedCheckpointTimeout(
             Controller controller, CheckpointBarrier checkpointBarrier)
             throws IOException, CheckpointException {
         state.prioritizeAllAnnouncements();
@@ -71,7 +71,7 @@ final class AlternatingCollectingBarriers extends 
AbstractAlternatingAlignedBarr
             controller.triggerGlobalCheckpoint(pendingCheckpointBarrier);
             return finishCheckpoint();
         } else if (controller.isTimedOut(pendingCheckpointBarrier)) {
-            return alignmentTimeout(controller, pendingCheckpointBarrier)
+            return alignedCheckpointTimeout(controller, 
pendingCheckpointBarrier)
                     .endOfPartitionReceived(controller, channelInfo);
         }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
index c9ccb87fa8e..eb4f15f7479 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriersUnaligned.java
@@ -42,7 +42,7 @@ final class AlternatingCollectingBarriersUnaligned implements 
BarrierHandlerStat
     }
 
     @Override
-    public BarrierHandlerState alignmentTimeout(
+    public BarrierHandlerState alignedCheckpointTimeout(
             Controller controller, CheckpointBarrier checkpointBarrier) {
         // ignore already processing unaligned checkpoints
         return this;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
index bd11296ae6e..c8b718a520f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrier.java
@@ -32,7 +32,7 @@ final class AlternatingWaitingForFirstBarrier
     }
 
     @Override
-    public BarrierHandlerState alignmentTimeout(
+    public BarrierHandlerState alignedCheckpointTimeout(
             Controller controller, CheckpointBarrier checkpointBarrier)
             throws IOException, CheckpointException {
         state.prioritizeAllAnnouncements();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
index e375e62dceb..af04f4f8107 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingWaitingForFirstBarrierUnaligned.java
@@ -40,7 +40,7 @@ final class AlternatingWaitingForFirstBarrierUnaligned 
implements BarrierHandler
     }
 
     @Override
-    public BarrierHandlerState alignmentTimeout(
+    public BarrierHandlerState alignedCheckpointTimeout(
             Controller controller, CheckpointBarrier checkpointBarrier) {
         // ignore already processing unaligned checkpoints
         return this;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
index 65b68628a39..7ae41196519 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierHandlerState.java
@@ -40,7 +40,8 @@ import java.io.IOException;
  * actions.
  */
 interface BarrierHandlerState {
-    BarrierHandlerState alignmentTimeout(Controller controller, 
CheckpointBarrier checkpointBarrier)
+    BarrierHandlerState alignedCheckpointTimeout(
+            Controller controller, CheckpointBarrier checkpointBarrier)
             throws IOException, CheckpointException;
 
     BarrierHandlerState announcementReceived(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index aaacd64eb63..c1bd9ad6c85 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -318,7 +318,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                                 if (currentCheckpointId == barrierId
                                         && 
!getAllBarriersReceivedFuture(barrierId).isDone()) {
                                     currentState =
-                                            currentState.alignmentTimeout(
+                                            
currentState.alignedCheckpointTimeout(
                                                     context, announcedBarrier);
                                 }
                             } catch (CheckpointException ex) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index 2642bbcd73a..3d4a52a091d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -205,7 +205,7 @@ public class AlternatingCheckpointsTest {
     public void testAlignedAfterTimedOut() throws Exception {
         int numChannels = 1;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
-        long alignmentTimeOut = 100L;
+        long alignedCheckpointTimeout = 100L;
         try (CheckpointedInputGate gate =
                 new TestCheckpointedInputGateBuilder(
                                 numChannels, 
getTestBarrierHandlerFactory(target))
@@ -218,10 +218,12 @@ public class AlternatingCheckpointsTest {
                             1,
                             clock.relativeTimeMillis(),
                             alignedWithTimeout(
-                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut));
+                                    CheckpointType.CHECKPOINT,
+                                    getDefault(),
+                                    alignedCheckpointTimeout));
             ((RemoteInputChannel) 
gate.getChannel(0)).onBuffer(barrier1.retainBuffer(), 0, 0);
             assertAnnouncement(gate);
-            clock.advanceTime(alignmentTimeOut + 1, TimeUnit.MILLISECONDS);
+            clock.advanceTime(alignedCheckpointTimeout + 1, 
TimeUnit.MILLISECONDS);
             assertBarrier(gate);
 
             assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -230,7 +232,9 @@ public class AlternatingCheckpointsTest {
                             2,
                             clock.relativeTimeMillis(),
                             alignedWithTimeout(
-                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut));
+                                    CheckpointType.CHECKPOINT,
+                                    getDefault(),
+                                    alignedCheckpointTimeout));
             ((RemoteInputChannel) 
gate.getChannel(0)).onBuffer(barrier2.retainBuffer(), 1, 0);
             assertAnnouncement(gate);
             assertBarrier(gate);
@@ -241,7 +245,9 @@ public class AlternatingCheckpointsTest {
                     contains(
                             unaligned(CheckpointType.CHECKPOINT, getDefault()),
                             alignedWithTimeout(
-                                    CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeOut)));
+                                    CheckpointType.CHECKPOINT,
+                                    getDefault(),
+                                    alignedCheckpointTimeout)));
         }
     }
 
@@ -303,9 +309,11 @@ public class AlternatingCheckpointsTest {
     }
 
     private void testTimeoutBarrierOnTwoChannels(
-            ValidatingCheckpointHandler target, CheckpointedInputGate gate, 
long alignmentTimeout)
+            ValidatingCheckpointHandler target,
+            CheckpointedInputGate gate,
+            long alignedCheckpointTimeout)
             throws Exception {
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
         getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -315,7 +323,7 @@ public class AlternatingCheckpointsTest {
 
         assertEquals(0, target.getTriggeredCheckpointCounter());
         assertAnnouncement(gate);
-        clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout * 2, TimeUnit.MILLISECONDS);
         assertAnnouncement(gate);
         assertBarrier(gate);
         assertBarrier(gate);
@@ -348,8 +356,8 @@ public class AlternatingCheckpointsTest {
                         .withMailboxExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         for (int i = 0; i < numChannels; i++) {
             (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 
0, 0);
@@ -361,7 +369,7 @@ public class AlternatingCheckpointsTest {
         }
         assertEquals(0, target.getTriggeredCheckpointCounter());
 
-        clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
 
         assertBarrier(gate);
         assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -403,8 +411,8 @@ public class AlternatingCheckpointsTest {
                         .withMailboxExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(1, alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
 
         // when: Execute the first checkpoint when announcement received first.
         ((TestInputChannel) 
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
@@ -421,9 +429,9 @@ public class AlternatingCheckpointsTest {
         assertEquals(1, target.getTriggeredCheckpointCounter());
 
         // given: The time in the future.
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
 
-        checkpointBarrier = withTimeout(2, alignmentTimeout);
+        checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
 
         // when: Execute the second checkpoint when barrier from local channel 
without announcement
         // received first.
@@ -435,7 +443,7 @@ public class AlternatingCheckpointsTest {
 
         // when: Receiving the barrier from second channel(with/without) 
announcement after time
         // more than alignment timeout.
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
         (getChannel(gate, 1)).onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
         assertAnnouncement(gate);
         assertBarrier(gate);
@@ -458,8 +466,8 @@ public class AlternatingCheckpointsTest {
                         .withMixedChannels(0)
                         .withMailboxExecutor()
                         .build()) {
-            long alignmentTimeout = 10;
-            Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+            long alignedCheckpointTimeout = 10;
+            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
             ((TestInputChannel) 
gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
             ((TestInputChannel) gate.getChannel(0)).read(dataBuffer());
@@ -469,7 +477,8 @@ public class AlternatingCheckpointsTest {
             getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 
0);
 
             assertEquals(0, target.getTriggeredCheckpointCounter());
-            clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1, 
TimeUnit.MILLISECONDS);
+            clock.advanceTimeWithoutRunningCallables(
+                    alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
             // the announcement should passively time out causing the barriers 
to overtake the data
             // buffers
             assertAnnouncement(gate);
@@ -502,11 +511,11 @@ public class AlternatingCheckpointsTest {
                         .withMailboxExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        performFirstCheckpoint(numChannels, target, gate, alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        performFirstCheckpoint(numChannels, target, gate, 
alignedCheckpointTimeout);
         assertEquals(1, target.getTriggeredCheckpointCounter());
 
-        Buffer checkpointBarrier = withTimeout(2, alignmentTimeout);
+        Buffer checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
 
         for (int i = 0; i < numChannels; i++) {
             (getChannel(gate, i)).onBuffer(dataBuffer(), 1, 0);
@@ -519,7 +528,7 @@ public class AlternatingCheckpointsTest {
         }
         assertEquals(1, target.getTriggeredCheckpointCounter());
 
-        clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
         // the barrier should overtake the data buffers
         assertBarrier(gate);
         assertEquals(2, target.getTriggeredCheckpointCounter());
@@ -529,9 +538,9 @@ public class AlternatingCheckpointsTest {
             int numChannels,
             ValidatingCheckpointHandler target,
             CheckpointedInputGate gate,
-            long alignmentTimeout)
+            long alignedCheckpointTimeout)
             throws IOException, InterruptedException {
-        Buffer checkpointBarrier = withTimeout(1, alignmentTimeout);
+        Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
         for (int i = 0; i < numChannels; i++) {
             (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 
0, 0);
         }
@@ -555,14 +564,15 @@ public class AlternatingCheckpointsTest {
                         .withMailboxExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         (getChannel(gate, 0)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
         assertEquals(0, target.getTriggeredCheckpointCounter());
         assertAnnouncement(gate);
         assertBarrier(gate);
-        clock.advanceTimeWithoutRunningCallables(alignmentTimeout * 4, 
TimeUnit.MILLISECONDS);
+        clock.advanceTimeWithoutRunningCallables(
+                alignedCheckpointTimeout * 4, TimeUnit.MILLISECONDS);
         (getChannel(gate, 1)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
         assertAnnouncement(gate);
 
@@ -581,19 +591,19 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         send(checkpointBarrier, 0, gate);
 
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
         assertThat(
                 target.getTriggeredCheckpointOptions(),
                 contains(unaligned(CheckpointType.CHECKPOINT, getDefault())));
     }
 
     @Test
-    public void testAllChannelsUnblockedAfterAlignmentTimeout() throws 
Exception {
+    public void testAllChannelsUnblockedAfteralignedCheckpointTimeout() throws 
Exception {
         int numberOfChannels = 2;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
         CheckpointedInputGate gate =
@@ -603,13 +613,13 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
+        long alignedCheckpointTimeout = 100;
         CheckpointBarrier checkpointBarrier =
                 new CheckpointBarrier(
                         1,
                         clock.relativeTimeMillis(),
                         alignedWithTimeout(
-                                CheckpointType.CHECKPOINT, getDefault(), 
alignmentTimeout));
+                                CheckpointType.CHECKPOINT, getDefault(), 
alignedCheckpointTimeout));
         Buffer checkpointBarrierBuffer = toBuffer(checkpointBarrier, false);
 
         // we set timer on announcement and test channels do not produce 
announcements by themselves
@@ -618,7 +628,7 @@ public class AlternatingCheckpointsTest {
         ((TestInputChannel) gate.getChannel(0)).setBlocked(true);
         send(checkpointBarrierBuffer, 0, gate);
 
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
         send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 
0), true), 1, gate);
         // emulate blocking channels on aligned barriers
         ((TestInputChannel) gate.getChannel(1)).setBlocked(true);
@@ -643,12 +653,12 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         send(checkpointBarrier, 0, gate);
         send(checkpointBarrier, 1, gate);
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
 
         assertThat(
                 target.getTriggeredCheckpointOptions(),
@@ -666,13 +676,13 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         send(checkpointBarrier, 0, gate);
         send(toBuffer(new CancelCheckpointMarker(1L), true), 0, gate);
         send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
-        clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+        clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
 
         assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(0));
     }
@@ -700,12 +710,12 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 100;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 100;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         send(checkpointBarrier, 0, gate);
         gate.close();
-        clockWithDelayedActions.advanceTime(alignmentTimeout + 1, 
TimeUnit.MILLISECONDS);
+        clockWithDelayedActions.advanceTime(alignedCheckpointTimeout + 1, 
TimeUnit.MILLISECONDS);
 
         assertThat(target.getTriggeredCheckpointOptions().size(), equalTo(0));
     }
@@ -720,8 +730,8 @@ public class AlternatingCheckpointsTest {
                         .withRemoteChannels()
                         .withMailboxExecutor()
                         .build()) {
-            long alignmentTimeout = 10;
-            Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+            long alignedCheckpointTimeout = 10;
+            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
             getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
             getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -733,7 +743,7 @@ public class AlternatingCheckpointsTest {
             assertAnnouncement(gate);
             assertAnnouncement(gate);
             // the announcement should time out causing the barriers to 
overtake the data buffers
-            clock.advanceTime(alignmentTimeout + 1, TimeUnit.MILLISECONDS);
+            clock.advanceTime(alignedCheckpointTimeout + 1, 
TimeUnit.MILLISECONDS);
             assertBarrier(gate);
             assertBarrier(gate);
             assertEquals(1, target.getTriggeredCheckpointCounter());
@@ -757,8 +767,8 @@ public class AlternatingCheckpointsTest {
                         .withRemoteChannels()
                         .withMailboxExecutor()
                         .build()) {
-            long alignmentTimeout = 10;
-            Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+            long alignedCheckpointTimeout = 10;
+            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
             getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
             getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -768,7 +778,8 @@ public class AlternatingCheckpointsTest {
 
             assertEquals(0, target.getTriggeredCheckpointCounter());
             assertAnnouncement(gate);
-            clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1, 
TimeUnit.MILLISECONDS);
+            clock.advanceTimeWithoutRunningCallables(
+                    alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
             // the announcement should passively time out causing the barriers 
to overtake the data
             // buffers
             assertAnnouncement(gate);
@@ -841,8 +852,8 @@ public class AlternatingCheckpointsTest {
                         .withRemoteChannels()
                         .withMailboxExecutor()
                         .build()) {
-            long alignmentTimeout = 10;
-            Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+            long alignedCheckpointTimeout = 10;
+            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
             getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
             getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
@@ -853,7 +864,8 @@ public class AlternatingCheckpointsTest {
             // we simulate active time out firing after the passive one
             assertData(gate);
             assertData(gate);
-            clock.advanceTimeWithoutRunningCallables(alignmentTimeout + 1, 
TimeUnit.MILLISECONDS);
+            clock.advanceTimeWithoutRunningCallables(
+                    alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
             // the first barrier should passively time out causing the second 
barrier to overtake
             // the remaining data buffer
             assertBarrier(gate);
@@ -922,7 +934,7 @@ public class AlternatingCheckpointsTest {
     public void testTimeoutAlignmentAfterReceivedEndOfPartition() throws 
Exception {
         int numChannels = 3;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
-        long alignmentTimeOut = 100L;
+        long alignedCheckpointTimeout = 100L;
 
         try (CheckpointedInputGate gate =
                 new TestCheckpointedInputGateBuilder(
@@ -939,7 +951,7 @@ public class AlternatingCheckpointsTest {
                                     alignedWithTimeout(
                                             CheckpointType.CHECKPOINT,
                                             getDefault(),
-                                            alignmentTimeOut)),
+                                            alignedCheckpointTimeout)),
                             0,
                             0);
             assertAnnouncement(gate);
@@ -947,7 +959,8 @@ public class AlternatingCheckpointsTest {
 
             // Advances time but do not execute the registered callable which 
would turns into
             // unaligned checkpoint.
-            clock.advanceTimeWithoutRunningCallables(alignmentTimeOut + 1, 
TimeUnit.MILLISECONDS);
+            clock.advanceTimeWithoutRunningCallables(
+                    alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS);
 
             // The EndOfPartition should convert the checkpoint into unaligned.
             getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
@@ -978,7 +991,7 @@ public class AlternatingCheckpointsTest {
     public void testStartNewCheckpointViaAnnouncement() throws Exception {
         int numChannels = 3;
         ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
-        long alignmentTimeOut = 10000L;
+        long alignedCheckpointTimeout = 10000L;
 
         try (CheckpointedInputGate gate =
                 new TestCheckpointedInputGateBuilder(
@@ -994,7 +1007,7 @@ public class AlternatingCheckpointsTest {
                                     alignedWithTimeout(
                                             CheckpointType.CHECKPOINT,
                                             getDefault(),
-                                            alignmentTimeOut)),
+                                            alignedCheckpointTimeout)),
                             0,
                             0);
             getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
@@ -1018,7 +1031,7 @@ public class AlternatingCheckpointsTest {
                                     alignedWithTimeout(
                                             CheckpointType.CHECKPOINT,
                                             getDefault(),
-                                            alignmentTimeOut)),
+                                            alignedCheckpointTimeout)),
                             0,
                             0);
             assertAnnouncement(gate);
@@ -1296,16 +1309,16 @@ public class AlternatingCheckpointsTest {
                         .withSyncExecutor()
                         .build();
 
-        long alignmentTimeout = 10000;
-        Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+        long alignedCheckpointTimeout = 10000;
+        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
 
         send(checkpointBarrier, 0, gate);
         clock.advanceTime(Duration.ofSeconds(1));
-        send(withTimeout(2, alignmentTimeout), 0, gate);
+        send(withTimeout(2, alignedCheckpointTimeout), 0, gate);
         clock.advanceTime(Duration.ofSeconds(1));
         send(toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
         clock.advanceTime(Duration.ofSeconds(1));
-        send(withTimeout(2, alignmentTimeout), 1, gate);
+        send(withTimeout(2, alignedCheckpointTimeout), 1, gate);
         clock.advanceTime(Duration.ofSeconds(1));
 
         assertEquals(

Reply via email to