This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 01dfcf4e00f9621b68d6e9466b5f19af2b60d077
Author: Igal Shilman <igalshil...@gmail.com>
AuthorDate: Thu Feb 13 15:24:40 2020 +0100

    [FLINK-15956] Add "timeout" to http function spec
    
    This commit adds a "timeout" property to the function spec of a module.
    The timeout spans the entire HTTP request, including any retries (spaced
    apart by backoffs).
---
 .../statefun/flink/core/httpfn/HttpFunctionSpec.java      | 10 +++++++++-
 .../flink/statefun/flink/core/jsonmodule/JsonModule.java  | 15 ++++++++++++++-
 .../flink/statefun/flink/core/jsonmodule/Pointers.java    |  2 ++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 45eef71..1b016ac 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.httpfn;
 
 import java.net.URI;
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
@@ -27,11 +28,14 @@ public final class HttpFunctionSpec implements FunctionSpec 
{
   private final FunctionType functionType;
   private final URI endpoint;
   private final List<String> states;
+  private final Duration maxRequestDuration;
 
-  public HttpFunctionSpec(FunctionType functionType, URI endpoint, 
List<String> states) {
+  public HttpFunctionSpec(
+      FunctionType functionType, URI endpoint, List<String> states, Duration 
maxRequestDuration) {
     this.functionType = Objects.requireNonNull(functionType);
     this.endpoint = Objects.requireNonNull(endpoint);
     this.states = Objects.requireNonNull(states);
+    this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
   }
 
   @Override
@@ -51,4 +55,8 @@ public final class HttpFunctionSpec implements FunctionSpec {
   public List<String> states() {
     return states;
   }
+
+  public Duration maxRequestDuration() {
+    return maxRequestDuration;
+  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 7bc2d1e..dd43367 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -48,6 +49,7 @@ import 
org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
+import org.apache.flink.statefun.flink.core.jsonmodule.Pointers.Functions;
 import org.apache.flink.statefun.flink.core.protorouter.ProtobufRouter;
 import org.apache.flink.statefun.flink.io.spi.JsonIngressSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
@@ -56,8 +58,10 @@ import 
org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.Router;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+import org.apache.flink.util.TimeUtils;
 
 final class JsonModule implements StatefulFunctionModule {
+  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
   private final JsonNode spec;
   private final URL moduleUrl;
 
@@ -205,7 +209,10 @@ final class JsonModule implements StatefulFunctionModule {
     switch (kind) {
       case HTTP:
         return new HttpFunctionSpec(
-            functionType, functionUri(functionNode), 
functionStates(functionNode));
+            functionType,
+            functionUri(functionNode),
+            functionStates(functionNode),
+            maxRequestDuration(functionNode));
       case GRPC:
         return new GrpcFunctionSpec(functionType, 
functionAddress(functionNode));
       default:
@@ -213,6 +220,12 @@ final class JsonModule implements StatefulFunctionModule {
     }
   }
 
+  private static Duration maxRequestDuration(JsonNode functionNode) {
+    return Selectors.optionalTextAt(functionNode, Functions.FUNCTION_TIMEOUT)
+        .map(TimeUtils::parseDuration)
+        .orElse(DEFAULT_HTTP_TIMEOUT);
+  }
+
   private static List<String> functionStates(JsonNode functionNode) {
     return Selectors.textListAt(functionNode, 
Pointers.Functions.FUNCTION_STATES);
   }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
index fb25ac1..6d01344 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/Pointers.java
@@ -46,6 +46,8 @@ public final class Pointers {
         JsonPointer.compile("/function/spec/endpoint");
     public static final JsonPointer FUNCTION_PORT = 
JsonPointer.compile("/function/spec/port");
     public static final JsonPointer FUNCTION_STATES = 
JsonPointer.compile("/function/spec/states");
+    public static final JsonPointer FUNCTION_TIMEOUT =
+        JsonPointer.compile("/function/spec/timeout");
   }
 
   // 
-------------------------------------------------------------------------------------

Reply via email to