xintongsong commented on a change in pull request #15411:
URL: https://github.com/apache/flink/pull/15411#discussion_r605325307



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */

Review comment:
       ```suggestion
       /** Tests running the runnable in the main thread of the underlying RPC 
endpoint. */
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 500L;

Review comment:
       100ms might be good enough.
   
   I was suggesting 500ms for the test case with different units because that 
case needs at least 1s for anyway.
   
   And in that case, we might want to relax the assertion range to (0.5, 1.5).

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()

Review comment:
       nit: It's would be better to align the name of test case with the method 
name being tested.
   ```suggestion
       public void testScheduleRunAsync()
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 500L;
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .scheduleRunAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelayMs);
+            Long actualDelayMs =
+                    actualDelayMsFuture.get(expectedDelayMs * 2, 
TimeUnit.MILLISECONDS);
+            assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+            assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test executing the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .execute(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualCondition);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling runnable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleRunnableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture1.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay1.getSize(),
+                            expectedDelay1.getUnit());
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture2.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay2.getSize(),
+                            expectedDelay2.getUnit());
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling callable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleCallableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final int expectedInt = 12345;
+        final String expectedString = "Flink";
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            ScheduledFuture<Integer> intScheduleFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture1.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedInt;
+                                    },
+                                    expectedDelay1.getSize(),
+                                    expectedDelay1.getUnit());
+            ScheduledFuture<String> stringScheduledFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture2.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedString;
+                                    },
+                                    expectedDelay2.getSize(),
+                                    expectedDelay2.getUnit());
+
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            final int actualInteger =
+                    intScheduleFuture.get(expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final String actualString =
+                    stringScheduledFuture.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+            assertEquals(actualInteger, expectedInt);
+            assertEquals(actualString, expectedString);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleAtFixedRate()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleAtFixedRate(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleWithFixedDelay()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleWithFixedDelay(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test execute the callable in the main thread of the underlying RPC 
service, returning a
+     * future for the result of the callable. If the callable is not completed 
within the given
+     * timeout, then the future will be failed with a TimeoutException. This 
schedule method is
+     * called directly from RpcEndpoint, MainThreadExecutor do not support 
this method.
+     */
+    @Test
+    public void testCallAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final Time timeout = Time.milliseconds(500);
+        final Integer expectedInteger = 12345;
+        try {
+            endpoint.start();
+            CompletableFuture<Integer> integerFuture =
+                    endpoint.callAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                return expectedInteger;
+                            },
+                            timeout);
+            assertEquals(integerFuture.get(timeout.getSize(), 
timeout.getUnit()), expectedInteger);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * make the callable sleep some time more than specified timeout, so 
TimeoutException is
+     * expected.
+     */
+    @Test(expected = TimeoutException.class)
+    public void testCallAsyncTimeOut()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final Time timeout = Time.milliseconds(200);
+        final Integer expectedInteger = 12345;
+        try {
+            endpoint.start();
+            CompletableFuture<Integer> integerFuture =
+                    endpoint.callAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                TimeUnit.MILLISECONDS.sleep(300);
+                                return expectedInteger;
+                            },
+                            timeout);
+            integerFuture.get(timeout.getSize(), timeout.getUnit());

Review comment:
       This should not work.
   
   ```
   future = endpoint.callAsync(callable, timeout1);
   future.get(timeout2.getSize(), timeout2.getUnit);
   ```
   I think this case should test whether `timeout1` in the above example takes 
effect, while currently the `TimeoutException` comes from `timeout2`. You can 
verify that by replace `integerFuture.get(timeout.getSize(), 
timeout.getUnit())` with `integerFuture.get()`. An `ExecutionException` should 
be thrown.
   
   When the callable execution takes longer than `timeout2`, `integerFuture` 
will be completed exceptionally. Trying to `get` a `CompletableFuture` that is 
completed exceptionally will result in the `ExecutionException`. Instead, we 
should verify that `integerFuture` is completed with a timeout exception. 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 500L;
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .scheduleRunAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelayMs);
+            Long actualDelayMs =
+                    actualDelayMsFuture.get(expectedDelayMs * 2, 
TimeUnit.MILLISECONDS);
+            assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+            assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test executing the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .execute(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualCondition);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling runnable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleRunnableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture1.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay1.getSize(),
+                            expectedDelay1.getUnit());
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture2.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay2.getSize(),
+                            expectedDelay2.getUnit());
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling callable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleCallableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final int expectedInt = 12345;
+        final String expectedString = "Flink";
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            ScheduledFuture<Integer> intScheduleFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture1.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedInt;
+                                    },
+                                    expectedDelay1.getSize(),
+                                    expectedDelay1.getUnit());
+            ScheduledFuture<String> stringScheduledFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture2.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedString;
+                                    },
+                                    expectedDelay2.getSize(),
+                                    expectedDelay2.getUnit());
+
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            final int actualInteger =
+                    intScheduleFuture.get(expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final String actualString =
+                    stringScheduledFuture.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+            assertEquals(actualInteger, expectedInt);
+            assertEquals(actualString, expectedString);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleAtFixedRate()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleAtFixedRate(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleWithFixedDelay()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleWithFixedDelay(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }

Review comment:
       I think we don't need these test cases. Doesn't make sense to verify the 
behaviors of unsupported methods.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 500L;
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .scheduleRunAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelayMs);
+            Long actualDelayMs =
+                    actualDelayMsFuture.get(expectedDelayMs * 2, 
TimeUnit.MILLISECONDS);
+            assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+            assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test executing the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .execute(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualCondition);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling runnable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleRunnableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture1.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay1.getSize(),
+                            expectedDelay1.getUnit());
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture2.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay2.getSize(),
+                            expectedDelay2.getUnit());
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling callable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleCallableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final int expectedInt = 12345;
+        final String expectedString = "Flink";
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            ScheduledFuture<Integer> intScheduleFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture1.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedInt;
+                                    },
+                                    expectedDelay1.getSize(),
+                                    expectedDelay1.getUnit());
+            ScheduledFuture<String> stringScheduledFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture2.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedString;
+                                    },
+                                    expectedDelay2.getSize(),
+                                    expectedDelay2.getUnit());
+
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            final int actualInteger =
+                    intScheduleFuture.get(expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final String actualString =
+                    stringScheduledFuture.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+            assertEquals(actualInteger, expectedInt);
+            assertEquals(actualString, expectedString);

Review comment:
       nit: The order of arguments affects the error message if the assertion 
fail.
   ```suggestion
               assertEquals(expectedInt, actualInteger);
               assertEquals(expectedString, actualString);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();

Review comment:
       nit:
   1. `Boolean` type is not used in this case. `CompletableFuture<Void>` should 
be good enough.
   2. A `CompletableFuture` by nature carries the information whether the task 
has finished. It would be better the variable name reflects which task it 
represents.
   3. It would be nice to always use `final` for variables that is not intended 
to be changed.
   ```suggestion
           final CompletableFuture<Void> asyncExecutionFuture = new 
CompletableFuture<>();
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
             return CompletableFuture.completedFuture(isRunning());
         }
     }
+
+    /** test run the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testRunAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .runAsync(
+                            () -> {
+                                // no need to catch the validation failure
+                                // if the validation fail, the future will 
never complete
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualFinished);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test scheduling the runnable in the main thread of the underlying RPC 
endpoint, with a delay
+     * of the given number of milliseconds.
+     */
+    @Test
+    public void testScheduleRunAsyncTime()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final long expectedDelayMs = 500L;
+        final CompletableFuture<Long> actualDelayMsFuture = new 
CompletableFuture<>();
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .scheduleRunAsync(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelayMs);
+            Long actualDelayMs =
+                    actualDelayMsFuture.get(expectedDelayMs * 2, 
TimeUnit.MILLISECONDS);
+            assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+            assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test executing the runnable in the main thread of the underlying RPC 
endpoint. */
+    @Test
+    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .execute(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                finishedFuture.complete(true);
+                            });
+            Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(), 
TIMEOUT.getUnit());
+            assertTrue(actualCondition);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling runnable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleRunnableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture1.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay1.getSize(),
+                            expectedDelay1.getUnit());
+            endpoint.getMainThreadExecutor()
+                    .schedule(
+                            () -> {
+                                endpoint.validateRunsInMainThread();
+                                actualDelayMsFuture2.complete(
+                                        System.currentTimeMillis() - 
startTime);
+                            },
+                            expectedDelay2.getSize(),
+                            expectedDelay2.getUnit());
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** test scheduling callable with delay specified in number and TimeUnit. 
*/
+    @Test
+    public void testScheduleCallableTimeUnit()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final Time expectedDelay1 = Time.seconds(1);
+        final Time expectedDelay2 = Time.milliseconds(500);
+        final CompletableFuture<Long> actualDelayMsFuture1 = new 
CompletableFuture<>();
+        final CompletableFuture<Long> actualDelayMsFuture2 = new 
CompletableFuture<>();
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final int expectedInt = 12345;
+        final String expectedString = "Flink";
+        try {
+            endpoint.start();
+            final long startTime = System.currentTimeMillis();
+            ScheduledFuture<Integer> intScheduleFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture1.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedInt;
+                                    },
+                                    expectedDelay1.getSize(),
+                                    expectedDelay1.getUnit());
+            ScheduledFuture<String> stringScheduledFuture =
+                    endpoint.getMainThreadExecutor()
+                            .schedule(
+                                    () -> {
+                                        endpoint.validateRunsInMainThread();
+                                        actualDelayMsFuture2.complete(
+                                                System.currentTimeMillis() - 
startTime);
+                                        return expectedString;
+                                    },
+                                    expectedDelay2.getSize(),
+                                    expectedDelay2.getUnit());
+
+            final long actualDelayMs1 =
+                    actualDelayMsFuture1.get(
+                            expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final long actualDelayMs2 =
+                    actualDelayMsFuture2.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            final int actualInteger =
+                    intScheduleFuture.get(expectedDelay1.getSize() * 2, 
expectedDelay1.getUnit());
+            final String actualString =
+                    stringScheduledFuture.get(
+                            expectedDelay2.getSize() * 2, 
expectedDelay2.getUnit());
+            assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+            assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+            assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+            assertEquals(actualInteger, expectedInt);
+            assertEquals(actualString, expectedString);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleAtFixedRate()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleAtFixedRate(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /** not implemented currently, exception is expected. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testScheduleWithFixedDelay()
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        try {
+            endpoint.start();
+            endpoint.getMainThreadExecutor()
+                    .scheduleWithFixedDelay(
+                            endpoint::validateRunsInMainThread, 100, 100, 
TimeUnit.MILLISECONDS);
+            fail(
+                    "Expected to fail with a UnsupportedOperationException"
+                            + " since we have not implemented this method 
currently.");
+        } finally {
+            RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+        }
+    }
+
+    /**
+     * test execute the callable in the main thread of the underlying RPC 
service, returning a
+     * future for the result of the callable. If the callable is not completed 
within the given
+     * timeout, then the future will be failed with a TimeoutException. This 
schedule method is
+     * called directly from RpcEndpoint, MainThreadExecutor do not support 
this method.
+     */
+    @Test
+    public void testCallAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+        final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+        final Time timeout = Time.milliseconds(500);

Review comment:
       Reusing `TIMEOUT` should be good enough. It's okay to have a large time 
when the timeout is not expected to be triggered.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to