This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa06fbfb78ab9de8234e5be63dd66847bb319fdb Author: Chesnay Schepler <[email protected]> AuthorDate: Mon Mar 21 09:35:39 2022 +0100 [FLINK-26780][coordination] Add option to force RPC serialization --- .../apache/flink/configuration/AkkaOptions.java | 23 ++++++++++++++++++ .../runtime/rpc/akka/AkkaInvocationHandler.java | 12 ++++++++-- .../flink/runtime/rpc/akka/AkkaRpcService.java | 5 ++++ .../rpc/akka/AkkaRpcServiceConfiguration.java | 21 +++++++++++++--- .../rpc/akka/FencedAkkaInvocationHandler.java | 2 ++ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 2 ++ .../runtime/rpc/akka/MessageSerializationTest.java | 2 ++ .../java/org/apache/flink/runtime/rpc/Local.java | 28 ++++++++++++++++++++++ 8 files changed, 90 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index c5cfa21..49d5544 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -18,18 +18,41 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; import org.apache.flink.util.TimeUtils; import java.time.Duration; import static org.apache.flink.configuration.description.LinkElement.link; +import static org.apache.flink.configuration.description.TextElement.code; /** Akka configuration options. */ @PublicEvolving public class AkkaOptions { + @Internal + @Documentation.ExcludeFromDocumentation("Internal use only") + private static final ConfigOption<Boolean> FORCE_RPC_INVOCATION_SERIALIZATION = + ConfigOptions.key("akka.rpc.force-invocation-serialization") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Forces the serialization of all RPC invocations (that are not explicitly annotated with %s)." + + "This option can be used to find serialization issues in the argument/response types without relying requiring HA setups." + + "This option should not be enabled in production.", + code("org.apache.flink.runtime.rpc.Local")) + .build()); + + public static boolean isForceRpcInvocationSerializationEnabled(Configuration config) { + return config.get(FORCE_RPC_INVOCATION_SERIALIZATION) + || System.getProperties().containsKey(FORCE_RPC_INVOCATION_SERIALIZATION.key()); + } + /** Flag whether to capture call stacks for RPC ask calls. */ public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK = ConfigOptions.key("akka.ask.callstack") diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 5de7159..5f9584d 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; import org.apache.flink.runtime.rpc.FencedRpcGateway; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcServer; @@ -83,6 +84,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc // whether the actor ref is local and thus no message serialization is needed protected final boolean isLocal; + protected final boolean forceRpcInvocationSerialization; // default timeout for asks private final Time timeout; @@ -100,6 +102,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc ActorRef rpcEndpoint, Time timeout, long maximumFramesize, + boolean forceRpcInvocationSerialization, @Nullable CompletableFuture<Void> terminationFuture, boolean captureAskCallStack, ClassLoader flinkClassLoader) { @@ -111,6 +114,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; + this.forceRpcInvocationSerialization = forceRpcInvocationSerialization; this.terminationFuture = terminationFuture; this.captureAskCallStack = captureAskCallStack; } @@ -211,6 +215,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc private Object invokeRpc(Method method, Object[] args) throws Exception { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); + final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null; Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); @@ -218,6 +223,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc createRpcInvocationMessage( method.getDeclaringClass().getSimpleName(), methodName, + isLocalRpcInvocation, parameterTypes, args); @@ -282,20 +288,22 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc * * @param declaringClassName of the RPC * @param methodName of the RPC + * @param isLocalRpcInvocation whether the RPC must be sent as a local message * @param parameterTypes of the RPC * @param args of the RPC * @return RpcInvocation message which encapsulates the RPC details * @throws IOException if we cannot serialize the RPC invocation parameters */ - protected RpcInvocation createRpcInvocationMessage( + private RpcInvocation createRpcInvocationMessage( final String declaringClassName, final String methodName, + final boolean isLocalRpcInvocation, final Class<?>[] parameterTypes, final Object[] args) throws IOException { final RpcInvocation rpcInvocation; - if (isLocal) { + if (isLocal && (!forceRpcInvocationSerialization || isLocalRpcInvocation)) { rpcInvocation = new LocalRpcInvocation(declaringClassName, methodName, parameterTypes, args); } else { diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index c2b20fb..0f9057d 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -220,6 +220,7 @@ public class AkkaRpcService implements RpcService { actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), null, captureAskCallstacks, flinkClassLoader); @@ -242,6 +243,7 @@ public class AkkaRpcService implements RpcService { actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), null, () -> fencingToken, captureAskCallstacks, @@ -290,6 +292,7 @@ public class AkkaRpcService implements RpcService { actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken, captureAskCallstacks, @@ -304,6 +307,7 @@ public class AkkaRpcService implements RpcService { actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), actorTerminationFuture, captureAskCallstacks, flinkClassLoader); @@ -379,6 +383,7 @@ public class AkkaRpcService implements RpcService { ((AkkaBasedEndpoint) rpcServer).getActorRef(), configuration.getTimeout(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), null, () -> fencingToken, captureAskCallstacks, diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java index a6cabbc..6bb8186 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java @@ -36,17 +36,21 @@ public class AkkaRpcServiceConfiguration { private final boolean captureAskCallStack; - public AkkaRpcServiceConfiguration( + private final boolean forceRpcInvocationSerialization; + + private AkkaRpcServiceConfiguration( @Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize, - boolean captureAskCallStack) { + boolean captureAskCallStack, + boolean forceRpcInvocationSerialization) { checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive."); this.configuration = configuration; this.timeout = timeout; this.maximumFramesize = maximumFramesize; this.captureAskCallStack = captureAskCallStack; + this.forceRpcInvocationSerialization = forceRpcInvocationSerialization; } @Nonnull @@ -67,6 +71,10 @@ public class AkkaRpcServiceConfiguration { return captureAskCallStack; } + public boolean isForceRpcInvocationSerialization() { + return forceRpcInvocationSerialization; + } + public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) { final Time timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); @@ -74,8 +82,15 @@ public class AkkaRpcServiceConfiguration { final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK); + final boolean forceRpcInvocationSerialization = + AkkaOptions.isForceRpcInvocationSerializationEnabled(configuration); + return new AkkaRpcServiceConfiguration( - configuration, timeout, maximumFramesize, captureAskCallStacks); + configuration, + timeout, + maximumFramesize, + captureAskCallStacks, + forceRpcInvocationSerialization); } public static AkkaRpcServiceConfiguration defaultConfiguration() { diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java index 134c247..8ddd41e 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java @@ -62,6 +62,7 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv ActorRef rpcEndpoint, Time timeout, long maximumFramesize, + boolean forceRpcInvocationSerialization, @Nullable CompletableFuture<Void> terminationFuture, Supplier<F> fencingTokenSupplier, boolean captureAskCallStacks, @@ -72,6 +73,7 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv rpcEndpoint, timeout, maximumFramesize, + forceRpcInvocationSerialization, terminationFuture, captureAskCallStacks, flinkClassLoader); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 29f973e..71cc851 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -1006,6 +1007,7 @@ public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ interface SchedulingRpcEndpointGateway extends RpcGateway { + @Local void schedule( final CompletableFuture<Void> scheduleRunnableFuture, final CompletableFuture<Void> scheduleCallableFuture, diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index 4139fe4..c9aaaf0 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -169,6 +170,7 @@ public class MessageSerializationTest extends TestLogger { } private interface TestGateway extends RpcGateway { + @Local CompletableFuture<Void> foobar(Object object) throws IOException, InterruptedException; } diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java new file mode 100644 index 0000000..e1902a7 --- /dev/null +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/Local.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** Marks an RPC invocation for only being supported as a local invocation. */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface Local {}
