This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa06fbfb78ab9de8234e5be63dd66847bb319fdb
Author: Chesnay Schepler <[email protected]>
AuthorDate: Mon Mar 21 09:35:39 2022 +0100

    [FLINK-26780][coordination] Add option to force RPC serialization
---
 .../apache/flink/configuration/AkkaOptions.java    | 23 ++++++++++++++++++
 .../runtime/rpc/akka/AkkaInvocationHandler.java    | 12 ++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  5 ++++
 .../rpc/akka/AkkaRpcServiceConfiguration.java      | 21 +++++++++++++---
 .../rpc/akka/FencedAkkaInvocationHandler.java      |  2 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  2 ++
 .../runtime/rpc/akka/MessageSerializationTest.java |  2 ++
 .../java/org/apache/flink/runtime/rpc/Local.java   | 28 ++++++++++++++++++++++
 8 files changed, 90 insertions(+), 5 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index c5cfa21..49d5544 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -18,18 +18,41 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.util.TimeUtils;
 
 import java.time.Duration;
 
 import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
 
 /** Akka configuration options. */
 @PublicEvolving
 public class AkkaOptions {
 
+    @Internal
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    private static final ConfigOption<Boolean> 
FORCE_RPC_INVOCATION_SERIALIZATION =
+            ConfigOptions.key("akka.rpc.force-invocation-serialization")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Forces the serialization of all 
RPC invocations (that are not explicitly annotated with %s)."
+                                                    + "This option can be used 
to find serialization issues in the argument/response types without relying 
requiring HA setups."
+                                                    + "This option should not 
be enabled in production.",
+                                            
code("org.apache.flink.runtime.rpc.Local"))
+                                    .build());
+
+    public static boolean 
isForceRpcInvocationSerializationEnabled(Configuration config) {
+        return config.get(FORCE_RPC_INVOCATION_SERIALIZATION)
+                || 
System.getProperties().containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key());
+    }
+
     /** Flag whether to capture call stacks for RPC ask calls. */
     public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK =
             ConfigOptions.key("akka.ask.callstack")
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 5de7159..5f9584d 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcServer;
@@ -83,6 +84,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
 
     // whether the actor ref is local and thus no message serialization is 
needed
     protected final boolean isLocal;
+    protected final boolean forceRpcInvocationSerialization;
 
     // default timeout for asks
     private final Time timeout;
@@ -100,6 +102,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
             ActorRef rpcEndpoint,
             Time timeout,
             long maximumFramesize,
+            boolean forceRpcInvocationSerialization,
             @Nullable CompletableFuture<Void> terminationFuture,
             boolean captureAskCallStack,
             ClassLoader flinkClassLoader) {
@@ -111,6 +114,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
         this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
         this.timeout = Preconditions.checkNotNull(timeout);
         this.maximumFramesize = maximumFramesize;
+        this.forceRpcInvocationSerialization = forceRpcInvocationSerialization;
         this.terminationFuture = terminationFuture;
         this.captureAskCallStack = captureAskCallStack;
     }
@@ -211,6 +215,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
     private Object invokeRpc(Method method, Object[] args) throws Exception {
         String methodName = method.getName();
         Class<?>[] parameterTypes = method.getParameterTypes();
+        final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) 
!= null;
         Annotation[][] parameterAnnotations = method.getParameterAnnotations();
         Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, 
timeout);
 
@@ -218,6 +223,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
                 createRpcInvocationMessage(
                         method.getDeclaringClass().getSimpleName(),
                         methodName,
+                        isLocalRpcInvocation,
                         parameterTypes,
                         args);
 
@@ -282,20 +288,22 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaBasedEndpoint, Rpc
      *
      * @param declaringClassName of the RPC
      * @param methodName of the RPC
+     * @param isLocalRpcInvocation whether the RPC must be sent as a local 
message
      * @param parameterTypes of the RPC
      * @param args of the RPC
      * @return RpcInvocation message which encapsulates the RPC details
      * @throws IOException if we cannot serialize the RPC invocation parameters
      */
-    protected RpcInvocation createRpcInvocationMessage(
+    private RpcInvocation createRpcInvocationMessage(
             final String declaringClassName,
             final String methodName,
+            final boolean isLocalRpcInvocation,
             final Class<?>[] parameterTypes,
             final Object[] args)
             throws IOException {
         final RpcInvocation rpcInvocation;
 
-        if (isLocal) {
+        if (isLocal && (!forceRpcInvocationSerialization || 
isLocalRpcInvocation)) {
             rpcInvocation =
                     new LocalRpcInvocation(declaringClassName, methodName, 
parameterTypes, args);
         } else {
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index c2b20fb..0f9057d 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -220,6 +220,7 @@ public class AkkaRpcService implements RpcService {
                             actorRef,
                             configuration.getTimeout(),
                             configuration.getMaximumFramesize(),
+                            configuration.isForceRpcInvocationSerialization(),
                             null,
                             captureAskCallstacks,
                             flinkClassLoader);
@@ -242,6 +243,7 @@ public class AkkaRpcService implements RpcService {
                             actorRef,
                             configuration.getTimeout(),
                             configuration.getMaximumFramesize(),
+                            configuration.isForceRpcInvocationSerialization(),
                             null,
                             () -> fencingToken,
                             captureAskCallstacks,
@@ -290,6 +292,7 @@ public class AkkaRpcService implements RpcService {
                             actorRef,
                             configuration.getTimeout(),
                             configuration.getMaximumFramesize(),
+                            configuration.isForceRpcInvocationSerialization(),
                             actorTerminationFuture,
                             ((FencedRpcEndpoint<?>) 
rpcEndpoint)::getFencingToken,
                             captureAskCallstacks,
@@ -304,6 +307,7 @@ public class AkkaRpcService implements RpcService {
                             actorRef,
                             configuration.getTimeout(),
                             configuration.getMaximumFramesize(),
+                            configuration.isForceRpcInvocationSerialization(),
                             actorTerminationFuture,
                             captureAskCallstacks,
                             flinkClassLoader);
@@ -379,6 +383,7 @@ public class AkkaRpcService implements RpcService {
                             ((AkkaBasedEndpoint) rpcServer).getActorRef(),
                             configuration.getTimeout(),
                             configuration.getMaximumFramesize(),
+                            configuration.isForceRpcInvocationSerialization(),
                             null,
                             () -> fencingToken,
                             captureAskCallstacks,
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index a6cabbc..6bb8186 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -36,17 +36,21 @@ public class AkkaRpcServiceConfiguration {
 
     private final boolean captureAskCallStack;
 
-    public AkkaRpcServiceConfiguration(
+    private final boolean forceRpcInvocationSerialization;
+
+    private AkkaRpcServiceConfiguration(
             @Nonnull Configuration configuration,
             @Nonnull Time timeout,
             long maximumFramesize,
-            boolean captureAskCallStack) {
+            boolean captureAskCallStack,
+            boolean forceRpcInvocationSerialization) {
 
         checkArgument(maximumFramesize > 0L, "Maximum framesize must be 
positive.");
         this.configuration = configuration;
         this.timeout = timeout;
         this.maximumFramesize = maximumFramesize;
         this.captureAskCallStack = captureAskCallStack;
+        this.forceRpcInvocationSerialization = forceRpcInvocationSerialization;
     }
 
     @Nonnull
@@ -67,6 +71,10 @@ public class AkkaRpcServiceConfiguration {
         return captureAskCallStack;
     }
 
+    public boolean isForceRpcInvocationSerialization() {
+        return forceRpcInvocationSerialization;
+    }
+
     public static AkkaRpcServiceConfiguration fromConfiguration(Configuration 
configuration) {
         final Time timeout = 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
@@ -74,8 +82,15 @@ public class AkkaRpcServiceConfiguration {
 
         final boolean captureAskCallStacks = 
configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
 
+        final boolean forceRpcInvocationSerialization =
+                
AkkaOptions.isForceRpcInvocationSerializationEnabled(configuration);
+
         return new AkkaRpcServiceConfiguration(
-                configuration, timeout, maximumFramesize, 
captureAskCallStacks);
+                configuration,
+                timeout,
+                maximumFramesize,
+                captureAskCallStacks,
+                forceRpcInvocationSerialization);
     }
 
     public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 134c247..8ddd41e 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -62,6 +62,7 @@ public class FencedAkkaInvocationHandler<F extends 
Serializable> extends AkkaInv
             ActorRef rpcEndpoint,
             Time timeout,
             long maximumFramesize,
+            boolean forceRpcInvocationSerialization,
             @Nullable CompletableFuture<Void> terminationFuture,
             Supplier<F> fencingTokenSupplier,
             boolean captureAskCallStacks,
@@ -72,6 +73,7 @@ public class FencedAkkaInvocationHandler<F extends 
Serializable> extends AkkaInv
                 rpcEndpoint,
                 timeout,
                 maximumFramesize,
+                forceRpcInvocationSerialization,
                 terminationFuture,
                 captureAskCallStacks,
                 flinkClassLoader);
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 29f973e..71cc851 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -1006,6 +1007,7 @@ public class AkkaRpcActorTest extends TestLogger {
     // ------------------------------------------------------------------------
 
     interface SchedulingRpcEndpointGateway extends RpcGateway {
+        @Local
         void schedule(
                 final CompletableFuture<Void> scheduleRunnableFuture,
                 final CompletableFuture<Void> scheduleCallableFuture,
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 4139fe4..c9aaaf0 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc.akka;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -169,6 +170,7 @@ public class MessageSerializationTest extends TestLogger {
     }
 
     private interface TestGateway extends RpcGateway {
+        @Local
         CompletableFuture<Void> foobar(Object object) throws IOException, 
InterruptedException;
     }
 
diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java
new file mode 100644
index 0000000..e1902a7
--- /dev/null
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** Marks an RPC invocation for only being supported as a local invocation. */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Local {}

Reply via email to