This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c43291  [FLINK-22055][runtime] Fix RpcEndpoint MainThreadExecutor 
schedules callables with potential wrong time unit.
5c43291 is described below

commit 5c43291494bb57c0b4cf5668227fe2cadc0bb2a4
Author: est08zw <[email protected]>
AuthorDate: Mon Mar 29 15:53:42 2021 +0800

    [FLINK-22055][runtime] Fix RpcEndpoint MainThreadExecutor schedules 
callables with potential wrong time unit.
    
    This closes #15411
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  |   2 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 236 ++++++++++++++++++++-
 2 files changed, 236 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index fcc0e9e..fdca630 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -435,7 +435,7 @@ public abstract class RpcEndpoint implements RpcGateway, 
AutoCloseableAsync {
             final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, 
unit);
             FutureTask<V> ft = new FutureTask<>(callable);
             scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delay, 
TimeUnit.MILLISECONDS);
+            return new ScheduledFutureAdapter<>(ft, delayMillis, 
TimeUnit.MILLISECONDS);
         }
 
         @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index dad04de..5e1424c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -35,15 +35,17 @@ import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/** Tests for the RpcEndpoint and its self gateways. */
+/** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor 
scheduling command. */
 public class RpcEndpointTest extends TestLogger {
 
     private static final Time TIMEOUT = Time.seconds(10L);
@@ -193,6 +195,11 @@ public class RpcEndpointTest extends TestLogger {
 
         private final int foobarValue;
 
+        protected BaseEndpoint(RpcService rpcService) {
+            super(rpcService);
+            this.foobarValue = Integer.MAX_VALUE;
+        }
+
         protected BaseEndpoint(RpcService rpcService, int foobarValue) {
             super(rpcService);
 
@@ -262,4 +269,231 @@ public class RpcEndpointTest extends TestLogger {
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** Tests running the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final CompletableFuture<Void> asyncExecutionFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fails, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                asyncExecutionFuture.complete(null);
+                            });
+            asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * Tests scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsync()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 100L;
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .scheduleRunAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelayMs);
+            final long actualDelayMs =
+                    actualDelayMsFuture.get(expectedDelayMs * 2, 
TimeUnit.MILLISECONDS);
+            assertTrue(actualDelayMs > expectedDelayMs * 0.5);
+            assertTrue(actualDelayMs < expectedDelayMs * 1.5);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** Tests executing the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final CompletableFuture<Void> asyncExecutionFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .execute(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                asyncExecutionFuture.complete(null);
+                            });
+            asyncExecutionFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** Tests scheduling runnable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleRunnable()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture1.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay1.getSize(),
+                            expectedDelay1.getUnit());
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture2.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay2.getSize(),
+                            expectedDelay2.getUnit());
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** Tests scheduling callable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleCallable()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final int expectedInt = 12345;
+        final String expectedString = "Flink";
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            final ScheduledFuture<Integer> intScheduleFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture1.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedInt;
+                                    },
+                                    expectedDelay1.getSize(),
+                                    expectedDelay1.getUnit());
+            final ScheduledFuture<String> stringScheduledFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture2.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedString;
+                                    },
+                                    expectedDelay2.getSize(),
+                                    expectedDelay2.getUnit());
+
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            final int actualInteger =
+                    intScheduleFuture.get(expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final String actualString =
+                    stringScheduledFuture.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+            assertEquals(expectedInt, actualInteger);
+            assertEquals(expectedString, actualString);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * Tests executing the callable in the main thread of the underlying RPC 
service, returning a
+     * future for the result of the callable. If the callable is not completed 
within the given
+     * timeout, then the future will be failed with a TimeoutException. This 
schedule method is
+     * called directly from RpcEndpoint; MainThreadExecutor does not support 
this method.
+     */
+    @Test
+    public void testCallAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final Integer expectedInteger = 12345;
+        try {
+            endpoint.start();
+            final CompletableFuture<Integer> integerFuture =
+                    endpoint.callAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                return expectedInteger;
+                            },
+                            TIMEOUT);
+            assertEquals(expectedInteger, integerFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit()));
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * Makes the callable sleep longer than the specified timeout, so a 
TimeoutException is
+     * expected.
+     */
+    @Test
+    public void testCallAsyncTimeout()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final Time timeout = Time.milliseconds(100);
+        try {
+            endpoint.start();
+            final CompletableFuture<Throwable> throwableFuture =
+                    endpoint.callAsync(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        
TimeUnit.MILLISECONDS.sleep(timeout.toMilliseconds() * 2);
+                                        return 12345;
+                                    },
+                                    timeout)
+                            .handle((ignore, throwable) -> throwable);
+            final Throwable throwable =
+                    throwableFuture.get(timeout.getSize() * 2, 
timeout.getUnit());
+            assertTrue(throwable instanceof TimeoutException);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
 }

Reply via email to