This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new af3d3ef  [FLINK-10419] Call JobMasterGateway through 
RpcCheckpointResponder in test
af3d3ef is described below

commit af3d3ef778998dfe96dbe6d5a0ceaf887b7c00f4
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Nov 14 13:24:52 2018 +0100

    [FLINK-10419] Call JobMasterGateway through RpcCheckpointResponder in test
---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 184f5a6..0f384b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -81,6 +81,7 @@ import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -266,18 +267,16 @@ public class JobMasterTest extends TestLogger {
 
                        Throwable userException = (Throwable) 
Class.forName(className, false, userClassLoader).newInstance();
 
-                       CompletableFuture<JobMasterGateway> jobMasterGateway =
-                               rpcService2.connect(jobMaster.getAddress(), 
jobMaster.getFencingToken(), JobMasterGateway.class);
-
-                       jobMasterGateway.thenAccept(gateway -> {
-                               gateway.declineCheckpoint(new DeclineCheckpoint(
-                                               jobGraph.getJobID(),
-                                               new ExecutionAttemptID(1, 1),
-                                               1,
-                                               userException
-                                       )
-                               );
-                       });
+                       JobMasterGateway jobMasterGateway =
+                               rpcService2.connect(jobMaster.getAddress(), 
jobMaster.getFencingToken(), JobMasterGateway.class).get();
+
+                       RpcCheckpointResponder rpcCheckpointResponder = new 
RpcCheckpointResponder(jobMasterGateway);
+                       rpcCheckpointResponder.declineCheckpoint(
+                               jobGraph.getJobID(),
+                               new ExecutionAttemptID(1, 1),
+                               1,
+                               userException
+                       );
 
                        Throwable throwable = 
declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(),
                                TimeUnit.MILLISECONDS);

Reply via email to