This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
     new 56786b1  [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
56786b1 is described below

commit 56786b1d16ae3300ae8e0da917531093adeb10cf
Author: Galen Warren <[email protected]>
AuthorDate: Sun Dec 26 11:24:23 2021 -0500

    [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
    
    This closes #282.
---
 .../flink/common/json/StateFunObjectMapper.java    |  16 +++-
 .../DefaultHttpRequestReplyClientFactory.java      |   3 +-
 .../httpfn/DefaultHttpRequestReplyClientSpec.java  |  13 +++
 .../DefaultHttpRequestReplyClientSpecTest.java     | 105 +++++++++++++++++++++
 statefun-flink/statefun-flink-datastream/pom.xml   |  16 +++-
 .../datastream/RequestReplyFunctionBuilder.java    |   6 +-
 .../RequestReplyFunctionBuilderTest.java           |  39 ++++++++
 7 files changed, 190 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
index 2d3e502..49b7289 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
@@ -20,11 +20,9 @@ package org.apache.flink.statefun.flink.common.json;
 
 import java.io.IOException;
 import java.time.Duration;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.*;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.flink.statefun.sdk.TypeName;
 import org.apache.flink.util.TimeUtils;
@@ -36,6 +34,7 @@ public final class StateFunObjectMapper {
         new 
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
 
     final SimpleModule module = new SimpleModule("statefun");
+    module.addSerializer(Duration.class, new DurationJsonSerializer());
     module.addDeserializer(Duration.class, new DurationJsonDeserializer());
     module.addDeserializer(TypeName.class, new TypeNameJsonDeserializer());
 
@@ -51,6 +50,15 @@ public final class StateFunObjectMapper {
     }
   }
 
+  private static final class DurationJsonSerializer extends 
JsonSerializer<Duration> {
+    @Override
+    public void serialize(
+        Duration duration, JsonGenerator jsonGenerator, SerializerProvider 
serializerProvider)
+        throws IOException {
+      jsonGenerator.writeString(TimeUtils.formatWithHighestUnit(duration));
+    }
+  }
+
   private static final class TypeNameJsonDeserializer extends 
JsonDeserializer<TypeName> {
     @Override
     public TypeName deserialize(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
index 5387c65..fafbfbe 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -108,8 +108,7 @@ public final class DefaultHttpRequestReplyClientFactory implements RequestReplyC
   private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
       ObjectNode transportClientProperties) {
     try {
-      return OBJ_MAPPER.treeToValue(
-          transportClientProperties, DefaultHttpRequestReplyClientSpec.class);
+      return DefaultHttpRequestReplyClientSpec.fromJson(OBJ_MAPPER, 
transportClientProperties);
     } catch (Exception e) {
       throw new RuntimeException(
           "Unable to parse transport client properties when creating client: 
", e);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
index 5aa3785..d01332f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
@@ -22,6 +22,10 @@ import java.time.Duration;
 import java.util.Objects;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 public final class DefaultHttpRequestReplyClientSpec {
 
@@ -39,6 +43,15 @@ public final class DefaultHttpRequestReplyClientSpec {
     return timeouts;
   }
 
+  public ObjectNode toJson(ObjectMapper objectMapper) {
+    return objectMapper.valueToTree(this);
+  }
+
+  static DefaultHttpRequestReplyClientSpec fromJson(ObjectMapper objectMapper, 
JsonNode jsonNode)
+      throws JsonProcessingException {
+    return objectMapper.treeToValue(jsonNode, 
DefaultHttpRequestReplyClientSpec.class);
+  }
+
   private static void validateTimeouts(
       Duration callTimeout, Duration connectTimeout, Duration readTimeout, 
Duration writeTimeout) {
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
new file mode 100644
index 0000000..6120a27
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
@@ -0,0 +1,105 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.*;
+
+import java.time.Duration;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+public class DefaultHttpRequestReplyClientSpecTest {
+
+  @Test
+  public void jsonSerDe() throws JsonProcessingException {
+    final Duration callTimeout = Duration.ofDays(1L);
+    final Duration connectTimeout = Duration.ofNanos(2L);
+    final Duration readTimeout = Duration.ofSeconds(3L);
+    final Duration writeTimeout = Duration.ofMillis(4L);
+
+    final DefaultHttpRequestReplyClientSpec.Timeouts timeouts =
+        new DefaultHttpRequestReplyClientSpec.Timeouts();
+    timeouts.setCallTimeout(callTimeout);
+    timeouts.setConnectTimeout(connectTimeout);
+    timeouts.setReadTimeout(readTimeout);
+    timeouts.setWriteTimeout(writeTimeout);
+
+    final DefaultHttpRequestReplyClientSpec defaultHttpRequestReplyClientSpec =
+        new DefaultHttpRequestReplyClientSpec();
+    defaultHttpRequestReplyClientSpec.setTimeouts(timeouts);
+
+    final ObjectMapper objectMapper = StateFunObjectMapper.create();
+    final ObjectNode json = 
defaultHttpRequestReplyClientSpec.toJson(objectMapper);
+
+    final DefaultHttpRequestReplyClientSpec 
deserializedHttpRequestReplyClientSpec =
+        DefaultHttpRequestReplyClientSpec.fromJson(objectMapper, json);
+
+    assertThat(deserializedHttpRequestReplyClientSpec.getTimeouts(), 
equalTimeouts(timeouts));
+  }
+
+  private static 
TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts>
+      equalTimeouts(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) {
+    return new TimeoutsEqualityMatcher(timeouts);
+  }
+
+  private static class TimeoutsEqualityMatcher
+      extends 
TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts> {
+    private final DefaultHttpRequestReplyClientSpec.Timeouts expected;
+
+    private TimeoutsEqualityMatcher(DefaultHttpRequestReplyClientSpec.Timeouts 
timeouts) {
+      this.expected = timeouts;
+    }
+
+    @Override
+    protected boolean matchesSafely(
+        DefaultHttpRequestReplyClientSpec.Timeouts timeouts, Description 
description) {
+      boolean matching = true;
+
+      if (!timeouts.getCallTimeout().equals(expected.getCallTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getCallTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getCallTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getReadTimeout().equals(expected.getReadTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getReadTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getReadTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getWriteTimeout().equals(expected.getWriteTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getWriteTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getWriteTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getConnectTimeout().equals(expected.getConnectTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getConnectTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getConnectTimeout());
+        matching = false;
+      }
+
+      return matching;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("Matches equality of Timeouts");
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml
index 5244784..2c2fc5a 100644
--- a/statefun-flink/statefun-flink-datastream/pom.xml
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -53,7 +53,7 @@ under the License.
         </dependency>
   
         <!-- The following dependencies are here with scope provided, because: 
-             a) they are transitively required by the statefun-flink-* depencies
+             a) they are transitively required by the statefun-flink-* dependencies
              b) they are provided at runtime, by the embedding application. 
              
              Also note that org.slf4j:slf4j-api is excluded from all the 
artifacts, since maven 
@@ -77,6 +77,20 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
         
     </dependencies>
 
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 9f479eb..2875c06 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
 import 
org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.httpfn.TargetFunctions;
@@ -34,6 +35,9 @@ import org.apache.flink.statefun.sdk.FunctionType;
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
 
+  /** The object mapper used to serialize the client spec object. */
+  private static final ObjectMapper CLIENT_SPEC_OBJ_MAPPER = 
StateFunObjectMapper.create();
+
   private final DefaultHttpRequestReplyClientSpec.Timeouts 
transportClientTimeoutsSpec =
       new DefaultHttpRequestReplyClientSpec.Timeouts();
 
@@ -130,6 +134,6 @@ public class RequestReplyFunctionBuilder {
         new DefaultHttpRequestReplyClientSpec();
     transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec);
 
-    return new ObjectMapper().valueToTree(transportClientSpecPojo);
+    return transportClientSpecPojo.toJson(CLIENT_SPEC_OBJ_MAPPER);
   }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
new file mode 100644
index 0000000..63a9276
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
@@ -0,0 +1,39 @@
+package org.apache.flink.statefun.flink.datastream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.junit.Test;
+
+public class RequestReplyFunctionBuilderTest {
+
+  @Test
+  public void clientSpecCanBeCreated() throws URISyntaxException {
+    final RequestReplyFunctionBuilder requestReplyFunctionBuilder =
+        RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
+            new FunctionType("foobar", "barfoo"), new URI("foobar"));
+
+    assertThat(requestReplyFunctionBuilder.spec(), notNullValue());
+  }
+}

Reply via email to