This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
     new af3d3ef  [FLINK-10419] Call JobMasterGateway through RpcCheckpointResponder in test
af3d3ef is described below

commit af3d3ef778998dfe96dbe6d5a0ceaf887b7c00f4
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Nov 14 13:24:52 2018 +0100

    [FLINK-10419] Call JobMasterGateway through RpcCheckpointResponder in test
---
 .../flink/runtime/jobmaster/JobMasterTest.java | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 184f5a6..0f384b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -81,6 +81,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -266,18 +267,16 @@ public class JobMasterTest extends TestLogger {
 			Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance();
-			CompletableFuture<JobMasterGateway> jobMasterGateway =
-				rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class);
-
-			jobMasterGateway.thenAccept(gateway -> {
-				gateway.declineCheckpoint(new DeclineCheckpoint(
-					jobGraph.getJobID(),
-					new ExecutionAttemptID(1, 1),
-					1,
-					userException
-					)
-				);
-			});
+			JobMasterGateway jobMasterGateway =
+				rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class).get();
+
+			RpcCheckpointResponder rpcCheckpointResponder = new RpcCheckpointResponder(jobMasterGateway);
+			rpcCheckpointResponder.declineCheckpoint(
+				jobGraph.getJobID(),
+				new ExecutionAttemptID(1, 1),
+				1,
+				userException
+			);
 			Throwable throwable = declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);