xintongsong commented on a change in pull request #15411:
URL: https://github.com/apache/flink/pull/15411#discussion_r605325307
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
Review comment:
```suggestion
/** Tests running the runnable in the main thread of the underlying RPC
endpoint. */
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 500L;
Review comment:
100ms might be good enough.
I was suggesting 500ms for the test case with different units because that
case needs at least 1s anyway.
And in that case, we might want to relax the assertion range to (0.5, 1.5)
times the expected delay.
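To illustrate the effect of the wider range, here is a minimal sketch. The helper `withinTolerance` and the sample numbers are hypothetical and not part of the PR; the sketch only shows how a measured delay that fails the 0.8 to 1.2 bounds can pass with 0.5 to 1.5.

```java
final class DelayToleranceSketch {

    /** Hypothetical helper: true if actualMs lies strictly inside (expectedMs * lower, expectedMs * upper). */
    public static boolean withinTolerance(long actualMs, long expectedMs, double lower, double upper) {
        return actualMs > expectedMs * lower && actualMs < expectedMs * upper;
    }

    public static void main(String[] args) {
        final long expectedMs = 1000L;
        final long measuredMs = 1300L; // made-up measurement, e.g. from a loaded CI machine

        // Bounds used in the reviewed code: 800 < 1300 < 1200 does not hold.
        System.out.println(withinTolerance(measuredMs, expectedMs, 0.8, 1.2));
        // Relaxed bounds: 500 < 1300 < 1500 holds.
        System.out.println(withinTolerance(measuredMs, expectedMs, 0.5, 1.5));
    }
}
```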
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
Review comment:
nit: It would be better to align the name of the test case with the name of
the method being tested.
```suggestion
public void testScheduleRunAsync()
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 500L;
+ final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .scheduleRunAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelayMs);
+ Long actualDelayMs =
+ actualDelayMsFuture.get(expectedDelayMs * 2,
TimeUnit.MILLISECONDS);
+ assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+ assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test executing the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testExecute() throws InterruptedException, ExecutionException,
TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .execute(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualCondition);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling runnable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleRunnableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling callable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleCallableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final int expectedInt = 12345;
+ final String expectedString = "Flink";
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ ScheduledFuture<Integer> intScheduleFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedInt;
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ ScheduledFuture<String> stringScheduledFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedString;
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ final int actualInteger =
+ intScheduleFuture.get(expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final String actualString =
+ stringScheduledFuture.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ assertEquals(actualInteger, expectedInt);
+ assertEquals(actualString, expectedString);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleAtFixedRate()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleAtFixedRate(
+ endpoint::validateRunsInMainThread, 100, 100,
TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method
currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleWithFixedDelay()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleWithFixedDelay(
+ endpoint::validateRunsInMainThread, 100, 100,
TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method
currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test execute the callable in the main thread of the underlying RPC
service, returning a
+ * future for the result of the callable. If the callable is not completed
within the given
+ * timeout, then the future will be failed with a TimeoutException. This
schedule method is
+ * called directly from RpcEndpoint, MainThreadExecutor do not support
this method.
+ */
+ @Test
+ public void testCallAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final Time timeout = Time.milliseconds(500);
+ final Integer expectedInteger = 12345;
+ try {
+ endpoint.start();
+ CompletableFuture<Integer> integerFuture =
+ endpoint.callAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ return expectedInteger;
+ },
+ timeout);
+ assertEquals(integerFuture.get(timeout.getSize(),
timeout.getUnit()), expectedInteger);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * make the callable sleep some time more than specified timeout, so
TimeoutException is
+ * expected.
+ */
+ @Test(expected = TimeoutException.class)
+ public void testCallAsyncTimeOut()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final Time timeout = Time.milliseconds(200);
+ final Integer expectedInteger = 12345;
+ try {
+ endpoint.start();
+ CompletableFuture<Integer> integerFuture =
+ endpoint.callAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ TimeUnit.MILLISECONDS.sleep(300);
+ return expectedInteger;
+ },
+ timeout);
+ integerFuture.get(timeout.getSize(), timeout.getUnit());
Review comment:
This should not work.
```
future = endpoint.callAsync(callable, timeout1);
future.get(timeout2.getSize(), timeout2.getUnit());
```
I think this case should test whether `timeout1` in the above example takes
effect, while currently the `TimeoutException` comes from `timeout2`. You can
verify that by replacing `integerFuture.get(timeout.getSize(),
timeout.getUnit())` with `integerFuture.get()`. An `ExecutionException` should
be thrown.
When the callable execution takes longer than `timeout1`, `integerFuture`
will be completed exceptionally. Trying to `get` a `CompletableFuture` that is
completed exceptionally results in an `ExecutionException`. Instead, we
should verify that `integerFuture` is completed with a timeout exception.
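The difference between the two timeouts can be reproduced with plain JDK futures. This is only a sketch: `timedOut` is a hand-built stand-in for what `callAsync` is assumed to return once `timeout1` has fired, and the exact exception type Flink uses as the cause is not confirmed here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallAsyncTimeoutSketch {

    /** Describes what an untimed get() throws for the given future. */
    public static String describeUntimedGet(CompletableFuture<Integer> future) {
        try {
            future.get();
            return "none";
        } catch (ExecutionException e) {
            // The failure the future was completed with is wrapped as the cause.
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        }
    }

    /** Describes what a timed get() throws for the given future. */
    public static String describeTimedGet(CompletableFuture<Integer> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "none";
        } catch (TimeoutException e) {
            // Thrown by get() itself because the wait expired; the future is still pending.
            return "TimeoutException";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        }
    }

    public static void main(String[] args) {
        // Stand-in for a future that was failed because timeout1 expired.
        final CompletableFuture<Integer> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("simulated timeout1"));

        // Stand-in for a future that is still running when timeout2 expires.
        final CompletableFuture<Integer> pending = new CompletableFuture<>();

        System.out.println(describeUntimedGet(timedOut));
        System.out.println(describeTimedGet(pending, 10));
    }
}
```

A test along these lines would catch the `ExecutionException` and assert on its cause, rather than declaring `expected = TimeoutException.class`.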
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 500L;
+ final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .scheduleRunAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelayMs);
+ Long actualDelayMs =
+ actualDelayMsFuture.get(expectedDelayMs * 2,
TimeUnit.MILLISECONDS);
+ assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+ assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test executing the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testExecute() throws InterruptedException, ExecutionException,
TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .execute(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualCondition);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling runnable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleRunnableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling callable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleCallableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final int expectedInt = 12345;
+ final String expectedString = "Flink";
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ ScheduledFuture<Integer> intScheduleFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedInt;
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ ScheduledFuture<String> stringScheduledFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedString;
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ final int actualInteger =
+ intScheduleFuture.get(expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final String actualString =
+ stringScheduledFuture.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ assertEquals(actualInteger, expectedInt);
+ assertEquals(actualString, expectedString);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleAtFixedRate()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleAtFixedRate(
+ endpoint::validateRunsInMainThread, 100, 100,
TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method
currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleWithFixedDelay()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleWithFixedDelay(
+ endpoint::validateRunsInMainThread, 100, 100,
TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method
currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
Review comment:
I think we don't need these test cases. It doesn't make sense to verify the
behavior of unsupported methods.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 500L;
+ final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .scheduleRunAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelayMs);
+ Long actualDelayMs =
+ actualDelayMsFuture.get(expectedDelayMs * 2,
TimeUnit.MILLISECONDS);
+ assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+ assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test executing the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testExecute() throws InterruptedException, ExecutionException,
TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .execute(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualCondition);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling runnable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleRunnableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling callable with delay specified in number and TimeUnit.
*/
+ @Test
+ public void testScheduleCallableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new
CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new
CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final int expectedInt = 12345;
+ final String expectedString = "Flink";
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ ScheduledFuture<Integer> intScheduleFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedInt;
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ ScheduledFuture<String> stringScheduledFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() -
startTime);
+ return expectedString;
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ final int actualInteger =
+ intScheduleFuture.get(expectedDelay1.getSize() * 2,
expectedDelay1.getUnit());
+ final String actualString =
+ stringScheduledFuture.get(
+ expectedDelay2.getSize() * 2,
expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ assertEquals(actualInteger, expectedInt);
+ assertEquals(actualString, expectedString);
Review comment:
nit: The order of arguments affects the error message if the assertion
fails.
```suggestion
assertEquals(expectedInt, actualInteger);
assertEquals(expectedString, actualString);
```
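To make the effect concrete, here is a small sketch. `failureMessage` is a hypothetical stand-in that only mimics the shape of the JUnit 4 `assertEquals` failure message; it is not JUnit itself, and the wrong value `54321` is made up.

```java
final class AssertOrderSketch {

    /** Hypothetical stand-in mimicking the shape of a JUnit 4 assertEquals(expected, actual) failure message. */
    public static String failureMessage(Object expected, Object actual) {
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(String[] args) {
        final int expectedInt = 12345;
        final int actualInteger = 54321; // pretend the scheduled callable returned a wrong value

        // Arguments swapped, as in the reviewed code: the message reports the wrong value as expected.
        System.out.println(failureMessage(actualInteger, expectedInt));
        // Arguments in (expected, actual) order, as suggested.
        System.out.println(failureMessage(expectedInt, actualInteger));
    }
}
```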
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
Review comment:
nit:
1. `Boolean` type is not used in this case. `CompletableFuture<Void>` should
be good enough.
2. A `CompletableFuture` by nature carries the information whether the task
has finished. It would be better if the variable name reflected which task it
represents.
3. It would be nice to always use `final` for variables that are not intended
to be changed.
```suggestion
final CompletableFuture<Void> asyncExecutionFuture = new
CompletableFuture<>();
```
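As a rough illustration of the three points, the following sketch uses a plain single-thread executor as a stand-in for the endpoint's main thread executor (an assumption made so that the example runs without Flink). The future carries no value; completing it with `null` is enough to signal that the task ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class VoidFutureSketch {

    /** Runs a task asynchronously and reports whether it signalled completion within the timeout. */
    public static boolean runAndAwait(long timeoutMs) {
        // Stand-in for the main thread executor; not the Flink API.
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final CompletableFuture<Void> asyncExecutionFuture = new CompletableFuture<>();
        try {
            // No payload is needed, so the future is completed with null.
            executor.execute(() -> asyncExecutionFuture.complete(null));
            asyncExecutionFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(1000));
    }
}
```

With this shape there is nothing left to pass to `assertTrue`: a `get` that returns normally within the timeout is already the evidence that the task ran.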
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -262,4 +269,268 @@ protected ExtendedEndpoint(
return CompletableFuture.completedFuture(isRunning());
}
}
+
+ /** test run the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testRunAsync() throws InterruptedException,
ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .runAsync(
+ () -> {
+ // no need to catch the validation failure
+ // if the validation fail, the future will
never complete
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualFinished = finishedFuture.get(TIMEOUT.getSize(),
TIMEOUT.getUnit());
+ assertTrue(actualFinished);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test scheduling the runnable in the main thread of the underlying RPC
endpoint, with a delay
+ * of the given number of milliseconds.
+ */
+ @Test
+ public void testScheduleRunAsyncTime()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final long expectedDelayMs = 500L;
+ final CompletableFuture<Long> actualDelayMsFuture = new
CompletableFuture<>();
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .scheduleRunAsync(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture.complete(
+ System.currentTimeMillis() -
startTime);
+ },
+ expectedDelayMs);
+ Long actualDelayMs =
+ actualDelayMsFuture.get(expectedDelayMs * 2,
TimeUnit.MILLISECONDS);
+ assertTrue(actualDelayMs > expectedDelayMs * 0.8);
+ assertTrue(actualDelayMs < expectedDelayMs * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test executing the runnable in the main thread of the underlying RPC
endpoint. */
+ @Test
+ public void testExecute() throws InterruptedException, ExecutionException,
TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ CompletableFuture<Boolean> finishedFuture = new CompletableFuture<>();
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .execute(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ finishedFuture.complete(true);
+ });
+ Boolean actualCondition = finishedFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
+ assertTrue(actualCondition);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling runnable with delay specified in number and TimeUnit. */
+ @Test
+ public void testScheduleRunnableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() - startTime);
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() - startTime);
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2, expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2, expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** test scheduling callable with delay specified in number and TimeUnit. */
+ @Test
+ public void testScheduleCallableTimeUnit()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final Time expectedDelay1 = Time.seconds(1);
+ final Time expectedDelay2 = Time.milliseconds(500);
+ final CompletableFuture<Long> actualDelayMsFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Long> actualDelayMsFuture2 = new CompletableFuture<>();
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final int expectedInt = 12345;
+ final String expectedString = "Flink";
+ try {
+ endpoint.start();
+ final long startTime = System.currentTimeMillis();
+ ScheduledFuture<Integer> intScheduleFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture1.complete(
+ System.currentTimeMillis() - startTime);
+ return expectedInt;
+ },
+ expectedDelay1.getSize(),
+ expectedDelay1.getUnit());
+ ScheduledFuture<String> stringScheduledFuture =
+ endpoint.getMainThreadExecutor()
+ .schedule(
+ () -> {
+ endpoint.validateRunsInMainThread();
+ actualDelayMsFuture2.complete(
+ System.currentTimeMillis() - startTime);
+ return expectedString;
+ },
+ expectedDelay2.getSize(),
+ expectedDelay2.getUnit());
+
+ final long actualDelayMs1 =
+ actualDelayMsFuture1.get(
+ expectedDelay1.getSize() * 2, expectedDelay1.getUnit());
+ final long actualDelayMs2 =
+ actualDelayMsFuture2.get(
+ expectedDelay2.getSize() * 2, expectedDelay2.getUnit());
+ final int actualInteger =
+ intScheduleFuture.get(expectedDelay1.getSize() * 2, expectedDelay1.getUnit());
+ final String actualString =
+ stringScheduledFuture.get(
+ expectedDelay2.getSize() * 2, expectedDelay2.getUnit());
+ assertTrue(actualDelayMs1 > expectedDelay1.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs1 < expectedDelay1.toMilliseconds() * 1.2);
+ assertTrue(actualDelayMs2 > expectedDelay2.toMilliseconds() * 0.8);
+ assertTrue(actualDelayMs2 < expectedDelay2.toMilliseconds() * 1.2);
+ assertEquals(actualInteger, expectedInt);
+ assertEquals(actualString, expectedString);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleAtFixedRate()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleAtFixedRate(
+ endpoint::validateRunsInMainThread, 100, 100, TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /** not implemented currently, exception is expected. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testScheduleWithFixedDelay()
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ try {
+ endpoint.start();
+ endpoint.getMainThreadExecutor()
+ .scheduleWithFixedDelay(
+ endpoint::validateRunsInMainThread, 100, 100, TimeUnit.MILLISECONDS);
+ fail(
+ "Expected to fail with a UnsupportedOperationException"
+ + " since we have not implemented this method currently.");
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ }
+ }
+
+ /**
+ * test execute the callable in the main thread of the underlying RPC service, returning a
+ * future for the result of the callable. If the callable is not completed within the given
+ * timeout, then the future will be failed with a TimeoutException. This schedule method is
+ * called directly from RpcEndpoint, MainThreadExecutor do not support this method.
+ */
+ @Test
+ public void testCallAsync() throws InterruptedException, ExecutionException, TimeoutException {
+ final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
+ final Time timeout = Time.milliseconds(500);
Review comment:
Reusing `TIMEOUT` should be good enough. It's okay to use a large timeout
when it is not expected to be triggered.
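
To illustrate the point, here is a minimal standalone sketch (not the Flink test itself). It assumes a hypothetical `TIMEOUT_SECONDS` constant standing in for the shared `TIMEOUT`, and a made-up class and helper name. The wait returns as soon as the future completes, so a generous upper bound does not slow the test down:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class GenerousTimeoutSketch {
    // Hypothetical stand-in for the test class's shared TIMEOUT constant.
    static final long TIMEOUT_SECONDS = 10L;

    // Waits for the future with the generous timeout and returns how long
    // the wait actually took, in milliseconds.
    static long elapsedMillisWaitingFor(CompletableFuture<?> future) {
        long start = System.nanoTime();
        try {
            // get(...) returns as soon as the future completes; the timeout
            // only matters if the future never completes.
            future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("future did not complete in time", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> 12345);
        long waitedMs = elapsedMillisWaitingFor(result);
        System.out.println("waited " + waitedMs + " ms, value = " + result.join());
    }
}
```

The large bound only costs time in the failure case, where the test would fail anyway.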