dawidwys closed pull request #7090: [FLINK-10419] Using DeclineCheckpoint message class when invoking RPC declineCheckpoint URL: https://github.com/apache/flink/pull/7090
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java index 6502eb32942..2bcae452287 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java @@ -23,21 +23,17 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; public class EnumSerializerUpgradeTest extends TestLogger { @@ -87,7 +83,7 @@ public void checkDifferentFieldOrder() throws Exception { private static CompatibilityResult checkCompatibility(String enumSourceA, String enumSourceB) throws IOException, ClassNotFoundException { - ClassLoader classLoader = compileAndLoadEnum( + ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava( temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceA); EnumSerializer enumSerializer = new EnumSerializer(classLoader.loadClass(ENUM_NAME)); @@ -102,7 +98,7 @@ private static CompatibilityResult checkCompatibility(String enumSourceA, String snapshotBytes = outBuffer.toByteArray(); } - ClassLoader classLoader2 = compileAndLoadEnum( + ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava( temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceB); TypeSerializerConfigSnapshot restoredSnapshot; @@ -116,29 +112,4 @@ private static CompatibilityResult checkCompatibility(String enumSourceA, String EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME)); return enumSerializer2.ensureCompatibility(restoredSnapshot); } - - private static ClassLoader compileAndLoadEnum(File root, String filename, String source) throws IOException { - File file = writeSourceFile(root, filename, source); - - compileClass(file); - - return new URLClassLoader( - new URL[]{root.toURI().toURL()}, - Thread.currentThread().getContextClassLoader()); - } - - private static File writeSourceFile(File root, String filename, String source) throws IOException { - File file = new File(root, filename); - FileWriter fileWriter = new FileWriter(file); - - fileWriter.write(source); - fileWriter.close(); - - return file; - } - - private static int compileClass(File sourceFile) { - JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - return compiler.run(null, null, null, sourceFile.getPath()); - } } diff --git a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java new file mode 100644 index 00000000000..0688c1df156 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.testutils; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; + +/** + * Utilities to create class loaders. + */ +public class ClassLoaderUtils { + public static URLClassLoader compileAndLoadJava(File root, String filename, String source) throws + IOException { + File file = writeSourceFile(root, filename, source); + + compileClass(file); + + return new URLClassLoader( + new URL[]{root.toURI().toURL()}, + Thread.currentThread().getContextClassLoader()); + } + + private static File writeSourceFile(File root, String filename, String source) throws IOException { + File file = new File(root, filename); + FileWriter fileWriter = new FileWriter(file); + + fileWriter.write(source); + fileWriter.close(); + + return file; + } + + private static int compileClass(File sourceFile) { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + return compiler.run(null, null, null, "-proc:none", sourceFile.getPath()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index 22244f6cb8d..b8dc5545706 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.rpc.RpcGateway; public interface CheckpointCoordinatorGateway extends RpcGateway { @@ -31,9 +32,5 @@ void acknowledgeCheckpoint( final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot subtaskState); - void declineCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointId, - Throwable cause); + void declineCheckpoint(DeclineCheckpoint declineCheckpoint); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index cd102b32f89..269f23e6781 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -687,13 +687,7 @@ public void acknowledgeCheckpoint( // TODO: This method needs a leader session ID @Override - public void declineCheckpoint( - final JobID jobID, - final ExecutionAttemptID executionAttemptID, - final long checkpointID, - final Throwable reason) { - final DeclineCheckpoint decline = new DeclineCheckpoint( - jobID, executionAttemptID, checkpointID, reason); + public void declineCheckpoint(DeclineCheckpoint decline) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); if (checkpointCoordinator != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java index c90a8b5bbbc..2f656d09263 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -19,9 +19,13 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,6 +91,29 @@ public static void terminateRpcService(RpcService rpcService, Time timeout) thro rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } + /** + * Shuts the given rpc services down and waits for their termination. + * + * @param rpcServices to shut down + * @param timeout for this operation + * @throws InterruptedException if the operation has been interrupted + * @throws ExecutionException if a problem occurred + * @throws TimeoutException if a timeout occurred + */ + public static void terminateRpcServices( + Time timeout, + RpcService... rpcServices) throws InterruptedException, ExecutionException, TimeoutException { + final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(rpcServices.length); + + for (RpcService service : rpcServices) { + if (service != null) { + terminationFutures.add(service.stopService()); + } + } + + FutureUtils.waitForAll(terminationFutures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + // We don't want this class to be instantiable private RpcUtils() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java index 7b9fb887670..486816de8e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java @@ -203,12 +203,18 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound try { parameterTypes[i] = (Class<?>) ois.readObject(); } catch (IOException e) { + StringBuilder incompleteMethod = getIncompleteMethodString(i, 0); throw new IOException("Could not deserialize " + i + "th parameter type of method " + - methodName + '.', e); + incompleteMethod + '.', e); } catch (ClassNotFoundException e) { - throw new ClassNotFoundException("Could not deserialize " + i + "th " + - "parameter type of method " + methodName + ". This indicates that the parameter " + - "type is not part of the system class loader.", e); + // note: wrapping this CNFE into another CNFE does not overwrite the Exception + // stored in the ObjectInputStream (see ObjectInputStream#readSerialData) + // -> add a suppressed exception that adds a more specific message + StringBuilder incompleteMethod = getIncompleteMethodString(i, 0); + e.addSuppressed(new ClassNotFoundException("Could not deserialize " + i + "th " + + "parameter type of method " + incompleteMethod + ". This indicates that the parameter " + + "type is not part of the system class loader.")); + throw e; } } @@ -221,17 +227,37 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound try { args[i] = ois.readObject(); } catch (IOException e) { + StringBuilder incompleteMethod = getIncompleteMethodString(length, i); throw new IOException("Could not deserialize " + i + "th argument of method " + - methodName + '.', e); + incompleteMethod + '.', e); } catch (ClassNotFoundException e) { - throw new ClassNotFoundException("Could not deserialize " + i + "th " + - "argument of method " + methodName + ". This indicates that the argument " + - "type is not part of the system class loader.", e); + // note: wrapping this CNFE into another CNFE does not overwrite the Exception + // stored in the ObjectInputStream (see ObjectInputStream#readSerialData) + // -> add a suppressed exception that adds a more specific message + StringBuilder incompleteMethod = getIncompleteMethodString(length, i); + e.addSuppressed(new ClassNotFoundException("Could not deserialize " + i + "th " + + "argument of method " + incompleteMethod + ". This indicates that the argument " + + "type is not part of the system class loader.")); + throw e; } } } else { args = null; } } + + private StringBuilder getIncompleteMethodString(int lastMethodTypeIdx, int lastArgumentIdx) { + StringBuilder incompleteMethod = new StringBuilder(); + incompleteMethod.append(methodName).append('('); + for (int i = 0; i < lastMethodTypeIdx; ++i) { + incompleteMethod.append(parameterTypes[i].getCanonicalName()); + if (i < lastArgumentIdx) { + incompleteMethod.append(": ").append(args[i]); + } + incompleteMethod.append(", "); + } + incompleteMethod.append("...)"); // some parameters could not be deserialized + return incompleteMethod; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index c8f7357ab7e..10c9e2f123a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; @@ -57,6 +58,9 @@ public void declineCheckpoint( long checkpointId, Throwable cause) { - checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause); + checkpointCoordinatorGateway.declineCheckpoint(new DeclineCheckpoint(jobID, + executionAttemptID, + checkpointId, + cause)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java index c02278c5088..7c664ce0b32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java @@ -19,21 +19,78 @@ package org.apache.flink.runtime.classloading; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation; +import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import java.net.URL; import java.net.URLClassLoader; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.Matchers.hasItemInArray; +import static org.hamcrest.Matchers.hasProperty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Tests for classloading and class loder utilities. + * Tests for classloading and class loader utilities. */ public class ClassLoaderTest extends TestLogger { + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testMessageDecodingWithUnavailableClass() throws Exception { + final ClassLoader systemClassLoader = getClass().getClassLoader(); + + final String className = "UserClass"; + final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava( + temporaryFolder.newFolder(), + className + ".java", + "import java.io.Serializable;\n" + + "public class " + className + " implements Serializable {}"); + + RemoteRpcInvocation method = new RemoteRpcInvocation( + "test", + new Class<?>[] { + int.class, + Class.forName(className, false, userClassLoader)}, + new Object[] { + 1, + Class.forName(className, false, userClassLoader).newInstance()}); + + SerializedValue<RemoteRpcInvocation> serializedMethod = new SerializedValue<>(method); + + expectedException.expect(ClassNotFoundException.class); + expectedException.expect( + allOf( + isA(ClassNotFoundException.class), + hasProperty("suppressed", + hasItemInArray( + allOf( + isA(ClassNotFoundException.class), + hasProperty("message", + containsString("Could not deserialize 1th parameter type of method test(int, ...)."))))))); + + RemoteRpcInvocation deserializedMethod = serializedMethod.deserializeValue(systemClassLoader); + deserializedMethod.getMethodName(); + + userClassLoader.close(); + } + @Test public void testParentFirstClassLoading() throws Exception { final ClassLoader parentClassLoader = getClass().getClassLoader(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index b231b041ead..5fce5deac6e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointProperties; @@ -64,26 +65,33 @@ import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; +import akka.actor.ActorSystem; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; @@ -99,6 +107,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URLClassLoader; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; @@ -206,6 +215,73 @@ public static void teardownClass() { } } + @Test + public void testDeclineCheckpointInvocationWithUserException() throws Exception { + RpcService rpcService1 = null; + RpcService rpcService2 = null; + try { + final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem(); + final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); + + rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout); + rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout); + + final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>(); + + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); + final JobMaster jobMaster = new JobMaster( + rpcService1, + jobMasterConfiguration, + jmResourceId, + jobGraph, + haServices, + DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService1), + jobManagerSharedServices, + heartbeatServices, + blobServer, + UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, + new NoOpOnCompletionActions(), + testingFatalErrorHandler, + JobMasterTest.class.getClassLoader()) { + @Override + public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) { + declineCheckpointMessageFuture.complete(declineCheckpoint.getReason()); + } + }; + + jobMaster.start(jobMasterId, testingTimeout).get(); + + final String className = "UserException"; + final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava( + temporaryFolder.newFolder(), + className + ".java", + String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }", + className, + className)); + + Throwable userException = (Throwable) Class.forName(className, false, userClassLoader).newInstance(); + + JobMasterGateway jobMasterGateway = + rpcService2.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class).get(); + + RpcCheckpointResponder rpcCheckpointResponder = new RpcCheckpointResponder(jobMasterGateway); + rpcCheckpointResponder.declineCheckpoint( + jobGraph.getJobID(), + new ExecutionAttemptID(1, 1), + 1, + userException + ); + + Throwable throwable = declineCheckpointMessageFuture.get(testingTimeout.toMilliseconds(), + TimeUnit.MILLISECONDS); + assertThat(throwable, instanceOf(SerializedThrowable.class)); + assertThat(throwable.getMessage(), equalTo(userException.getMessage())); + } finally { + RpcUtils.terminateRpcServices(testingTimeout, rpcService1, rpcService2); + } + } + @Test public void testHeartbeatTimeoutWithTaskManager() throws Exception { final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index c9c55a1da8f..f5f7f8e3415 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -144,7 +145,7 @@ private final Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer; @Nonnull - private final Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer; + private final Consumer<DeclineCheckpoint> declineCheckpointConsumer; @Nonnull private final Supplier<JobMasterId> fencingTokenSupplier; @@ -183,7 +184,7 @@ public TestingJobMasterGateway( @Nonnull Function<JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction, @Nonnull BiConsumer<AllocationID, Throwable> notifyAllocationFailureConsumer, @Nonnull Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer, - @Nonnull Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer, + @Nonnull Consumer<DeclineCheckpoint> declineCheckpointConsumer, @Nonnull Supplier<JobMasterId> fencingTokenSupplier, @Nonnull BiFunction<JobID, String, CompletableFuture<KvStateLocation>> requestKvStateLocationFunction, @Nonnull Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction, @@ -335,8 +336,8 @@ public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttem } @Override - public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, Throwable cause) { - declineCheckpointConsumer.accept(Tuple4.of(jobID, executionAttemptID, checkpointId, cause)); + public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) { + declineCheckpointConsumer.accept(declineCheckpoint); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index e40b752f248..b52df9ee052 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; @@ -96,7 +97,7 @@ private Function<JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction = ignored -> CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(null)); private BiConsumer<AllocationID, Throwable> notifyAllocationFailureConsumer = (ignoredA, ignoredB) -> {}; private Consumer<Tuple5<JobID, ExecutionAttemptID, Long, CheckpointMetrics, TaskStateSnapshot>> acknowledgeCheckpointConsumer = ignored -> {}; - private Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer = ignored -> {}; + private Consumer<DeclineCheckpoint> declineCheckpointConsumer = ignored -> {}; private Supplier<JobMasterId> fencingTokenSupplier = () -> JOB_MASTER_ID; private BiFunction<JobID, String, CompletableFuture<KvStateLocation>> requestKvStateLocationFunction = (ignoredA, registrationName) -> FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName)); private Function<Tuple6<JobID, JobVertexID, KeyGroupRange, String, KvStateID, InetSocketAddress>, CompletableFuture<Acknowledge>> notifyKvStateRegisteredFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); @@ -222,7 +223,7 @@ public TestingJobMasterGatewayBuilder setAcknowledgeCheckpointConsumer(Consumer< return this; } - public TestingJobMasterGatewayBuilder setDeclineCheckpointConsumer(Consumer<Tuple4<JobID, ExecutionAttemptID, Long, Throwable>> declineCheckpointConsumer) { + public TestingJobMasterGatewayBuilder setDeclineCheckpointConsumer(Consumer<DeclineCheckpoint> declineCheckpointConsumer) { this.declineCheckpointConsumer = declineCheckpointConsumer; return this; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services