This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc64d2c40762af0d72e532b457ec5c0c15d9e175 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Nov 13 11:42:08 2018 +0100 [FLINK-10419] Added IT test to check declineCheckpoint invocation --- .../org/apache/flink/runtime/rpc/RpcUtils.java | 27 ++++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 76 ++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java index c90a8b5..2f656d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -19,9 +19,13 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,6 +91,29 @@ public class RpcUtils { rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } + /** + * Shuts the given rpc services down and waits for their termination. + * + * @param rpcServices to shut down + * @param timeout for this operation + * @throws InterruptedException if the operation has been interrupted + * @throws ExecutionException if a problem occurred + * @throws TimeoutException if a timeout occurred + */ + public static void terminateRpcServices( + Time timeout, + RpcService... rpcServices) throws InterruptedException, ExecutionException, TimeoutException { + final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(rpcServices.length); + + for (RpcService service : rpcServices) { + if (service != null) { + terminationFutures.add(service.stopService()); + } + } + + FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + // We don't want this class to be instantiable private RpcUtils() {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index b231b04..5fce5de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointProperties; @@ -64,26 +65,33 @@ import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMet import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; +import akka.actor.ActorSystem; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -99,6 +107,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URLClassLoader; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; @@ -207,6 +216,73 @@ public class JobMasterTest extends TestLogger { } @Test + public void testDeclineCheckpointInvocationWithUserException() throws Exception { + RpcService rpcService1 = null; + RpcService rpcService2 = null; + try { + final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem(); + final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); + + rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout); + rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout); + + final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>(); + + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); + final JobMaster jobMaster = new JobMaster( + rpcService1, + jobMasterConfiguration, + jmResourceId, + jobGraph, + haServices, + DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1), + jobManagerSharedServices, + heartbeatServices, + blobServer, + UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, + new NoOpOnCompletionActions(), + testingFatalErrorHandler, + JobMasterTest.class.getClassLoader()) { + @Override + public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) { + declineCheckpointMessageFuture.complete(declineCheckpoint.getReason()); + } + }; + + jobMaster.start(jobMasterId, testingTimeout).get(); + + final String className = "UserException"; + final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava( + temporaryFolder.newFolder(), + className + ".java", + String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }", + className, + className)); + + Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance(); + + JobMasterGateway jobMasterGateway = + rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class).get(); + + RpcCheckpointResponder rpcCheckpointResponder = new RpcCheckpointResponder(jobMasterGateway); + rpcCheckpointResponder.declineCheckpoint( + jobGraph.getJobID(), + new ExecutionAttemptID(1, 1), + 1, + userException + ); + + Throwable throwable = declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(), + TimeUnit.MILLISECONDS); + assertThat(throwable, instanceOf(SerializedThrowable.class)); + assertThat(throwable.getMessage(), equalTo(userException.getMessage())); + } finally { + RpcUtils.terminateRpcServices(testingTimeout, rpcService1, rpcService2); + } + } + + @Test public void testHeartbeatTimeoutWithTaskManager() throws Exception { final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>(); final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();