This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/release-3.1 by this push:
new 56786b1 [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
56786b1 is described below
commit 56786b1d16ae3300ae8e0da917531093adeb10cf
Author: Galen Warren <[email protected]>
AuthorDate: Sun Dec 26 11:24:23 2021 -0500
[FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
This closes #282.
---
.../flink/common/json/StateFunObjectMapper.java | 16 +++-
.../DefaultHttpRequestReplyClientFactory.java | 3 +-
.../httpfn/DefaultHttpRequestReplyClientSpec.java | 13 +++
.../DefaultHttpRequestReplyClientSpecTest.java | 105 +++++++++++++++++++++
statefun-flink/statefun-flink-datastream/pom.xml | 16 +++-
.../datastream/RequestReplyFunctionBuilder.java | 6 +-
.../RequestReplyFunctionBuilderTest.java | 39 ++++++++
7 files changed, 190 insertions(+), 8 deletions(-)
diff --git
a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
index 2d3e502..49b7289 100644
---
a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
+++
b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
@@ -20,11 +20,9 @@ package org.apache.flink.statefun.flink.common.json;
import java.io.IOException;
import java.time.Duration;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.*;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.statefun.sdk.TypeName;
import org.apache.flink.util.TimeUtils;
@@ -36,6 +34,7 @@ public final class StateFunObjectMapper {
new
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
final SimpleModule module = new SimpleModule("statefun");
+ module.addSerializer(Duration.class, new DurationJsonSerializer());
module.addDeserializer(Duration.class, new DurationJsonDeserializer());
module.addDeserializer(TypeName.class, new TypeNameJsonDeserializer());
@@ -51,6 +50,15 @@ public final class StateFunObjectMapper {
}
}
+ private static final class DurationJsonSerializer extends
JsonSerializer<Duration> {
+ @Override
+ public void serialize(
+ Duration duration, JsonGenerator jsonGenerator, SerializerProvider
serializerProvider)
+ throws IOException {
+ jsonGenerator.writeString(TimeUtils.formatWithHighestUnit(duration));
+ }
+ }
+
private static final class TypeNameJsonDeserializer extends
JsonDeserializer<TypeName> {
@Override
public TypeName deserialize(
diff --git
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
index 5387c65..fafbfbe 100644
---
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
+++
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -108,8 +108,7 @@ public final class DefaultHttpRequestReplyClientFactory
implements RequestReplyC
private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
ObjectNode transportClientProperties) {
try {
- return OBJ_MAPPER.treeToValue(
- transportClientProperties, DefaultHttpRequestReplyClientSpec.class);
+ return DefaultHttpRequestReplyClientSpec.fromJson(OBJ_MAPPER,
transportClientProperties);
} catch (Exception e) {
throw new RuntimeException(
"Unable to parse transport client properties when creating client:
", e);
diff --git
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
index 5aa3785..d01332f 100644
---
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
+++
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
@@ -22,6 +22,10 @@ import java.time.Duration;
import java.util.Objects;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public final class DefaultHttpRequestReplyClientSpec {
@@ -39,6 +43,15 @@ public final class DefaultHttpRequestReplyClientSpec {
return timeouts;
}
+ public ObjectNode toJson(ObjectMapper objectMapper) {
+ return objectMapper.valueToTree(this);
+ }
+
+ static DefaultHttpRequestReplyClientSpec fromJson(ObjectMapper objectMapper,
JsonNode jsonNode)
+ throws JsonProcessingException {
+ return objectMapper.treeToValue(jsonNode,
DefaultHttpRequestReplyClientSpec.class);
+ }
+
private static void validateTimeouts(
Duration callTimeout, Duration connectTimeout, Duration readTimeout,
Duration writeTimeout) {
diff --git
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
new file mode 100644
index 0000000..6120a27
--- /dev/null
+++
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
@@ -0,0 +1,105 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.*;
+
+import java.time.Duration;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+public class DefaultHttpRequestReplyClientSpecTest {
+
+ @Test
+ public void jsonSerDe() throws JsonProcessingException {
+ final Duration callTimeout = Duration.ofDays(1L);
+ final Duration connectTimeout = Duration.ofNanos(2L);
+ final Duration readTimeout = Duration.ofSeconds(3L);
+ final Duration writeTimeout = Duration.ofMillis(4L);
+
+ final DefaultHttpRequestReplyClientSpec.Timeouts timeouts =
+ new DefaultHttpRequestReplyClientSpec.Timeouts();
+ timeouts.setCallTimeout(callTimeout);
+ timeouts.setConnectTimeout(connectTimeout);
+ timeouts.setReadTimeout(readTimeout);
+ timeouts.setWriteTimeout(writeTimeout);
+
+ final DefaultHttpRequestReplyClientSpec defaultHttpRequestReplyClientSpec =
+ new DefaultHttpRequestReplyClientSpec();
+ defaultHttpRequestReplyClientSpec.setTimeouts(timeouts);
+
+ final ObjectMapper objectMapper = StateFunObjectMapper.create();
+ final ObjectNode json =
defaultHttpRequestReplyClientSpec.toJson(objectMapper);
+
+ final DefaultHttpRequestReplyClientSpec
deserializedHttpRequestReplyClientSpec =
+ DefaultHttpRequestReplyClientSpec.fromJson(objectMapper, json);
+
+ assertThat(deserializedHttpRequestReplyClientSpec.getTimeouts(),
equalTimeouts(timeouts));
+ }
+
+ private static
TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts>
+ equalTimeouts(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) {
+ return new TimeoutsEqualityMatcher(timeouts);
+ }
+
+ private static class TimeoutsEqualityMatcher
+ extends
TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts> {
+ private final DefaultHttpRequestReplyClientSpec.Timeouts expected;
+
+ private TimeoutsEqualityMatcher(DefaultHttpRequestReplyClientSpec.Timeouts
timeouts) {
+ this.expected = timeouts;
+ }
+
+ @Override
+ protected boolean matchesSafely(
+ DefaultHttpRequestReplyClientSpec.Timeouts timeouts, Description
description) {
+ boolean matching = true;
+
+ if (!timeouts.getCallTimeout().equals(expected.getCallTimeout())) {
+ description
+ .appendText("expected ")
+ .appendValue(expected.getCallTimeout())
+ .appendText(" found ")
+ .appendValue(timeouts.getCallTimeout());
+ matching = false;
+ }
+
+ if (!timeouts.getReadTimeout().equals(expected.getReadTimeout())) {
+ description
+ .appendText("expected ")
+ .appendValue(expected.getReadTimeout())
+ .appendText(" found ")
+ .appendValue(timeouts.getReadTimeout());
+ matching = false;
+ }
+
+ if (!timeouts.getWriteTimeout().equals(expected.getWriteTimeout())) {
+ description
+ .appendText("expected ")
+ .appendValue(expected.getWriteTimeout())
+ .appendText(" found ")
+ .appendValue(timeouts.getWriteTimeout());
+ matching = false;
+ }
+
+ if (!timeouts.getConnectTimeout().equals(expected.getConnectTimeout())) {
+ description
+ .appendText("expected ")
+ .appendValue(expected.getConnectTimeout())
+ .appendText(" found ")
+ .appendValue(timeouts.getConnectTimeout());
+ matching = false;
+ }
+
+ return matching;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Matches equality of Timeouts");
+ }
+ }
+}
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml
b/statefun-flink/statefun-flink-datastream/pom.xml
index 5244784..2c2fc5a 100644
--- a/statefun-flink/statefun-flink-datastream/pom.xml
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -53,7 +53,7 @@ under the License.
</dependency>
<!-- The following dependencies are here with scope provided, because:
- a) they are transitively required by the statefun-flink-*
depencies
+ a) they are transitively required by the statefun-flink-*
dependencies
b) they are provided at runtime, by the embedding application.
Also note that org.slf4j:slf4j-api is excluded from all the
artifacts, since maven
@@ -77,6 +77,20 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- Tests -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git
a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 9f479eb..2875c06 100644
---
a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++
b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import org.apache.flink.annotation.Internal;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
import
org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.httpfn.TargetFunctions;
@@ -34,6 +35,9 @@ import org.apache.flink.statefun.sdk.FunctionType;
/** A Builder for RequestReply remote function type. */
public class RequestReplyFunctionBuilder {
+ /** The object mapper used to serialize the client spec object. */
+ private static final ObjectMapper CLIENT_SPEC_OBJ_MAPPER =
StateFunObjectMapper.create();
+
private final DefaultHttpRequestReplyClientSpec.Timeouts
transportClientTimeoutsSpec =
new DefaultHttpRequestReplyClientSpec.Timeouts();
@@ -130,6 +134,6 @@ public class RequestReplyFunctionBuilder {
new DefaultHttpRequestReplyClientSpec();
transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec);
- return new ObjectMapper().valueToTree(transportClientSpecPojo);
+ return transportClientSpecPojo.toJson(CLIENT_SPEC_OBJ_MAPPER);
}
}
diff --git
a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
new file mode 100644
index 0000000..63a9276
--- /dev/null
+++
b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
@@ -0,0 +1,39 @@
+package org.apache.flink.statefun.flink.datastream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.junit.Test;
+
+public class RequestReplyFunctionBuilderTest {
+
+ @Test
+ public void clientSpecCanBeCreated() throws URISyntaxException {
+ final RequestReplyFunctionBuilder requestReplyFunctionBuilder =
+ RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
+ new FunctionType("foobar", "barfoo"), new URI("foobar"));
+
+ assertThat(requestReplyFunctionBuilder.spec(), notNullValue());
+ }
+}