This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3609257687ddb726a1bbca4c5954b0714d2986a
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 15 13:23:20 2022 +0200

    [FLINK-27933][coordination] Fix/document RPC serializability violations
    
    - allow tests to opt out of forced RPC serialization
    - never force serialization in AkkaRpcActorOversizedResponseMessageTest,
      because it affected the payload size
    - DispatcherTest no longer relies on retrieving the same object instance
      via RPC
    - ResourceManager#requestTaskExecutorThreadInfoGateway is marked as local
      because its result is not serializable; see FLINK-27954
    - ClassLoaderITCase wrongly assumed that serialization occurs
---
 .../apache/flink/configuration/AkkaOptions.java    |  9 ++--
 .../AkkaRpcActorOversizedResponseMessageTest.java  |  2 +
 .../rpc/akka/ContextClassLoadingSettingTest.java   |  2 +
 .../runtime/resourcemanager/ResourceManager.java   |  2 +
 .../flink/runtime/dispatcher/DispatcherTest.java   |  9 ++--
 .../runtime/dispatcher/TestingDispatcher.java      |  6 +++
 .../flink/test/classloading/ClassLoaderITCase.java | 55 +++++++++-------------
 7 files changed, 46 insertions(+), 39 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 49d5544c587..f77498d5d5b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -35,7 +35,7 @@ public class AkkaOptions {
 
     @Internal
     @Documentation.ExcludeFromDocumentation("Internal use only")
-    private static final ConfigOption<Boolean> 
FORCE_RPC_INVOCATION_SERIALIZATION =
+    public static final ConfigOption<Boolean> 
FORCE_RPC_INVOCATION_SERIALIZATION =
             ConfigOptions.key("akka.rpc.force-invocation-serialization")
                     .booleanType()
                     .defaultValue(false)
@@ -49,8 +49,11 @@ public class AkkaOptions {
                                     .build());
 
     public static boolean 
isForceRpcInvocationSerializationEnabled(Configuration config) {
-        return config.get(FORCE_RPC_INVOCATION_SERIALIZATION)
-                || 
System.getProperties().containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key());
+        return config.getOptional(FORCE_RPC_INVOCATION_SERIALIZATION)
+                .orElse(
+                        FORCE_RPC_INVOCATION_SERIALIZATION.defaultValue()
+                                || System.getProperties()
+                                        
.containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key()));
     }
 
     /** Flag whether to capture call stacks for RPC ask calls. */
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
index 9a35ccb6510..f66f6ca688d 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
@@ -57,6 +57,8 @@ class AkkaRpcActorOversizedResponseMessageTest {
     @BeforeAll
     static void setupClass() throws Exception {
         final Configuration configuration = new Configuration();
+        // some tests explicitly test local communication where no 
serialization should occur
+        configuration.set(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, 
false);
         configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
 
         rpcService1 =
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
index 68c10067f3e..48318d5a36e 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -439,6 +440,7 @@ class ContextClassLoadingSettingTest {
         }
 
         @Override
+        @Local
         public CompletableFuture<ClassLoader> getContextClassLoader() {
             return CompletableFuture.completedFuture(
                     Thread.currentThread().getContextClassLoader());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 634b56f1e32..c94d6f60a03 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
@@ -818,6 +819,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
     }
 
     @Override
+    @Local // Bug; see FLINK-27954
     public CompletableFuture<TaskExecutorThreadInfoGateway> 
requestTaskExecutorThreadInfoGateway(
             ResourceID taskManagerId, Time timeout) {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 9db64cd1c68..a51d8bfc8d8 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -557,7 +557,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                         .deserializeError(ClassLoader.getSystemClassLoader());
 
         // ensure correct exception type
-        assertThat(throwable, is(testFailure));
+        assertThat(throwable.getMessage(), equalTo(testFailure.getMessage()));
     }
 
     /** Test that {@link JobResult} is cached when the job finishes. */
@@ -590,9 +590,10 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
         assertThat(
                 dispatcherGateway.requestJobStatus(failedJobId, TIMEOUT).get(),
                 equalTo(expectedState));
-        assertThat(
-                dispatcherGateway.requestExecutionGraphInfo(failedJobId, 
TIMEOUT).get(),
-                equalTo(failedExecutionGraphInfo));
+        final CompletableFuture<ExecutionGraphInfo> 
completableFutureCompletableFuture =
+                dispatcher.callAsyncInMainThread(
+                        () -> 
dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT));
+        assertThat(completableFutureCompletableFuture.get(), 
is(failedExecutionGraphInfo));
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 90b31e60c17..ae003a0bc40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TimeUtils;
 
@@ -52,6 +53,7 @@ import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -140,6 +142,10 @@ class TestingDispatcher extends Dispatcher {
                 });
     }
 
+    <T> CompletableFuture<T> 
callAsyncInMainThread(Callable<CompletableFuture<T>> callable) {
+        return callAsync(callable, 
TestingUtils.TESTING_DURATION).thenCompose(Function.identity());
+    }
+
     CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, 
@Nonnull Time timeout) {
         return callAsync(() -> getJobTerminationFuture(jobId), 
TimeUtils.toDuration(timeout))
                 .thenCompose(Function.identity());
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index e7ddb0663ce..ee81ee5f689 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
@@ -31,6 +32,7 @@ import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
@@ -40,6 +42,7 @@ import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -56,7 +59,6 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.Deadline;
@@ -65,6 +67,7 @@ import scala.concurrent.duration.FiniteDuration;
 import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
 import static 
org.apache.flink.changelog.fs.FsStateChangelogStorageFactory.IDENTIFIER;
 import static 
org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -129,6 +132,10 @@ public class ClassLoaderITCase extends TestLogger {
         config.setString(STATE_CHANGE_LOG_STORAGE, IDENTIFIER);
         config.setString(BASE_PATH, FOLDER.newFolder().getAbsolutePath());
 
+        // some tests check for serialization problems related to class-loading
+        // this requires all RPCs to actually go through serialization
+        config.setBoolean(AkkaOptions.FORCE_RPC_INVOCATION_SERIALIZATION, 
true);
+
         miniClusterResource =
                 new MiniClusterResource(
                         new MiniClusterResourceConfiguration.Builder()
@@ -238,37 +245,21 @@ public class ClassLoaderITCase extends TestLogger {
                 Collections.singleton(new 
Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
                 Collections.emptyList());
 
-        try {
-            streamingCheckpointedProg.invokeInteractiveModeForExecution();
-        } catch (Exception e) {
-            // Program should terminate with a 'SuccessException':
-            // the exception class is contained in the user-jar, but is not 
present on the maven
-            // classpath
-            // the deserialization of the exception should thus fail here
-            Optional<Throwable> exception =
-                    ExceptionUtils.findThrowable(
-                            e,
-                            candidate ->
-                                    candidate
-                                            .getClass()
-                                            .getName()
-                                            .equals(
-                                                    
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException"));
-
-            if (!exception.isPresent()) {
-                // if this is achieved, either we failed due to another 
exception or the
-                // user-specific
-                // exception is not serialized between JobManager and 
JobClient.
-                throw e;
-            }
-
-            try {
-                Class.forName(exception.get().getClass().getName());
-                fail("Deserialization of user exception should have failed.");
-            } catch (ClassNotFoundException expected) {
-                // expected
-            }
-        }
+        // sanity check that the exception from the user-jar is not on the 
classpath
+        assertThatThrownBy(
+                        () ->
+                                Class.forName(
+                                        
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException"))
+                .isInstanceOf(ClassNotFoundException.class);
+
+        // Program should terminate with a 'SuccessException'
+        // the exception should be contained in a SerializedThrowable, which 
failed to deserialize
+        // the original exception because it is only contained in the user-jar
+        assertThatThrownBy(() -> 
streamingCheckpointedProg.invokeInteractiveModeForExecution())
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SerializedThrowable.class,
+                                
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException"));
     }
 
     @Test

Reply via email to