This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 8a9c943661c [FLINK-32655][runtime] Fix checkpoint aborted message being swallowed by RecreateOnResetOperatorCoordinator.
8a9c943661c is described below

commit 8a9c943661c227ee58b2d744dee2c92f0f61a51b
Author: liming.1018 <[email protected]>
AuthorDate: Mon Jul 24 15:42:58 2023 +0800

    [FLINK-32655][runtime] Fix checkpoint aborted message being swallowed by RecreateOnResetOperatorCoordinator.
    
    reformat tests and fix checkstyle
    
    Co-authored-by: Rui Fan <[email protected]>
---
 .../coordination/RecreateOnResetOperatorCoordinator.java  |  5 +++++
 .../RecreateOnResetOperatorCoordinatorTest.java           | 15 +++++++++++++++
 .../coordination/TestingOperatorCoordinator.java          | 12 ++++++++++++
 3 files changed, 32 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 219bae1b6a8..0d03f868900 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -115,6 +115,11 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
         coordinator.applyCall("checkpointComplete", c -> 
c.notifyCheckpointComplete(checkpointId));
     }
 
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {
+        coordinator.applyCall("checkpointAborted", c -> 
c.notifyCheckpointAborted(checkpointId));
+    }
+
     @Override
     public void resetToCheckpoint(final long checkpointId, @Nullable final 
byte[] checkpointData) {
         // First bump up the coordinator epoch to fence out the active 
coordinator.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index 86647653117..e0115bd4ccb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -264,6 +264,21 @@ public class RecreateOnResetOperatorCoordinatorTest {
                 "Timed out when waiting for the coordinator to close.");
     }
 
+    @Test
+    public void testNotifyCheckpointAbortedSuccess() throws Exception {
+        TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider(null);
+        MockOperatorCoordinatorContext context =
+                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+        RecreateOnResetOperatorCoordinator coordinator = 
createCoordinator(provider, context);
+        TestingOperatorCoordinator internalCoordinatorAfterReset =
+                getInternalCoordinator(coordinator);
+
+        long checkpointId = 10L;
+        coordinator.notifyCheckpointAborted(checkpointId);
+        assertThat(internalCoordinatorAfterReset.getLastCheckpointAborted())
+                .isEqualTo(checkpointId);
+    }
+
     // ---------------
 
     private RecreateOnResetOperatorCoordinator createCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 5f6fedff2e4..08b2a3ff9ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -54,6 +54,8 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
     private final BlockingQueue<Long> lastCheckpointComplete;
 
+    private final BlockingQueue<Long> lastCheckpointAborted;
+
     private final BlockingQueue<OperatorEvent> receivedOperatorEvents;
 
     private final Map<Integer, SubtaskGateway> subtaskGateways;
@@ -72,6 +74,7 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
         this.context = context;
         this.triggeredCheckpoints = new LinkedBlockingQueue<>();
         this.lastCheckpointComplete = new LinkedBlockingQueue<>();
+        this.lastCheckpointAborted = new LinkedBlockingQueue<>();
         this.receivedOperatorEvents = new LinkedBlockingQueue<>();
         this.blockOnCloseLatch = blockOnCloseLatch;
         this.subtaskGateways = new HashMap<>();
@@ -125,6 +128,11 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
         lastCheckpointComplete.offer(checkpointId);
     }
 
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {
+        lastCheckpointAborted.offer(checkpointId);
+    }
+
     @Override
     public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) {
         if (resetToCheckpointConsumer != null) {
@@ -185,6 +193,10 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
         return lastCheckpointComplete.take();
     }
 
+    public long getLastCheckpointAborted() throws InterruptedException {
+        return lastCheckpointAborted.take();
+    }
+
     @Nullable
     public OperatorEvent getNextReceivedOperatorEvent() {
         return receivedOperatorEvents.poll();

Reply via email to