This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4910076bafc [FLINK-29250][rpc] Drop RcService#getTerminationFuture
4910076bafc is described below
commit 4910076bafc7d5d6091e3dae505d0eabbff0b72d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Sep 16 17:33:14 2022 +0200
[FLINK-29250][rpc] Drop RcService#getTerminationFuture
---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 5 --
.../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 67 +++++++++++-----------
.../org/apache/flink/runtime/rpc/RpcService.java | 7 ---
.../flink/runtime/rpc/TestingRpcService.java | 5 --
.../OperatorEventSendingCheckpointITCase.java | 5 --
5 files changed, 35 insertions(+), 54 deletions(-)
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 40bb5bbbb09..c00db1dad3d 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -450,11 +450,6 @@ public class AkkaRpcService implements RpcService {
return rpcEndpoint.getTerminationFuture();
}
- @Override
- public CompletableFuture<Void> getTerminationFuture() {
- return terminationFuture;
- }
-
@Override
public ScheduledExecutor getScheduledExecutor() {
return internalScheduledExecutor;
diff --git
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 1db75ee82bb..2998f1af5b3 100644
---
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -39,6 +39,7 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -128,20 +129,6 @@ class AkkaRpcServiceTest {
.isEqualTo(AkkaUtils.getAddress(actorSystem).port().get());
}
- /** Tests that we can wait for the termination of the rpc service. */
- @Test
- void testTerminationFuture() throws Exception {
- final AkkaRpcService rpcService = startAkkaRpcService();
-
- CompletableFuture<Void> terminationFuture =
rpcService.getTerminationFuture();
-
- assertThat(terminationFuture).isNotDone();
-
- rpcService.stopService();
-
- terminationFuture.get();
- }
-
/**
* Tests a simple scheduled runnable being executed by the RPC services
scheduled executor
* service.
@@ -279,16 +266,15 @@ class AkkaRpcServiceTest {
try {
final int numberActors = 5;
- CompletableFuture<Void> terminationFuture =
akkaRpcService.getTerminationFuture();
-
- final Collection<CompletableFuture<Void>> onStopFutures =
+ final RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper =
startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService, numberActors);
- for (CompletableFuture<Void> onStopFuture : onStopFutures) {
+ for (CompletableFuture<Void> onStopFuture :
+ rpcServiceShutdownTestHelper.getStopFutures()) {
onStopFuture.complete(null);
}
- terminationFuture.get();
+ rpcServiceShutdownTestHelper.waitForRpcServiceTermination();
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
} finally {
RpcUtils.terminateRpcService(akkaRpcService);
@@ -305,12 +291,11 @@ class AkkaRpcServiceTest {
final int numberActors = 5;
- CompletableFuture<Void> terminationFuture =
akkaRpcService.getTerminationFuture();
-
- final Collection<CompletableFuture<Void>> onStopFutures =
+ final RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper =
startStopNCountingAsynchronousOnStopEndpoints(akkaRpcService,
numberActors);
- Iterator<CompletableFuture<Void>> iterator = onStopFutures.iterator();
+ final Iterator<CompletableFuture<Void>> iterator =
+ rpcServiceShutdownTestHelper.getStopFutures().iterator();
for (int i = 0; i < numberActors - 1; i++) {
iterator.next().complete(null);
@@ -318,11 +303,7 @@ class AkkaRpcServiceTest {
iterator.next().completeExceptionally(new OnStopException("onStop
exception occurred."));
- for (CompletableFuture<Void> onStopFuture : onStopFutures) {
- onStopFuture.complete(null);
- }
-
- assertThatThrownBy(() -> terminationFuture.get())
+
assertThatThrownBy(rpcServiceShutdownTestHelper::waitForRpcServiceTermination)
.satisfies(FlinkAssertions.anyCauseMatches(OnStopException.class));
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
@@ -349,8 +330,8 @@ class AkkaRpcServiceTest {
}
}
- private Collection<CompletableFuture<Void>>
startStopNCountingAsynchronousOnStopEndpoints(
- AkkaRpcService akkaRpcService, int numberActors) throws Exception {
+ private static RpcServiceShutdownTestHelper
startStopNCountingAsynchronousOnStopEndpoints(
+ AkkaRpcService akkaRpcService, int numberActors) throws
InterruptedException {
final Collection<CompletableFuture<Void>> onStopFutures = new
ArrayList<>(numberActors);
final CountDownLatch countDownLatch = new CountDownLatch(numberActors);
@@ -366,12 +347,34 @@ class AkkaRpcServiceTest {
CompletableFuture<Void> terminationFuture =
akkaRpcService.stopService();
+ countDownLatch.await();
+
assertThat(terminationFuture).isNotDone();
assertThat(akkaRpcService.getActorSystem().whenTerminated().isCompleted()).isFalse();
- countDownLatch.await();
+ return new RpcServiceShutdownTestHelper(
+ Collections.unmodifiableCollection(onStopFutures),
terminationFuture);
+ }
+
+ private static class RpcServiceShutdownTestHelper {
+
+ private final Collection<CompletableFuture<Void>> stopFutures;
+ private final CompletableFuture<Void> terminationFuture;
- return onStopFutures;
+ public RpcServiceShutdownTestHelper(
+ Collection<CompletableFuture<Void>> stopFutures,
+ CompletableFuture<Void> terminationFuture) {
+ this.stopFutures = stopFutures;
+ this.terminationFuture = terminationFuture;
+ }
+
+ public Collection<CompletableFuture<Void>> getStopFutures() {
+ return stopFutures;
+ }
+
+ public void waitForRpcServiceTermination() throws ExecutionException,
InterruptedException {
+ terminationFuture.get();
+ }
}
@Nonnull
diff --git
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 4edf3f70e10..1eaa01c5906 100644
---
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -99,13 +99,6 @@ public interface RpcService {
*/
CompletableFuture<Void> stopService();
- /**
- * Returns a future indicating when the RPC service has been shut down.
- *
- * @return Termination future
- */
- CompletableFuture<Void> getTerminationFuture();
-
/**
* Gets a scheduled executor from the RPC service. This executor can be
used to schedule tasks
* to be executed in the future.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index b2c34c1b958..f73bdcc4674 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -199,11 +199,6 @@ public class TestingRpcService implements RpcService {
backingRpcService.stopServer(selfGateway);
}
- @Override
- public CompletableFuture<Void> getTerminationFuture() {
- return backingRpcService.getTerminationFuture();
- }
-
@Override
public ScheduledExecutor getScheduledExecutor() {
return backingRpcService.getScheduledExecutor();
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 9757de12a2d..f54fd30ff5a 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -504,11 +504,6 @@ public class OperatorEventSendingCheckpointITCase extends
TestLogger {
return rpcService.stopService();
}
- @Override
- public CompletableFuture<Void> getTerminationFuture() {
- return rpcService.getTerminationFuture();
- }
-
@Override
public ScheduledExecutor getScheduledExecutor() {
return rpcService.getScheduledExecutor();