This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5c43291 [FLINK-22055][runtime] Fix RpcEndpoint MainThreadExecutor
schedules callables with potential wrong time unit.
5c43291 is described below
commit 5c43291494bb57c0b4cf5668227fe2cadc0bb2a4
Author: est08zw <[email protected]>
AuthorDate: Mon Mar 29 15:53:42 2021 +0800
[FLINK-22055][runtime] Fix RpcEndpoint MainThreadExecutor schedules
callables with potential wrong time unit.
This closes #15411
---
.../org/apache/flink/runtime/rpc/RpcEndpoint.java | 2 +-
.../apache/flink/runtime/rpc/RpcEndpointTest.java | 236 ++++++++++++++++++++-
2 files changed, 236 insertions(+), 2 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index fcc0e9e..fdca630 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -435,7 +435,7 @@ public abstract class RpcEndpoint implements RpcGateway,
AutoCloseableAsync {
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay,
unit);
FutureTask<V> ft = new FutureTask<>(callable);
scheduleRunAsync(ft, delayMillis);
- return new ScheduledFutureAdapter<>(ft, delay,
TimeUnit.MILLISECONDS);
+ return new ScheduledFutureAdapter<>(ft, delayMillis,
TimeUnit.MILLISECONDS);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index dad04de..5e1424c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -35,15 +35,17 @@ import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-/** Tests for the RpcEndpoint and its self gateways. */
+/** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor
scheduling command. */
public class RpcEndpointTest extends TestLogger {
private static final Time TIMEOUT = Time.seconds(10L);
@@ -193,6 +195,11 @@ public class RpcEndpointTest extends TestLogger {
private final int foobarValue;
+ protected BaseEndpoint(RpcService rpcService) {
+ super(rpcService);
+ this.foobarValue = Integer.MAX_VALUE;
+ }
+
protected BaseEndpoint(RpcService rpcService, int foobarValue) {
super(rpcService);
@@ -262,4 +269,231 @@ public class RpcEndpointTest extends TestLogger {
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** Tests running the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final CompletableFuture<Void> asyncExecutionFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ asyncExecutionFuture.complete(null);
+ });
+ asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * Tests scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsync()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 100L;
+ final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .scheduleRunAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelayMs);
+ final long actualDelayMs =
+ actualDelayMsFuture.get(expectedDelayMs * 2,
TimeUnit.MILLISECONDS);
+ assertTrue(actualDelayMs > expectedDelayMs * 0.5);
+ assertTrue(actualDelayMs < expectedDelayMs * 1.5);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** Tests executing the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testExecute() throws InterruptedException, ExecutionException,
TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final CompletableFuture<Void> asyncExecutionFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .execute(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ asyncExecutionFuture.complete(null);
+ });
+ asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** Tests scheduling runnable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleRunnable()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** Tests scheduling callable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleCallable()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final int expectedInt = 12345;
+ final String expectedString = "Flink";
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ final ScheduledFuture<Integer> intScheduleFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedInt;
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ final ScheduledFuture<String> stringScheduledFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedString;
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ final int actualInteger =
+ intScheduleFuture.get(expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final String actualString =
+ stringScheduledFuture.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ assertEquals(expectedInt, actualInteger);
+ assertEquals(expectedString, actualString);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * Tests executing the callable in the main thread of the underlying RPC
service, returning a
+ * future for the result of the callable. If the callable is not completed
within the given
+ * timeout, then the future will be failed with a TimeoutException. This
schedule method is
+ * called directly from RpcEndpoint, MainThreadExecutor do not support
this method.
+ */
+ @Test
+ public void testCallAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final Integer expectedInteger = 12345;
+ try {
+ endpoint.start();
+ final CompletableFuture<Integer> integerFuture =
+ endpoint.callAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ return expectedInteger;
+ },
+ TIMEOUT);
+ assertEquals(expectedInteger, integerFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit()));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * Make the callable sleep some time more than specified timeout, so
TimeoutException is
+ * expected.
+ */
+ @Test
+ public void testCallAsyncTimeout()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final Time timeout = Time.milliseconds(100);
+ try {
+ endpoint.start();
+ final CompletableFuture<Throwable> throwableFuture =
+ endpoint.callAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+
TimeUnit.MILLISECONDS.sleep(timeout.toMilliseconds() * 2);
+ return 12345;
+ },
+ timeout)
+ .handle((ignore, throwable) -> throwable);
+ final Throwable throwable =
+ throwableFuture.get(timeout.getSize() * 2,
timeout.getUnit());
+ assertTrue(throwable instanceof TimeoutException);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
}