This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5d3ca598ab37d3dfec5b01e19080874e541212af Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Jul 8 10:51:54 2021 +0200 [FLINK-21368] Remove RpcService#getExecutor --- .../apache/flink/runtime/rpc/akka/AkkaRpcService.java | 16 ++++------------ .../apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java | 2 +- .../java/org/apache/flink/runtime/rpc/RpcService.java | 16 +--------------- .../flink/runtime/registration/RetryingRegistration.java | 8 ++++---- .../runtime/taskexecutor/DefaultJobLeaderService.java | 2 +- .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 6 +++--- .../registration/RegisteredRpcConnectionTest.java | 10 +++++----- .../runtime/registration/RetryingRegistrationTest.java | 8 +++++--- .../apache/flink/runtime/rpc/FencedRpcEndpointTest.java | 2 +- .../taskexecutor/TaskSubmissionTestEnvironment.java | 4 +++- .../OperatorEventSendingCheckpointITCase.java | 6 ------ 11 files changed, 28 insertions(+), 52 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index bba81a7..4c1a788 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -40,6 +40,7 @@ import org.apache.flink.util.TimeUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.flink.util.function.FunctionUtils; import akka.actor.AbstractActor; import akka.actor.ActorRef; @@ -48,7 +49,6 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.DeadLetter; import akka.actor.Props; -import akka.dispatch.Futures; import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 
+69,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -77,7 +76,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import scala.Option; -import scala.concurrent.Future; import scala.reflect.ClassTag$; import static org.apache.flink.util.Preconditions.checkArgument; @@ -468,11 +466,6 @@ public class AkkaRpcService implements RpcService { } @Override - public Executor getExecutor() { - return actorSystem.dispatcher(); - } - - @Override public ScheduledExecutor getScheduledExecutor() { return internalScheduledExecutor; } @@ -488,14 +481,13 @@ public class AkkaRpcService implements RpcService { @Override public void execute(Runnable runnable) { - actorSystem.dispatcher().execute(runnable); + getScheduledExecutor().execute(runnable); } @Override public <T> CompletableFuture<T> execute(Callable<T> callable) { - Future<T> scalaFuture = Futures.<T>future(callable, actorSystem.dispatcher()); - - return AkkaFutureUtils.toJava(scalaFuture); + return CompletableFuture.supplyAsync( + FunctionUtils.uncheckedSupplier(callable::call), getScheduledExecutor()); } // --------------------------------------------------------------------------------------- diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 01437e4..b522d44 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -192,7 +192,7 @@ public class AkkaRpcActorTest extends TestLogger { 
assertFalse(terminationFuture.isDone()); - CompletableFuture.runAsync(rpcEndpoint::closeAsync, akkaRpcService.getExecutor()); + CompletableFuture.runAsync(rpcEndpoint::closeAsync, akkaRpcService.getScheduledExecutor()); // wait until the rpc endpoint has terminated terminationFuture.get(); diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index eb36deb..d8c2d66 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -24,7 +24,6 @@ import org.apache.flink.util.concurrent.ScheduledExecutor; import java.io.Serializable; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -125,19 +124,6 @@ public interface RpcService { CompletableFuture<Void> getTerminationFuture(); /** - * Gets the executor, provided by this RPC service. This executor can be used for example for - * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. - * - * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any - * concurrent invocations and is therefore not suitable to run completion methods of futures - * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link - * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}. - * - * @return The execution context provided by the RPC service - */ - Executor getExecutor(); - - /** * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks * to be executed in the future. 
* @@ -152,7 +138,7 @@ public interface RpcService { /** * Execute the runnable in the execution context of this RPC Service, as returned by {@link - * #getExecutor()}, after a scheduled delay. + * #getScheduledExecutor()}, after a scheduled delay. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index ec20dbf..428efcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -166,7 +166,7 @@ public abstract class RetryingRegistration< retryingRegistrationConfiguration .getInitialRegistrationTimeoutMillis()); }, - rpcService.getExecutor()); + rpcService.getScheduledExecutor()); // upon failure, retry, unless this is cancelled rpcGatewayAcceptFuture.whenCompleteAsync( @@ -194,7 +194,7 @@ public abstract class RetryingRegistration< retryingRegistrationConfiguration.getErrorDelayMillis()); } }, - rpcService.getExecutor()); + rpcService.getScheduledExecutor()); } catch (Throwable t) { completionFuture.completeExceptionally(t); cancel(); @@ -272,7 +272,7 @@ public abstract class RetryingRegistration< } } }, - rpcService.getExecutor()); + rpcService.getScheduledExecutor()); // upon failure, retry registrationAcceptFuture.whenCompleteAsync( @@ -320,7 +320,7 @@ public abstract class RetryingRegistration< } } }, - rpcService.getExecutor()); + rpcService.getScheduledExecutor()); } catch (Throwable t) { completionFuture.completeExceptionally(t); cancel(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java index
a5d206e..3ab1fe7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java @@ -341,7 +341,7 @@ public class DefaultJobLeaderService implements JobLeaderService { currentJobMasterId = jobMasterId; rpcConnection = new JobManagerRegisteredRpcConnection( - LOG, leaderAddress, jobMasterId, rpcService.getExecutor()); + LOG, leaderAddress, jobMasterId, rpcService.getScheduledExecutor()); LOG.info( "Try to register at job manager {} with leader id {}.", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8ab3c2d..9e0474e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -729,7 +729,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, - getRpcService().getExecutor()); + getRpcService().getScheduledExecutor()); taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured); @@ -870,7 +870,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { task.failExternally(e); } }, - getRpcService().getExecutor())); + getRpcService().getScheduledExecutor())); } return CompletableFuture.completedFuture(Acknowledge.get()); } else { @@ -1723,7 +1723,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( jobMasterGateway, - getRpcService().getExecutor(), + getRpcService().getScheduledExecutor(), taskManagerConfiguration.getRpcTimeout()); PartitionProducerStateChecker 
partitionStateChecker = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index 7373e86..7f66424 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -78,7 +78,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { new TestRpcConnection( testRpcConnectionEndpointAddress, leaderId, - rpcService.getExecutor(), + rpcService.getScheduledExecutor(), rpcService); connection.start(); @@ -125,7 +125,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { new TestRpcConnection( testRpcConnectionEndpointAddress, leaderId, - rpcService.getExecutor(), + rpcService.getScheduledExecutor(), rpcService); connection.start(); @@ -162,7 +162,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { new TestRpcConnection( testRegistrationGateway.getAddress(), UUID.randomUUID(), - rpcService.getExecutor(), + rpcService.getScheduledExecutor(), rpcService); connection.start(); @@ -194,7 +194,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { new TestRpcConnection( testRpcConnectionEndpointAddress, leaderId, - rpcService.getExecutor(), + rpcService.getScheduledExecutor(), rpcService); connection.start(); // close the connection @@ -226,7 +226,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { new TestRpcConnection( testRpcConnectionEndpointAddress, leaderId, - rpcService.getExecutor(), + rpcService.getScheduledExecutor(), rpcService); connection.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index f1a1920..95c2a6f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.testutils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import org.junit.After; import org.junit.Before; @@ -36,7 +37,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -146,7 +147,7 @@ public class RetryingRegistrationTest extends TestLogger { final String testId = "laissez les bon temps roulez"; final UUID leaderId = UUID.randomUUID(); - ExecutorService executor = TestingUtils.defaultExecutor(); + ScheduledExecutorService executor = TestingUtils.defaultExecutor(); ManualResponseTestRegistrationGateway testGateway = new ManualResponseTestRegistrationGateway(new TestRegistrationSuccess(testId)); @@ -162,7 +163,8 @@ public class RetryingRegistrationTest extends TestLogger { CompletableFuture.completedFuture( testGateway) // second connection attempt succeeds ); - when(rpc.getExecutor()).thenReturn(executor); + when(rpc.getScheduledExecutor()) + .thenReturn(new ScheduledExecutorServiceAdapter(executor)); when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), any(TimeUnit.class))) .thenAnswer( (InvocationOnMock invocation) -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index 
b833c51..4dc2710 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -382,7 +382,7 @@ public class FencedRpcEndpointTest extends TestLogger { return value; }, - getRpcService().getExecutor()) + getRpcService().getScheduledExecutor()) .thenApplyAsync((String v) -> Acknowledge.get(), getMainThreadExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 86580f6..4030d78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -212,7 +212,9 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { new TestCheckpointResponder(), new TestGlobalAggregateManager(), new RpcResultPartitionConsumableNotifier( - jobMasterGateway, testingRpcService.getExecutor(), timeout), + jobMasterGateway, + testingRpcService.getScheduledExecutor(), + timeout), TestingPartitionProducerStateChecker.newBuilder() .setPartitionProducerStateFunction( (jobID, intermediateDataSetID, resultPartitionID) -> diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index e31336c..944620f 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -73,7 +73,6 @@ import java.util.Queue; import java.util.Set; import 
java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -512,11 +511,6 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { } @Override - public Executor getExecutor() { - return rpcService.getExecutor(); - } - - @Override public ScheduledExecutor getScheduledExecutor() { return rpcService.getScheduledExecutor(); }
