This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c8c790f6d726fd6942a2569dd97ed7a116092c4e Author: Kezhu Wang <kez...@gmail.com> AuthorDate: Sat Feb 13 00:06:53 2021 +0800 [FLINK-20580][rpc] Separate wire value class from user values --- .../runtime/rpc/akka/AkkaInvocationHandler.java | 5 +- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 18 ++- .../runtime/rpc/akka/AkkaRpcSerializedValue.java | 88 +++++++++++++++ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 60 ++++++++++ .../rpc/akka/AkkaRpcSerializedValueTest.java | 125 +++++++++++++++++++++ .../runtime/rpc/akka/RemoteAkkaRpcActorTest.java | 28 +++++ 6 files changed, 311 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index a6d6a4b..5ec3f1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation; import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedValue; import akka.actor.ActorRef; import akka.pattern.Patterns; @@ -389,9 +388,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc } static Object deserializeValueIfNeeded(Object o, Method method) { - if (o instanceof SerializedValue) { + if (o instanceof AkkaRpcSerializedValue) { try { - return ((SerializedValue<?>) o) + return ((AkkaRpcSerializedValue) o) .deserializeValue(AkkaInvocationHandler.class.getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new CompletionException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 08fe9b5..b0a2a82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.rpc.messages.RunAsync; import org.apache.flink.types.Either; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedValue; import akka.actor.AbstractActor; import akka.actor.ActorRef; @@ -332,7 +331,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { private void sendSyncResponse(Object response, String methodName) { if (isRemoteSender(getSender())) { - Either<SerializedValue<?>, AkkaRpcException> serializedResult = + Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(response, methodName); if (serializedResult.isLeft()) { @@ -356,8 +355,10 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { promise.failure(throwable); } else { if (isRemoteSender(sender)) { - Either<SerializedValue<?>, AkkaRpcException> serializedResult = - serializeRemoteResultAndVerifySize(value, methodName); + Either<AkkaRpcSerializedValue, AkkaRpcException> + serializedResult = + serializeRemoteResultAndVerifySize( + value, methodName); if (serializedResult.isLeft()) { promise.success(serializedResult.left()); @@ -380,15 +381,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { return !sender.path().address().hasLocalScope(); } - private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize( + private Either<AkkaRpcSerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize( Object result, String methodName) { try { - SerializedValue<?> serializedResult = new SerializedValue<>(result); + AkkaRpcSerializedValue serializedResult = AkkaRpcSerializedValue.valueOf(result); - long resultSize = - serializedResult.getByteArray() == null - ? 0 - : serializedResult.getByteArray().length; + long resultSize = serializedResult.getSerializedDataLength(); if (resultSize > maximumFramesize) { return Either.Right( new AkkaRpcException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java new file mode 100644 index 0000000..fb8011c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValue.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +/** A self-contained serialized value to decouple from user values and transfer on wire. */ +final class AkkaRpcSerializedValue implements Serializable { + private static final long serialVersionUID = -4388571068440835689L; + + @Nullable private final byte[] serializedData; + + private AkkaRpcSerializedValue(@Nullable byte[] serializedData) { + this.serializedData = serializedData; + } + + @Nullable + public byte[] getSerializedData() { + return serializedData; + } + + /** Return length of serialized data, zero if no serialized data. */ + public int getSerializedDataLength() { + return serializedData == null ? 0 : serializedData.length; + } + + @Nullable + public <T> T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException { + Preconditions.checkNotNull(loader, "No classloader has been passed"); + return serializedData == null + ? null + : InstantiationUtil.deserializeObject(serializedData, loader); + } + + /** + * Construct a serialized value to transfer on wire. + * + * @param value nullable value + * @return serialized value to transfer on wire + * @throws IOException exception during value serialization + */ + public static AkkaRpcSerializedValue valueOf(@Nullable Object value) throws IOException { + byte[] serializedData = value == null ? null : InstantiationUtil.serializeObject(value); + return new AkkaRpcSerializedValue(serializedData); + } + + @Override + public boolean equals(Object o) { + if (o instanceof AkkaRpcSerializedValue) { + AkkaRpcSerializedValue other = (AkkaRpcSerializedValue) o; + return Arrays.equals(serializedData, other.serializedData); + } + return false; + } + + @Override + public int hashCode() { + return Arrays.hashCode(serializedData); + } + + @Override + public String toString() { + return serializedData == null ? "AkkaRpcSerializedValue(null)" : "AkkaRpcSerializedValue"; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 689e14f..c833027 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import akka.actor.ActorRef; @@ -47,6 +48,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -535,6 +538,28 @@ public class AkkaRpcActorTest extends TestLogger { } } + @Test + public void canRespondWithSerializedValueLocally() throws Exception { + try (final SerializedValueRespondingEndpoint endpoint = + new SerializedValueRespondingEndpoint(akkaRpcService)) { + endpoint.start(); + + final SerializedValueRespondingGateway selfGateway = + endpoint.getSelfGateway(SerializedValueRespondingGateway.class); + + assertThat( + selfGateway.getSerializedValueSynchronously(), + equalTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE)); + + final CompletableFuture<SerializedValue<String>> responseFuture = + selfGateway.getSerializedValue(); + + assertThat( + responseFuture.get(), + equalTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE)); + } + } + // ------------------------------------------------------------------------ // Test Actors and Interfaces // ------------------------------------------------------------------------ @@ -586,6 +611,41 @@ public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ + interface SerializedValueRespondingGateway extends RpcGateway { + CompletableFuture<SerializedValue<String>> getSerializedValue(); + + SerializedValue<String> getSerializedValueSynchronously(); + } + + static class SerializedValueRespondingEndpoint extends RpcEndpoint + implements SerializedValueRespondingGateway { + static final SerializedValue<String> SERIALIZED_VALUE; + + static { + try { + SERIALIZED_VALUE = new SerializedValue<>("string-value"); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public SerializedValueRespondingEndpoint(RpcService rpcService) { + super(rpcService); + } + + @Override + public CompletableFuture<SerializedValue<String>> getSerializedValue() { + return CompletableFuture.completedFuture(SERIALIZED_VALUE); + } + + @Override + public SerializedValue<String> getSerializedValueSynchronously() { + return SERIALIZED_VALUE; + } + } + + // ------------------------------------------------------------------------ + private interface ExceptionalGateway extends RpcGateway { CompletableFuture<Integer> doStuff(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java new file mode 100644 index 0000000..70263a6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSerializedValueTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +/** Tests for the {@link AkkaRpcSerializedValue}. */ +public class AkkaRpcSerializedValueTest extends TestLogger { + + @Test + public void testNullValue() throws Exception { + AkkaRpcSerializedValue serializedValue = AkkaRpcSerializedValue.valueOf(null); + assertThat(serializedValue.getSerializedData(), nullValue()); + assertThat(serializedValue.getSerializedDataLength(), equalTo(0)); + assertThat(serializedValue.deserializeValue(getClass().getClassLoader()), nullValue()); + + AkkaRpcSerializedValue otherSerializedValue = AkkaRpcSerializedValue.valueOf(null); + assertThat(otherSerializedValue, equalTo(serializedValue)); + assertThat(otherSerializedValue.hashCode(), equalTo(serializedValue.hashCode())); + + AkkaRpcSerializedValue clonedSerializedValue = InstantiationUtil.clone(serializedValue); + assertThat(clonedSerializedValue.getSerializedData(), nullValue()); + assertThat(clonedSerializedValue.getSerializedDataLength(), equalTo(0)); + assertThat( + clonedSerializedValue.deserializeValue(getClass().getClassLoader()), nullValue()); + assertThat(clonedSerializedValue, equalTo(serializedValue)); + assertThat(clonedSerializedValue.hashCode(), equalTo(serializedValue.hashCode())); + } + + @Test + public void testNotNullValues() throws Exception { + Set<Object> values = + Stream.of( + true, + (byte) 5, + (short) 6, + 5, + 5L, + 5.5F, + 6.5, + 'c', + "string", + Instant.now(), + BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.TEN), + BigDecimal.valueOf(Math.PI)) + .collect(Collectors.toSet()); + + Object previousValue = null; + AkkaRpcSerializedValue previousSerializedValue = null; + for (Object value : values) { + AkkaRpcSerializedValue serializedValue = AkkaRpcSerializedValue.valueOf(value); + assertThat(value.toString(), serializedValue.getSerializedData(), notNullValue()); + assertThat(value.toString(), serializedValue.getSerializedDataLength(), greaterThan(0)); + assertThat( + value.toString(), + serializedValue.deserializeValue(getClass().getClassLoader()), + equalTo(value)); + + AkkaRpcSerializedValue otherSerializedValue = AkkaRpcSerializedValue.valueOf(value); + assertThat(value.toString(), otherSerializedValue, equalTo(serializedValue)); + assertThat( + value.toString(), + otherSerializedValue.hashCode(), + equalTo(serializedValue.hashCode())); + + AkkaRpcSerializedValue clonedSerializedValue = InstantiationUtil.clone(serializedValue); + assertThat( + value.toString(), + clonedSerializedValue.getSerializedData(), + equalTo(serializedValue.getSerializedData())); + assertThat( + value.toString(), + clonedSerializedValue.deserializeValue(getClass().getClassLoader()), + equalTo(value)); + assertThat(value.toString(), clonedSerializedValue, equalTo(serializedValue)); + assertThat( + value.toString(), + clonedSerializedValue.hashCode(), + equalTo(serializedValue.hashCode())); + + if (previousValue != null && !previousValue.equals(value)) { + assertThat( + value.toString() + " " + previousValue.toString(), + serializedValue, + not(equalTo(previousSerializedValue))); + } + + previousValue = value; + previousSerializedValue = serializedValue; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java index a9940d8..3cde7f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/RemoteAkkaRpcActorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc.akka; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; @@ -97,4 +99,30 @@ public class RemoteAkkaRpcActorTest extends TestLogger { assertThat(value, is(nullValue())); } } + + @Test + public void canRespondWithSerializedValueRemotely() throws Exception { + try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint = + new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) { + endpoint.start(); + + final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway = + otherRpcService + .connect( + endpoint.getAddress(), + AkkaRpcActorTest.SerializedValueRespondingGateway.class) + .join(); + + assertThat( + remoteGateway.getSerializedValueSynchronously(), + equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE)); + + final CompletableFuture<SerializedValue<String>> responseFuture = + remoteGateway.getSerializedValue(); + + assertThat( + responseFuture.get(), + equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE)); + } + } }