This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 8a9c943661c [FLINK-32655][runtime] Fix checkpoint aborted message being swallowed by RecreateOnResetOperatorCoordinator.
8a9c943661c is described below
commit 8a9c943661c227ee58b2d744dee2c92f0f61a51b
Author: liming.1018 <[email protected]>
AuthorDate: Mon Jul 24 15:42:58 2023 +0800
[FLINK-32655][runtime] Fix checkpoint aborted message being swallowed by RecreateOnResetOperatorCoordinator.
reformat tests and fix checkstyle
Co-authored-by: Rui Fan <[email protected]>
---
.../coordination/RecreateOnResetOperatorCoordinator.java | 5 +++++
.../RecreateOnResetOperatorCoordinatorTest.java | 15 +++++++++++++++
.../coordination/TestingOperatorCoordinator.java | 12 ++++++++++++
3 files changed, 32 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 219bae1b6a8..0d03f868900 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -115,6 +115,11 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
coordinator.applyCall("checkpointComplete", c ->
c.notifyCheckpointComplete(checkpointId));
}
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {
+ // Delegate through applyCall, mirroring notifyCheckpointComplete, so the abort notification
+ // is forwarded to the internal coordinator instead of being swallowed by this wrapper.
+ coordinator.applyCall("checkpointAborted", c -> c.notifyCheckpointAborted(checkpointId));
+ }
+
@Override
public void resetToCheckpoint(final long checkpointId, @Nullable final
byte[] checkpointData) {
// First bump up the coordinator epoch to fence out the active
coordinator.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index 86647653117..e0115bd4ccb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -264,6 +264,21 @@ public class RecreateOnResetOperatorCoordinatorTest {
"Timed out when waiting for the coordinator to close.");
}
+ @Test
+ public void testNotifyCheckpointAbortedSuccess() throws Exception {
+ TestingCoordinatorProvider provider = new TestingCoordinatorProvider(null);
+ MockOperatorCoordinatorContext context =
+ new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+ RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context);
+ // The coordinator wrapped by `coordinator`; no reset is performed in this test.
+ TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(coordinator);
+
+ long checkpointId = 10L;
+ coordinator.notifyCheckpointAborted(checkpointId);
+ // The abort must reach the internal coordinator rather than be swallowed by the wrapper.
+ assertThat(internalCoordinator.getLastCheckpointAborted()).isEqualTo(checkpointId);
+ }
+
// ---------------
private RecreateOnResetOperatorCoordinator createCoordinator(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 5f6fedff2e4..08b2a3ff9ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -54,6 +54,8 @@ public class TestingOperatorCoordinator implements OperatorCoordinator {
private final BlockingQueue<Long> lastCheckpointComplete;
+ /**
+ * Checkpoint IDs received via notifyCheckpointAborted, queued in arrival (FIFO) order and
+ * consumed one at a time by getLastCheckpointAborted().
+ */
+ private final BlockingQueue<Long> lastCheckpointAborted;
+
private final BlockingQueue<OperatorEvent> receivedOperatorEvents;
private final Map<Integer, SubtaskGateway> subtaskGateways;
@@ -72,6 +74,7 @@ public class TestingOperatorCoordinator implements OperatorCoordinator {
this.context = context;
this.triggeredCheckpoints = new LinkedBlockingQueue<>();
this.lastCheckpointComplete = new LinkedBlockingQueue<>();
+ this.lastCheckpointAborted = new LinkedBlockingQueue<>();
this.receivedOperatorEvents = new LinkedBlockingQueue<>();
this.blockOnCloseLatch = blockOnCloseLatch;
this.subtaskGateways = new HashMap<>();
@@ -125,6 +128,11 @@ public class TestingOperatorCoordinator implements OperatorCoordinator {
lastCheckpointComplete.offer(checkpointId);
}
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {
+ // Record the aborted checkpoint so tests can later check that it was delivered.
+ this.lastCheckpointAborted.offer(checkpointId);
+ }
+
@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[]
checkpointData) {
if (resetToCheckpointConsumer != null) {
@@ -185,6 +193,10 @@ public class TestingOperatorCoordinator implements OperatorCoordinator {
return lastCheckpointComplete.take();
}
+ /**
+ * Removes and returns the oldest aborted checkpoint ID not yet consumed, waiting until one
+ * arrives. Note: the wait has no timeout, so a notification that never arrives blocks the caller.
+ */
+ public long getLastCheckpointAborted() throws InterruptedException {
+ final Long abortedCheckpointId = lastCheckpointAborted.take();
+ return abortedCheckpointId;
+ }
+
@Nullable
public OperatorEvent getNextReceivedOperatorEvent() {
return receivedOperatorEvents.poll();