This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 51d3130a172a40ed1dfda9269292b08054534c64 Author: Galen Warren <[email protected]> AuthorDate: Sun Dec 26 11:24:23 2021 -0500 [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder This closes #282. --- .../flink/common/json/StateFunObjectMapper.java | 16 +++- .../DefaultHttpRequestReplyClientFactory.java | 3 +- .../httpfn/DefaultHttpRequestReplyClientSpec.java | 13 +++ .../DefaultHttpRequestReplyClientSpecTest.java | 105 +++++++++++++++++++++ statefun-flink/statefun-flink-datastream/pom.xml | 16 +++- .../datastream/RequestReplyFunctionBuilder.java | 6 +- .../RequestReplyFunctionBuilderTest.java | 39 ++++++++ 7 files changed, 190 insertions(+), 8 deletions(-) diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java index 2d3e502..49b7289 100644 --- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java +++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java @@ -20,11 +20,9 @@ package org.apache.flink.statefun.flink.common.json; import java.io.IOException; import java.time.Duration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.*; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.flink.statefun.sdk.TypeName; import org.apache.flink.util.TimeUtils; @@ -36,6 +34,7 @@ public final class StateFunObjectMapper { new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); final SimpleModule module = new SimpleModule("statefun"); + module.addSerializer(Duration.class, new DurationJsonSerializer()); module.addDeserializer(Duration.class, new DurationJsonDeserializer()); module.addDeserializer(TypeName.class, new TypeNameJsonDeserializer()); @@ -51,6 +50,15 @@ public final class StateFunObjectMapper { } } + private static final class DurationJsonSerializer extends JsonSerializer<Duration> { + @Override + public void serialize( + Duration duration, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeString(TimeUtils.formatWithHighestUnit(duration)); + } + } + private static final class TypeNameJsonDeserializer extends JsonDeserializer<TypeName> { @Override public TypeName deserialize( diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java index 5387c65..fafbfbe 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java @@ -108,8 +108,7 @@ public final class DefaultHttpRequestReplyClientFactory implements RequestReplyC private static DefaultHttpRequestReplyClientSpec parseTransportProperties( ObjectNode transportClientProperties) { try { - return OBJ_MAPPER.treeToValue( - transportClientProperties, DefaultHttpRequestReplyClientSpec.class); + return DefaultHttpRequestReplyClientSpec.fromJson(OBJ_MAPPER, transportClientProperties); } catch (Exception e) { throw new RuntimeException( "Unable to parse transport client properties when creating client: ", e); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java index 5aa3785..d01332f 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java @@ -22,6 +22,10 @@ import java.time.Duration; import java.util.Objects; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; public final class DefaultHttpRequestReplyClientSpec { @@ -39,6 +43,15 @@ public final class DefaultHttpRequestReplyClientSpec { return timeouts; } + public ObjectNode toJson(ObjectMapper objectMapper) { + return objectMapper.valueToTree(this); + } + + static DefaultHttpRequestReplyClientSpec fromJson(ObjectMapper objectMapper, JsonNode jsonNode) + throws JsonProcessingException { + return objectMapper.treeToValue(jsonNode, DefaultHttpRequestReplyClientSpec.class); + } + private static void validateTimeouts( Duration callTimeout, Duration connectTimeout, Duration readTimeout, Duration writeTimeout) { diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java new file mode 100644 index 0000000..6120a27 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java @@ -0,0 +1,105 @@ +package org.apache.flink.statefun.flink.core.httpfn; + +import static org.junit.Assert.*; + +import java.time.Duration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +public class DefaultHttpRequestReplyClientSpecTest { + + @Test + public void jsonSerDe() throws JsonProcessingException { + final Duration callTimeout = Duration.ofDays(1L); + final Duration connectTimeout = Duration.ofNanos(2L); + final Duration readTimeout = Duration.ofSeconds(3L); + final Duration writeTimeout = Duration.ofMillis(4L); + + final DefaultHttpRequestReplyClientSpec.Timeouts timeouts = + new DefaultHttpRequestReplyClientSpec.Timeouts(); + timeouts.setCallTimeout(callTimeout); + timeouts.setConnectTimeout(connectTimeout); + timeouts.setReadTimeout(readTimeout); + timeouts.setWriteTimeout(writeTimeout); + + final DefaultHttpRequestReplyClientSpec defaultHttpRequestReplyClientSpec = + new DefaultHttpRequestReplyClientSpec(); + defaultHttpRequestReplyClientSpec.setTimeouts(timeouts); + + final ObjectMapper objectMapper = StateFunObjectMapper.create(); + final ObjectNode json = defaultHttpRequestReplyClientSpec.toJson(objectMapper); + + final DefaultHttpRequestReplyClientSpec deserializedHttpRequestReplyClientSpec = + DefaultHttpRequestReplyClientSpec.fromJson(objectMapper, json); + + assertThat(deserializedHttpRequestReplyClientSpec.getTimeouts(), equalTimeouts(timeouts)); + } + + private static TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts> + equalTimeouts(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) { + return new TimeoutsEqualityMatcher(timeouts); + } + + private static class TimeoutsEqualityMatcher + extends TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts> { + private final DefaultHttpRequestReplyClientSpec.Timeouts expected; + + private TimeoutsEqualityMatcher(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) { + this.expected = timeouts; + } + + @Override + protected boolean matchesSafely( + DefaultHttpRequestReplyClientSpec.Timeouts timeouts, Description description) { + boolean matching = true; + + if (!timeouts.getCallTimeout().equals(expected.getCallTimeout())) { + description + .appendText("expected ") + .appendValue(expected.getCallTimeout()) + .appendText(" found ") + .appendValue(timeouts.getCallTimeout()); + matching = false; + } + + if (!timeouts.getReadTimeout().equals(expected.getReadTimeout())) { + description + .appendText("expected ") + .appendValue(expected.getReadTimeout()) + .appendText(" found ") + .appendValue(timeouts.getReadTimeout()); + matching = false; + } + + if (!timeouts.getWriteTimeout().equals(expected.getWriteTimeout())) { + description + .appendText("expected ") + .appendValue(expected.getWriteTimeout()) + .appendText(" found ") + .appendValue(timeouts.getWriteTimeout()); + matching = false; + } + + if (!timeouts.getConnectTimeout().equals(expected.getConnectTimeout())) { + description + .appendText("expected ") + .appendValue(expected.getConnectTimeout()) + .appendText(" found ") + .appendValue(timeouts.getConnectTimeout()); + matching = false; + } + + return matching; + } + + @Override + public void describeTo(Description description) { + description.appendText("Matches equality of Timeouts"); + } + } +} diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml index 8f5138c..2679286 100644 --- a/statefun-flink/statefun-flink-datastream/pom.xml +++ b/statefun-flink/statefun-flink-datastream/pom.xml @@ -53,7 +53,7 @@ under the License. </dependency> <!-- The following dependencies are here with scope provided, because: - a) they are transitively required by the statefun-flink-* depencies + a) they are transitively required by the statefun-flink-* dependencies b) they are provided at runtime, by the embedding application. Also note that org.slf4j:slf4j-api is excluded from all the artifacts, since maven @@ -77,6 +77,20 @@ under the License. <version>${flink.version}</version> <scope>provided</scope> </dependency> + + <!-- Tests --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java index 9f479eb..2875c06 100644 --- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java +++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java @@ -23,6 +23,7 @@ import java.time.Duration; import org.apache.flink.annotation.Internal; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper; import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec; import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec; import org.apache.flink.statefun.flink.core.httpfn.TargetFunctions; @@ -34,6 +35,9 @@ import org.apache.flink.statefun.sdk.FunctionType; /** A Builder for RequestReply remote function type. */ public class RequestReplyFunctionBuilder { + /** The object mapper used to serialize the client spec object. */ + private static final ObjectMapper CLIENT_SPEC_OBJ_MAPPER = StateFunObjectMapper.create(); + private final DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec = new DefaultHttpRequestReplyClientSpec.Timeouts(); @@ -130,6 +134,6 @@ public class RequestReplyFunctionBuilder { new DefaultHttpRequestReplyClientSpec(); transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec); - return new ObjectMapper().valueToTree(transportClientSpecPojo); + return transportClientSpecPojo.toJson(CLIENT_SPEC_OBJ_MAPPER); } } diff --git a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java new file mode 100644 index 0000000..63a9276 --- /dev/null +++ b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java @@ -0,0 +1,39 @@ +package org.apache.flink.statefun.flink.datastream; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; + +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.flink.statefun.sdk.FunctionType; +import org.junit.Test; + +public class RequestReplyFunctionBuilderTest { + + @Test + public void clientSpecCanBeCreated() throws URISyntaxException { + final RequestReplyFunctionBuilder requestReplyFunctionBuilder = + RequestReplyFunctionBuilder.requestReplyFunctionBuilder( + new FunctionType("foobar", "barfoo"), new URI("foobar")); + + assertThat(requestReplyFunctionBuilder.spec(), notNullValue()); + } +}
