This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d3ca598ab37d3dfec5b01e19080874e541212af
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Jul 8 10:51:54 2021 +0200

    [FLINK-21368] Remove RpcService#getExecutor
---
 .../apache/flink/runtime/rpc/akka/AkkaRpcService.java    | 16 ++++------------
 .../apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java  |  2 +-
 .../java/org/apache/flink/runtime/rpc/RpcService.java    | 16 +---------------
 .../flink/runtime/registration/RetryingRegistration.java |  8 ++++----
 .../runtime/taskexecutor/DefaultJobLeaderService.java    |  2 +-
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java  |  6 +++---
 .../registration/RegisteredRpcConnectionTest.java        | 10 +++++-----
 .../runtime/registration/RetryingRegistrationTest.java   |  8 +++++---
 .../apache/flink/runtime/rpc/FencedRpcEndpointTest.java  |  2 +-
 .../taskexecutor/TaskSubmissionTestEnvironment.java      |  4 +++-
 .../OperatorEventSendingCheckpointITCase.java            |  6 ------
 11 files changed, 28 insertions(+), 52 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index bba81a7..4c1a788 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.function.FunctionUtils;
 
 import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
@@ -48,7 +49,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
-import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +69,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -77,7 +76,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import scala.Option;
-import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -468,11 +466,6 @@ public class AkkaRpcService implements RpcService {
     }
 
     @Override
-    public Executor getExecutor() {
-        return actorSystem.dispatcher();
-    }
-
-    @Override
     public ScheduledExecutor getScheduledExecutor() {
         return internalScheduledExecutor;
     }
@@ -488,14 +481,13 @@ public class AkkaRpcService implements RpcService {
 
     @Override
     public void execute(Runnable runnable) {
-        actorSystem.dispatcher().execute(runnable);
+        getScheduledExecutor().execute(runnable);
     }
 
     @Override
     public <T> CompletableFuture<T> execute(Callable<T> callable) {
-        Future<T> scalaFuture = Futures.<T>future(callable, 
actorSystem.dispatcher());
-
-        return AkkaFutureUtils.toJava(scalaFuture);
+        return CompletableFuture.supplyAsync(
+                FunctionUtils.uncheckedSupplier(callable::call), 
getScheduledExecutor());
     }
 
     // 
---------------------------------------------------------------------------------------
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 01437e4..b522d44 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -192,7 +192,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
         assertFalse(terminationFuture.isDone());
 
-        CompletableFuture.runAsync(rpcEndpoint::closeAsync, 
akkaRpcService.getExecutor());
+        CompletableFuture.runAsync(rpcEndpoint::closeAsync, 
akkaRpcService.getScheduledExecutor());
 
         // wait until the rpc endpoint has terminated
         terminationFuture.get();
diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index eb36deb..d8c2d66 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 import java.io.Serializable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -125,19 +124,6 @@ public interface RpcService {
     CompletableFuture<Void> getTerminationFuture();
 
     /**
-     * Gets the executor, provided by this RPC service. This executor can be 
used for example for
-     * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of 
futures.
-     *
-     * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against any
-     * concurrent invocations and is therefore not suitable to run completion 
methods of futures
-     * that modify state of an {@link RpcEndpoint}. For such operations, one 
needs to use the {@link
-     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that 
{@code RpcEndpoint}.
-     *
-     * @return The execution context provided by the RPC service
-     */
-    Executor getExecutor();
-
-    /**
      * Gets a scheduled executor from the RPC service. This executor can be 
used to schedule tasks
      * to be executed in the future.
      *
@@ -152,7 +138,7 @@ public interface RpcService {
 
     /**
      * Execute the runnable in the execution context of this RPC Service, as 
returned by {@link
-     * #getExecutor()}, after a scheduled delay.
+     * #getScheduledExecutor()} ()}, after a scheduled delay.
      *
      * @param runnable Runnable to be executed
      * @param delay The delay after which the runnable will be executed
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ec20dbf..428efcf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -166,7 +166,7 @@ public abstract class RetryingRegistration<
                                         retryingRegistrationConfiguration
                                                 
.getInitialRegistrationTimeoutMillis());
                             },
-                            rpcService.getExecutor());
+                            rpcService.getScheduledExecutor());
 
             // upon failure, retry, unless this is cancelled
             rpcGatewayAcceptFuture.whenCompleteAsync(
@@ -194,7 +194,7 @@ public abstract class RetryingRegistration<
                                     
retryingRegistrationConfiguration.getErrorDelayMillis());
                         }
                     },
-                    rpcService.getExecutor());
+                    rpcService.getScheduledExecutor());
         } catch (Throwable t) {
             completionFuture.completeExceptionally(t);
             cancel();
@@ -272,7 +272,7 @@ public abstract class RetryingRegistration<
                                     }
                                 }
                             },
-                            rpcService.getExecutor());
+                            rpcService.getScheduledExecutor());
 
             // upon failure, retry
             registrationAcceptFuture.whenCompleteAsync(
@@ -320,7 +320,7 @@ public abstract class RetryingRegistration<
                             }
                         }
                     },
-                    rpcService.getExecutor());
+                    rpcService.getScheduledExecutor());
         } catch (Throwable t) {
             completionFuture.completeExceptionally(t);
             cancel();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java
index a5d206e..3ab1fe7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.java
@@ -341,7 +341,7 @@ public class DefaultJobLeaderService implements 
JobLeaderService {
             currentJobMasterId = jobMasterId;
             rpcConnection =
                     new JobManagerRegisteredRpcConnection(
-                            LOG, leaderAddress, jobMasterId, 
rpcService.getExecutor());
+                            LOG, leaderAddress, jobMasterId, 
rpcService.getScheduledExecutor());
 
             LOG.info(
                     "Try to register at job manager {} with leader id {}.",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8ab3c2d..9e0474e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -729,7 +729,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             taskMetricGroup,
                             resultPartitionConsumableNotifier,
                             partitionStateChecker,
-                            getRpcService().getExecutor());
+                            getRpcService().getScheduledExecutor());
 
             taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, 
task::isBackPressured);
 
@@ -870,7 +870,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                         task.failExternally(e);
                                     }
                                 },
-                                getRpcService().getExecutor()));
+                                getRpcService().getScheduledExecutor()));
             }
             return CompletableFuture.completedFuture(Acknowledge.get());
         } else {
@@ -1723,7 +1723,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
                 new RpcResultPartitionConsumableNotifier(
                         jobMasterGateway,
-                        getRpcService().getExecutor(),
+                        getRpcService().getScheduledExecutor(),
                         taskManagerConfiguration.getRpcTimeout());
 
         PartitionProducerStateChecker partitionStateChecker =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 7373e86..7f66424 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -78,7 +78,7 @@ public class RegisteredRpcConnectionTest extends TestLogger {
                     new TestRpcConnection(
                             testRpcConnectionEndpointAddress,
                             leaderId,
-                            rpcService.getExecutor(),
+                            rpcService.getScheduledExecutor(),
                             rpcService);
             connection.start();
 
@@ -125,7 +125,7 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                 new TestRpcConnection(
                         testRpcConnectionEndpointAddress,
                         leaderId,
-                        rpcService.getExecutor(),
+                        rpcService.getScheduledExecutor(),
                         rpcService);
         connection.start();
 
@@ -162,7 +162,7 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                 new TestRpcConnection(
                         testRegistrationGateway.getAddress(),
                         UUID.randomUUID(),
-                        rpcService.getExecutor(),
+                        rpcService.getScheduledExecutor(),
                         rpcService);
         connection.start();
 
@@ -194,7 +194,7 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                     new TestRpcConnection(
                             testRpcConnectionEndpointAddress,
                             leaderId,
-                            rpcService.getExecutor(),
+                            rpcService.getScheduledExecutor(),
                             rpcService);
             connection.start();
             // close the connection
@@ -226,7 +226,7 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                 new TestRpcConnection(
                         testRpcConnectionEndpointAddress,
                         leaderId,
-                        rpcService.getExecutor(),
+                        rpcService.getScheduledExecutor(),
                         rpcService);
         connection.start();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index f1a1920..95c2a6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.junit.After;
 import org.junit.Before;
@@ -36,7 +37,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -146,7 +147,7 @@ public class RetryingRegistrationTest extends TestLogger {
         final String testId = "laissez les bon temps roulez";
         final UUID leaderId = UUID.randomUUID();
 
-        ExecutorService executor = TestingUtils.defaultExecutor();
+        ScheduledExecutorService executor = TestingUtils.defaultExecutor();
         ManualResponseTestRegistrationGateway testGateway =
                 new ManualResponseTestRegistrationGateway(new 
TestRegistrationSuccess(testId));
 
@@ -162,7 +163,8 @@ public class RetryingRegistrationTest extends TestLogger {
                             CompletableFuture.completedFuture(
                                     testGateway) // second connection attempt 
succeeds
                             );
-            when(rpc.getExecutor()).thenReturn(executor);
+            when(rpc.getScheduledExecutor())
+                    .thenReturn(new ScheduledExecutorServiceAdapter(executor));
             when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), 
any(TimeUnit.class)))
                     .thenAnswer(
                             (InvocationOnMock invocation) -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index b833c51..4dc2710 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -382,7 +382,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 
                                 return value;
                             },
-                            getRpcService().getExecutor())
+                            getRpcService().getScheduledExecutor())
                     .thenApplyAsync((String v) -> Acknowledge.get(), 
getMainThreadExecutor());
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 86580f6..4030d78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -212,7 +212,9 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                             new TestCheckpointResponder(),
                             new TestGlobalAggregateManager(),
                             new RpcResultPartitionConsumableNotifier(
-                                    jobMasterGateway, 
testingRpcService.getExecutor(), timeout),
+                                    jobMasterGateway,
+                                    testingRpcService.getScheduledExecutor(),
+                                    timeout),
                             TestingPartitionProducerStateChecker.newBuilder()
                                     .setPartitionProducerStateFunction(
                                             (jobID, intermediateDataSetID, 
resultPartitionID) ->
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index e31336c..944620f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -73,7 +73,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -512,11 +511,6 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
         }
 
         @Override
-        public Executor getExecutor() {
-            return rpcService.getExecutor();
-        }
-
-        @Override
         public ScheduledExecutor getScheduledExecutor() {
             return rpcService.getScheduledExecutor();
         }

Reply via email to