tillrohrmann commented on a change in pull request #15741:
URL: https://github.com/apache/flink/pull/15741#discussion_r627494482
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
##########
@@ -560,6 +561,47 @@ public void canRespondWithSerializedValueLocally() throws
Exception {
}
}
+ /**
+ * Verifies that actions scheduled via the main thread executor are
eventually run while
+ * adhering to the provided delays.
+ *
+ * <p>This test does not assert any upper bounds for how late something is
run, because that
+ * would make the test unstable in some environments, and there is no
guarantee that such an
+ * upper bound exists in the first place.
+ *
+ * <p>There are various failure points for this test, including the
scheduling from the {@link
+ * RpcEndpoint} to the {@link AkkaInvocationHandler}, the conversion of
these calls by the
+ * handler into Call-/RunAsync messages, the handling of said messages by
the {@link
+ * AkkaRpcActor} and in the case of RunAsync the actual scheduling by the
underlying actor
+ * system. This isn't an ideal test setup, but these components are
difficult to test in
+ * isolation.
+ */
+ @Test
+ public void testScheduling() throws ExecutionException,
InterruptedException {
+ final SchedulingRpcEndpoint endpoint = new
SchedulingRpcEndpoint(akkaRpcService);
+
+ endpoint.start();
+
+ final SchedulingRpcEndpointGateway gateway =
+ endpoint.getSelfGateway(SchedulingRpcEndpointGateway.class);
+
+ final CompletableFuture<Void> scheduleRunnableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> scheduleCallableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> executeFuture = new
CompletableFuture<>();
+
+ final long scheduleTime = System.currentTimeMillis();
Review comment:
Let's use `System.nanoTime` to avoid clock resets.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
##########
@@ -560,6 +561,47 @@ public void canRespondWithSerializedValueLocally() throws
Exception {
}
}
+ /**
+ * Verifies that actions scheduled via the main thread executor are
eventually run while
+ * adhering to the provided delays.
+ *
+ * <p>This test does not assert any upper bounds for how late something is
run, because that
+ * would make the test unstable in some environments, and there is no
guarantee that such an
+ * upper bound exists in the first place.
+ *
+ * <p>There are various failure points for this test, including the
scheduling from the {@link
+ * RpcEndpoint} to the {@link AkkaInvocationHandler}, the conversion of
these calls by the
+ * handler into Call-/RunAsync messages, the handling of said messages by
the {@link
+ * AkkaRpcActor} and in the case of RunAsync the actual scheduling by the
underlying actor
+ * system. This isn't an ideal test setup, but these components are
difficult to test in
+ * isolation.
+ */
+ @Test
+ public void testScheduling() throws ExecutionException,
InterruptedException {
+ final SchedulingRpcEndpoint endpoint = new
SchedulingRpcEndpoint(akkaRpcService);
+
+ endpoint.start();
+
+ final SchedulingRpcEndpointGateway gateway =
+ endpoint.getSelfGateway(SchedulingRpcEndpointGateway.class);
+
+ final CompletableFuture<Void> scheduleRunnableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> scheduleCallableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> executeFuture = new
CompletableFuture<>();
+
+ final long scheduleTime = System.currentTimeMillis();
+ gateway.schedule(scheduleRunnableFuture, scheduleCallableFuture,
executeFuture);
+
+ assertThat(
+ scheduleRunnableFuture.thenApply(ignored ->
System.currentTimeMillis()).get(),
+ greaterThanOrEqualTo(scheduleTime +
SchedulingRpcEndpoint.DELAY_MILLIS));
+ assertThat(
+ scheduleCallableFuture.thenApply(ignored ->
System.currentTimeMillis()).get(),
Review comment:
Same here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
##########
@@ -560,6 +561,47 @@ public void canRespondWithSerializedValueLocally() throws
Exception {
}
}
+ /**
+ * Verifies that actions scheduled via the main thread executor are
eventually run while
+ * adhering to the provided delays.
+ *
+ * <p>This test does not assert any upper bounds for how late something is
run, because that
+ * would make the test unstable in some environments, and there is no
guarantee that such an
+ * upper bound exists in the first place.
+ *
+ * <p>There are various failure points for this test, including the
scheduling from the {@link
+ * RpcEndpoint} to the {@link AkkaInvocationHandler}, the conversion of
these calls by the
+ * handler into Call-/RunAsync messages, the handling of said messages by
the {@link
+ * AkkaRpcActor} and in the case of RunAsync the actual scheduling by the
underlying actor
+ * system. This isn't an ideal test setup, but these components are
difficult to test in
+ * isolation.
+ */
+ @Test
+ public void testScheduling() throws ExecutionException,
InterruptedException {
+ final SchedulingRpcEndpoint endpoint = new
SchedulingRpcEndpoint(akkaRpcService);
+
+ endpoint.start();
+
+ final SchedulingRpcEndpointGateway gateway =
+ endpoint.getSelfGateway(SchedulingRpcEndpointGateway.class);
+
+ final CompletableFuture<Void> scheduleRunnableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> scheduleCallableFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> executeFuture = new
CompletableFuture<>();
+
+ final long scheduleTime = System.currentTimeMillis();
+ gateway.schedule(scheduleRunnableFuture, scheduleCallableFuture,
executeFuture);
+
+ assertThat(
+ scheduleRunnableFuture.thenApply(ignored ->
System.currentTimeMillis()).get(),
Review comment:
Same here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]