This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0bbee3047ed [FLINK-29249][rpc] Drop RpcService#execute/scheduleRunnable
0bbee3047ed is described below
commit 0bbee3047ed81d39d25fd736946c527e627a2bad
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Sep 13 20:42:58 2022 +0200
[FLINK-29249][rpc] Drop RpcService#execute/scheduleRunnable
---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 24 -----
.../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 29 +-----
.../rpc/akka/ContextClassLoadingSettingTest.java | 103 ++++++++++++++-------
.../org/apache/flink/runtime/rpc/RpcService.java | 40 --------
.../runtime/registration/RetryingRegistration.java | 19 ++--
.../runtime/metrics/util/MetricUtilsTest.java | 7 +-
.../registration/RetryingRegistrationTest.java | 10 --
.../flink/runtime/rpc/TestingRpcService.java | 18 ----
.../OperatorEventSendingCheckpointITCase.java | 17 ----
9 files changed, 91 insertions(+), 176 deletions(-)
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 0b5a4819164..40bb5bbbb09 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -38,7 +38,6 @@ import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.function.FunctionUtils;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
@@ -64,12 +63,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -79,7 +76,6 @@ import scala.reflect.ClassTag$;
import static
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader;
import static
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader;
import static
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.withContextClassLoader;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -464,26 +460,6 @@ public class AkkaRpcService implements RpcService {
return internalScheduledExecutor;
}
- @Override
- public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay,
TimeUnit unit) {
- checkNotNull(runnable, "runnable");
- checkNotNull(unit, "unit");
- checkArgument(delay >= 0L, "delay must be zero or larger");
-
- return internalScheduledExecutor.schedule(runnable, delay, unit);
- }
-
- @Override
- public void execute(Runnable runnable) {
- getScheduledExecutor().execute(runnable);
- }
-
- @Override
- public <T> CompletableFuture<T> execute(Callable<T> callable) {
- return CompletableFuture.supplyAsync(
- FunctionUtils.uncheckedSupplier(callable::call),
getScheduledExecutor());
- }
-
//
---------------------------------------------------------------------------------------
// Private helper methods
//
---------------------------------------------------------------------------------------
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f4512f81cd1..1db75ee82bb 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -85,7 +85,6 @@ class AkkaRpcServiceTest {
// ------------------------------------------------------------------------
// tests
// ------------------------------------------------------------------------
-
@Test
void testScheduleRunnable() throws Exception {
final OneShotLatch latch = new OneShotLatch();
@@ -93,7 +92,9 @@ class AkkaRpcServiceTest {
final long start = System.nanoTime();
ScheduledFuture<?> scheduledFuture =
- akkaRpcService.scheduleRunnable(latch::trigger, delay,
TimeUnit.MILLISECONDS);
+ akkaRpcService
+ .getScheduledExecutor()
+ .schedule(latch::trigger, delay,
TimeUnit.MILLISECONDS);
scheduledFuture.get();
@@ -110,33 +111,11 @@ class AkkaRpcServiceTest {
void testExecuteRunnable() throws Exception {
final OneShotLatch latch = new OneShotLatch();
- akkaRpcService.execute(latch::trigger);
+ akkaRpcService.getScheduledExecutor().execute(latch::trigger);
latch.await(30L, TimeUnit.SECONDS);
}
- /**
- * Tests that the {@link AkkaRpcService} can execute callables and returns
their result as a
- * {@link CompletableFuture}.
- */
- @Test
- void testExecuteCallable() throws Exception {
- final OneShotLatch latch = new OneShotLatch();
- final int expected = 42;
-
- CompletableFuture<Integer> result =
- akkaRpcService.execute(
- () -> {
- latch.trigger();
- return expected;
- });
-
- int actual = result.get(30L, TimeUnit.SECONDS);
-
- assertThat(actual).isEqualTo(expected);
- assertThat(latch.isTriggered()).isTrue();
- }
-
@Test
void testGetAddress() {
assertThat(akkaRpcService.getAddress())
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
index 48318d5a36e..c3bfc0f600b 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
@@ -40,8 +40,9 @@ import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.concurrent.Callable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -107,49 +108,89 @@ class ContextClassLoadingSettingTest {
void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader()
throws ExecutionException, InterruptedException {
final CompletableFuture<ClassLoader> contextClassLoader = new
CompletableFuture<>();
- akkaRpcService.execute(
- () ->
contextClassLoader.complete(Thread.currentThread().getContextClassLoader()));
- assertIsFlinkClassLoader(contextClassLoader.get());
+ akkaRpcService
+ .getScheduledExecutor()
+ .execute(
+ () ->
+ contextClassLoader.complete(
+
Thread.currentThread().getContextClassLoader()));
+ assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader);
}
@Test
- void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader()
+ void testAkkaRpcService_ScheduleCallableSetsFlinkContextClassLoader()
throws ExecutionException, InterruptedException {
- final CompletableFuture<ClassLoader> contextClassLoader =
- akkaRpcService.execute(() ->
Thread.currentThread().getContextClassLoader());
- assertIsFlinkClassLoader(contextClassLoader.get());
+ final ClassLoader contextClassLoader =
+ akkaRpcService
+ .getScheduledExecutor()
+ .schedule(
+ () ->
Thread.currentThread().getContextClassLoader(),
+ 0,
+ TimeUnit.MILLISECONDS)
+ .get();
+ assertThat(contextClassLoader).isSameAs(pretendFlinkClassLoader);
}
@Test
- void
testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader()
+ void testAkkaRpcService_ScheduleRunnableSetsFlinkContextClassLoader()
throws ExecutionException, InterruptedException {
-
- final CompletableFuture<Void> blocker = new CompletableFuture<>();
-
- final CompletableFuture<ClassLoader> contextClassLoader =
- runWithContextClassLoader(
+ final CompletableFuture<ClassLoader> contextClassLoader = new
CompletableFuture<>();
+ akkaRpcService
+ .getScheduledExecutor()
+ .schedule(
() ->
- akkaRpcService
- .execute((Callable<Void>) blocker::get)
- .thenApply(
- ignored ->
- Thread.currentThread()
-
.getContextClassLoader()),
- testClassLoader);
- blocker.complete(null);
+ contextClassLoader.complete(
+
Thread.currentThread().getContextClassLoader()),
+ 5,
+ TimeUnit.MILLISECONDS);
+ assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader);
+ }
- assertIsFlinkClassLoader(contextClassLoader.get());
+ @Test
+ void
testAkkaRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() {
+ final int numberOfScheduledRuns = 2;
+ final List<ClassLoader> contextClassLoaders = new
ArrayList<>(numberOfScheduledRuns);
+ akkaRpcService
+ .getScheduledExecutor()
+ .scheduleAtFixedRate(
+ () -> {
+ if (contextClassLoaders.size() <
numberOfScheduledRuns) {
+ contextClassLoaders.add(
+
Thread.currentThread().getContextClassLoader());
+ } else {
+ throw new RuntimeException("cancel task");
+ }
+ },
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS);
+
+ assertThat(contextClassLoaders)
+ .allSatisfy(
+ classLoader ->
assertThat(classLoader).isSameAs(pretendFlinkClassLoader));
}
@Test
- void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader()
- throws ExecutionException, InterruptedException {
- final CompletableFuture<ClassLoader> contextClassLoader = new
CompletableFuture<>();
- akkaRpcService.scheduleRunnable(
- () ->
contextClassLoader.complete(Thread.currentThread().getContextClassLoader()),
- 5,
- TimeUnit.MILLISECONDS);
- assertThat(contextClassLoader.get()).isSameAs(pretendFlinkClassLoader);
+ void
testAkkaRpcService_ScheduleRunnableWithFixedDelaySetsFlinkContextClassLoader() {
+ final int numberOfScheduledRuns = 2;
+ final List<ClassLoader> contextClassLoaders = new
ArrayList<>(numberOfScheduledRuns);
+ akkaRpcService
+ .getScheduledExecutor()
+ .scheduleWithFixedDelay(
+ () -> {
+ if (contextClassLoaders.size() <
numberOfScheduledRuns) {
+ contextClassLoaders.add(
+
Thread.currentThread().getContextClassLoader());
+ } else {
+ throw new RuntimeException("cancel task");
+ }
+ },
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS);
+ assertThat(contextClassLoaders)
+ .allSatisfy(
+ classLoader ->
assertThat(classLoader).isSameAs(pretendFlinkClassLoader));
}
@Test
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a09ae9ead37..4edf3f70e10 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -22,10 +22,7 @@ import
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import java.io.Serializable;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
/**
* Interface for rpc services. An rpc service is used to start and connect to
a {@link RpcEndpoint}.
@@ -121,41 +118,4 @@ public interface RpcService {
* @return The RPC service provided scheduled executor
*/
ScheduledExecutor getScheduledExecutor();
-
- /**
- * Execute the runnable in the execution context of this RPC Service, as
returned by {@link
- * #getScheduledExecutor()} ()}, after a scheduled delay.
- *
- * @param runnable Runnable to be executed
- * @param delay The delay after which the runnable will be executed
- */
- ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay,
TimeUnit unit);
-
- /**
- * Execute the given runnable in the executor of the RPC service. This
method can be used to run
- * code outside of the main thread of a {@link RpcEndpoint}.
- *
- * <p><b>IMPORTANT:</b> This executor does not isolate the method
invocations against any
- * concurrent invocations and is therefore not suitable to run completion
methods of futures
- * that modify state of an {@link RpcEndpoint}. For such operations, one
needs to use the {@link
- * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
{@code RpcEndpoint}.
- *
- * @param runnable to execute
- */
- void execute(Runnable runnable);
-
- /**
- * Execute the given callable and return its result as a {@link
CompletableFuture}. This method
- * can be used to run code outside of the main thread of a {@link
RpcEndpoint}.
- *
- * <p><b>IMPORTANT:</b> This executor does not isolate the method
invocations against any
- * concurrent invocations and is therefore not suitable to run completion
methods of futures
- * that modify state of an {@link RpcEndpoint}. For such operations, one
needs to use the {@link
- * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
{@code RpcEndpoint}.
- *
- * @param callable to execute
- * @param <T> is the return value type
- * @return Future containing the callable's future result
- */
- <T> CompletableFuture<T> execute(Callable<T> callable);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index c7b2ca43a0b..299962b4e4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -329,19 +329,18 @@ public abstract class RetryingRegistration<
private void registerLater(
final G gateway, final int attempt, final long timeoutMillis, long
delay) {
- rpcService.scheduleRunnable(
- new Runnable() {
- @Override
- public void run() {
- register(gateway, attempt, timeoutMillis);
- }
- },
- delay,
- TimeUnit.MILLISECONDS);
+ rpcService
+ .getScheduledExecutor()
+ .schedule(
+ () -> register(gateway, attempt, timeoutMillis),
+ delay,
+ TimeUnit.MILLISECONDS);
}
private void startRegistrationLater(final long delay) {
- rpcService.scheduleRunnable(this::startRegistration, delay,
TimeUnit.MILLISECONDS);
+ rpcService
+ .getScheduledExecutor()
+ .schedule(this::startRegistration, delay,
TimeUnit.MILLISECONDS);
}
static final class RetryingRegistrationResult<G, S, R> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index 7966c6f4e4b..28da2209cee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -49,6 +49,7 @@ import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK;
import static
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MANAGED_MEMORY;
@@ -84,7 +85,11 @@ public class MetricUtilsTest extends TestLogger {
try {
final int threadPriority =
- rpcService.execute(() ->
Thread.currentThread().getPriority()).get();
+ rpcService
+ .getScheduledExecutor()
+ .schedule(
+ () ->
Thread.currentThread().getPriority(), 0, TimeUnit.SECONDS)
+ .get();
assertThat(threadPriority, is(expectedThreadPriority));
} finally {
rpcService.stopService().get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 9d1bd05fabc..4c02c443e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -31,7 +31,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
@@ -52,7 +51,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -171,14 +169,6 @@ public class RetryingRegistrationTest extends TestLogger {
testGateway) // second connection attempt
succeeds
);
when(rpc.getScheduledExecutor()).thenReturn(executor);
- when(rpc.scheduleRunnable(any(Runnable.class), anyLong(),
any(TimeUnit.class)))
- .thenAnswer(
- (InvocationOnMock invocation) -> {
- final Runnable runnable =
invocation.getArgument(0);
- final long delay = invocation.getArgument(1);
- final TimeUnit timeUnit =
invocation.getArgument(2);
- return executor.schedule(runnable, delay,
timeUnit);
- });
TestRetryingRegistration registration =
new TestRetryingRegistration(rpc, "foobar address",
leaderId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 1b7990cb447..b2c34c1b958 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,8 @@ import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import java.io.Serializable;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -211,19 +208,4 @@ public class TestingRpcService implements RpcService {
public ScheduledExecutor getScheduledExecutor() {
return backingRpcService.getScheduledExecutor();
}
-
- @Override
- public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay,
TimeUnit unit) {
- return backingRpcService.scheduleRunnable(runnable, delay, unit);
- }
-
- @Override
- public void execute(Runnable runnable) {
- backingRpcService.execute(runnable);
- }
-
- @Override
- public <T> CompletableFuture<T> execute(Callable<T> callable) {
- return backingRpcService.execute(callable);
- }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index aed994c9bbf..9757de12a2d 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -72,9 +72,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -516,21 +514,6 @@ public class OperatorEventSendingCheckpointITCase extends
TestLogger {
return rpcService.getScheduledExecutor();
}
- @Override
- public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long
delay, TimeUnit unit) {
- return rpcService.scheduleRunnable(runnable, delay, unit);
- }
-
- @Override
- public void execute(Runnable runnable) {
- rpcService.execute(runnable);
- }
-
- @Override
- public <T> CompletableFuture<T> execute(Callable<T> callable) {
- return rpcService.execute(callable);
- }
-
@SuppressWarnings("unchecked")
private <C extends RpcGateway> CompletableFuture<C> decorateTmGateway(
CompletableFuture<C> future) {