This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc64d2c40762af0d72e532b457ec5c0c15d9e175
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Nov 13 11:42:08 2018 +0100

    [FLINK-10419] Added IT test to check declineCheckpoint invocation
---
 .../org/apache/flink/runtime/rpc/RpcUtils.java     | 27 ++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java     | 76 ++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index c90a8b5..2f656d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -19,9 +19,13 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -87,6 +91,29 @@ public class RpcUtils {
                rpcService.stopService().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
        }
 
+       /**
+        * Shuts the given rpc services down and waits for their termination.
+        *
+        * <p>{@code stopService()} is invoked on every non-null entry first; only afterwards
+        * does the method block on the collected futures. The timeout therefore bounds the
+        * single combined wait, not each service individually.
+        *
+        * @param timeout for this operation
+        * @param rpcServices to shut down; {@code null} entries are skipped
+        * @throws InterruptedException if the operation has been interrupted
+        * @throws ExecutionException if a problem occurred
+        * @throws TimeoutException if a timeout occurred
+        */
+       public static void terminateRpcServices(
+                       Time timeout,
+                       RpcService... rpcServices) throws InterruptedException, 
ExecutionException, TimeoutException {
+               final Collection<CompletableFuture<?>> terminationFutures = new 
ArrayList<>(rpcServices.length);
+
+               for (RpcService service : rpcServices) {
+                       // tolerate null entries so callers may pass services that were never created
+                       if (service != null) {
+                               terminationFutures.add(service.stopService());
+                       }
+               }
+
+               
FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+       }
+
        // We don't want this class to be instantiable
        private RpcUtils() {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index b231b04..5fce5de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -64,26 +65,33 @@ import 
org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMet
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorSystem;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -99,6 +107,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URLClassLoader;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -207,6 +216,73 @@ public class JobMasterTest extends TestLogger {
        }
 
        @Test
+       // Checks that a declineCheckpoint call carrying a user-defined exception is delivered to
+       // the JobMaster as a SerializedThrowable that keeps the original message. The exception
+       // class is compiled at runtime and loaded through its own URLClassLoader, whereas the
+       // JobMaster is constructed with JobMasterTest's class loader.
+       public void testDeclineCheckpointInvocationWithUserException() throws 
Exception {
+               RpcService rpcService1 = null;
+               RpcService rpcService2 = null;
+               try {
+                       // two independent actor systems: the first hosts the JobMaster, the second issues the call
+                       final ActorSystem actorSystem1 = 
AkkaUtils.createDefaultActorSystem();
+                       final ActorSystem actorSystem2 = 
AkkaUtils.createDefaultActorSystem();
+
+                       rpcService1 = new AkkaRpcService(actorSystem1, 
testingTimeout);
+                       rpcService2 = new AkkaRpcService(actorSystem2, 
testingTimeout);
+
+                       // completed by the overridden declineCheckpoint below with the reason that arrived
+                       final CompletableFuture<Throwable> 
declineCheckpointMessageFuture = new CompletableFuture<>();
+
+                       final JobManagerSharedServices jobManagerSharedServices 
= new TestingJobManagerSharedServicesBuilder().build();
+                       final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
+                       final JobMaster jobMaster = new JobMaster(
+                               rpcService1,
+                               jobMasterConfiguration,
+                               jmResourceId,
+                               jobGraph,
+                               haServices,
+                               
DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1),
+                               jobManagerSharedServices,
+                               heartbeatServices,
+                               blobServer,
+                               
UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+                               new NoOpOnCompletionActions(),
+                               testingFatalErrorHandler,
+                               JobMasterTest.class.getClassLoader()) {
+                               @Override
+                               public void declineCheckpoint(DeclineCheckpoint 
declineCheckpoint) {
+                                       // capture the decline reason instead of running the regular handling
+                                       
declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
+                               }
+                       };
+
+                       jobMaster.start(jobMasterId, testingTimeout).get();
+
+                       // compile an exception class on the fly and load it through a dedicated class loader
+                       final String className = "UserException";
+                       final URLClassLoader userClassLoader = 
ClassLoaderUtils.compileAndLoadJava(
+                               temporaryFolder.newFolder(),
+                               className + ".java",
+                               String.format("public class %s extends 
RuntimeException { public %s() {super(\"UserMessage\");} }",
+                                       className,
+                                       className));
+
+                       Throwable userException = (Throwable) 
Class.forName(className, false, userClassLoader).newInstance();
+
+                       // connect through rpcService2 rather than the JobMaster's own rpcService1;
+                       // NOTE(review): presumably this forces the message through serialization - confirm
+                       JobMasterGateway jobMasterGateway =
+                               rpcService2.connect(jobMaster.getAddress(), 
jobMaster.getFencingToken(), JobMasterGateway.class).get();
+
+                       RpcCheckpointResponder rpcCheckpointResponder = new 
RpcCheckpointResponder(jobMasterGateway);
+                       rpcCheckpointResponder.declineCheckpoint(
+                               jobGraph.getJobID(),
+                               new ExecutionAttemptID(1, 1),
+                               1,
+                               userException
+                       );
+
+                       Throwable throwable = 
declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(),
+                               TimeUnit.MILLISECONDS);
+                       // the reason must arrive wrapped as SerializedThrowable while keeping the user message
+                       assertThat(throwable, 
instanceOf(SerializedThrowable.class));
+                       assertThat(throwable.getMessage(), 
equalTo(userException.getMessage()));
+               } finally {
+                       // either service may still be null here; terminateRpcServices skips null entries
+                       RpcUtils.terminateRpcServices(testingTimeout, 
rpcService1, rpcService2);
+               }
+       }
+
+       @Test
        public void testHeartbeatTimeoutWithTaskManager() throws Exception {
                final CompletableFuture<ResourceID> heartbeatResourceIdFuture = 
new CompletableFuture<>();
                final CompletableFuture<JobID> disconnectedJobManagerFuture = 
new CompletableFuture<>();

Reply via email to