This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3609257687ddb726a1bbca4c5954b0714d2986a Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jun 15 13:23:20 2022 +0200 [FLINK-27933][coordination] Fix/document RPC serializability violations - allow tests to opt-out of serializability force - never force serialization in AkkaRpcActorOversizedResponseMessageTest, because it affected the payload size - DispatcherTest no longer relies on retrieving the same object instance via RPC - ResourceManager#requestTaskExecutorThreadInfoGateway is just not serializable; see FLINK-27954 - ClassLoaderITCase wrongly assumed that serialization occurs --- .../apache/flink/configuration/AkkaOptions.java | 9 ++-- .../AkkaRpcActorOversizedResponseMessageTest.java | 2 + .../rpc/akka/ContextClassLoadingSettingTest.java | 2 + .../runtime/resourcemanager/ResourceManager.java | 2 + .../flink/runtime/dispatcher/DispatcherTest.java | 9 ++-- .../runtime/dispatcher/TestingDispatcher.java | 6 +++ .../flink/test/classloading/ClassLoaderITCase.java | 55 +++++++++------------- 7 files changed, 46 insertions(+), 39 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 49d5544c587..f77498d5d5b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -35,7 +35,7 @@ public class AkkaOptions { @Internal @Documentation.ExcludeFromDocumentation("Internal use only") - private static final ConfigOption<Boolean> FORCE_RPC_INVOCATION_SERIALIZATION = + public static final ConfigOption<Boolean> FORCE_RPC_INVOCATION_SERIALIZATION = ConfigOptions.key("akka.rpc.force-invocation-serialization") .booleanType() .defaultValue(false) @@ -49,8 +49,11 @@ public class AkkaOptions { .build()); public static boolean isForceRpcInvocationSerializationEnabled(Configuration config) { - return config.get(FORCE_RPC_INVOCATION_SERIALIZATION) - || System.getProperties().containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key()); + return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION) + .orElse( + FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue() + || System.getProperties() + .containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key())); } /** Flag whether to capture call stacks for RPC ask calls. */ diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java index 9a35ccb6510..f66f6ca688d 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java @@ -57,6 +57,8 @@ class AkkaRpcActorOversizedResponseMessageTest { @BeforeAll static void setupClass() throws Exception { final Configuration configuration = new Configuration(); + // some tests explicitly test local communication where no serialization should occur + configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, false); configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b"); rpcService1 = diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java index 68c10067f3e..48318d5a36e 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -439,6 +440,7 @@ class ContextClassLoadingSettingTest { } @Override + @Local public CompletableFuture<ClassLoader> getContextClassLoader() { return CompletableFuture.completedFuture( Thread.currentThread().getContextClassLoader()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 634b56f1e32..c94d6f60a03 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -60,6 +60,7 @@ import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.security.token.DelegationTokenManager; @@ -818,6 +819,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } @Override + @Local // Bug; see FLINK-27954 public CompletableFuture<TaskExecutorThreadInfoGateway> requestTaskExecutorThreadInfoGateway( ResourceID taskManagerId, Time timeout) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 9db64cd1c68..a51d8bfc8d8 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -557,7 +557,7 @@ public class DispatcherTest extends AbstractDispatcherTest { .deserializeError(ClassLoader.getSystemClassLoader()); // ensure correct exception type - assertThat(throwable, is(testFailure)); + assertThat(throwable.getMessage(), equalTo(testFailure.getMessage())); } /** Test that {@link JobResult} is cached when the job finishes. */ @@ -590,9 +590,10 @@ public class DispatcherTest extends AbstractDispatcherTest { assertThat( dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(), equalTo(expectedState)); - assertThat( - dispatcherGateway.requestExecutionGraphInfo(failedJobId, TIMEOUT).get(), - equalTo(failedExecutionGraphInfo)); + final CompletableFuture<ExecutionGraphInfo> completableFutureCompletableFuture = + dispatcher.callAsyncInMainThread( + () -> dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT)); + assertThat(completableFutureCompletableFuture.get(), is(failedExecutionGraphInfo)); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 90b31e60c17..ae003a0bc40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TimeUtils; @@ -52,6 +53,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -140,6 +142,10 @@ class TestingDispatcher extends Dispatcher { }); } + <T> CompletableFuture<T> callAsyncInMainThread(Callable<CompletableFuture<T>> callable) { + return callAsync(callable, TestingUtils.TESTING_DURATION).thenCompose(Function.identity()); + } + CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) { return callAsync(() -> getJobTerminationFuture(jobId), TimeUtils.toDuration(timeout)) .thenCompose(Function.identity()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index e7ddb0663ce..ee81ee5f689 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.MiniClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; @@ -31,6 +32,7 @@ import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResource; @@ -40,6 +42,7 @@ import org.apache.flink.test.testdata.KMeansData; import org.apache.flink.test.util.SuccessException; import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -56,7 +59,6 @@ import java.io.IOException; import java.net.URL; import java.util.Collection; import java.util.Collections; -import java.util.Optional; import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Deadline; @@ -65,6 +67,7 @@ import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH; import static org.apache.flink.changelog.fs.FsStateChangelogStorageFactory.IDENTIFIER; import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -129,6 +132,10 @@ public class ClassLoaderITCase extends TestLogger { config.setString(STATE_CHANGE_LOG_STORAGE, IDENTIFIER); config.setString(BASE_PATH, FOLDER.newFolder().getAbsolutePath()); + // some tests check for serialization problems related to class-loading + // this requires all RPCs to actually go through serialization + config.setBoolean(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true); + miniClusterResource = new MiniClusterResource( new MiniClusterResourceConfiguration.Builder() @@ -238,37 +245,21 @@ public class ClassLoaderITCase extends TestLogger { Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); - try { - streamingCheckpointedProg.invokeInteractiveModeForExecution(); - } catch (Exception e) { - // Program should terminate with a 'SuccessException': - // the exception class is contained in the user-jar, but is not present on the maven - // classpath - // the deserialization of the exception should thus fail here - Optional<Throwable> exception = - ExceptionUtils.findThrowable( - e, - candidate -> - candidate - .getClass() - .getName() - .equals( - "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException")); - - if (!exception.isPresent()) { - // if this is achieved, either we failed due to another exception or the - // user-specific - // exception is not serialized between JobManager and JobClient. - throw e; - } - - try { - Class.forName(exception.get().getClass().getName()); - fail("Deserialization of user exception should have failed."); - } catch (ClassNotFoundException expected) { - // expected - } - } + // sanity check that the exception from the user-jar is not on the classpath + assertThatThrownBy( + () -> + Class.forName( + "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException")) + .isInstanceOf(ClassNotFoundException.class); + + // Program should terminate with a 'SuccessException' + // the exception should be contained in a SerializedThrowable, which failed to deserialize + // the original exception because it is only contained in the user-jar + assertThatThrownBy(() -> streamingCheckpointedProg.invokeInteractiveModeForExecution()) + .satisfies( + FlinkAssertions.anyCauseMatches( + SerializedThrowable.class, + "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException")); } @Test
