This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 4a920f913eb652183021be727203033529570d90 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu May 28 14:27:52 2020 +0800 [FLINK-17875] [core] Move default function config values to HttpFunctionSpec --- .../flink/core/httpfn/HttpFunctionSpec.java | 46 +++++++++++++++++++++- .../flink/core/jsonmodule/FunctionJsonEntity.java | 31 ++++++++------- 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java index 61945f4..0e03591 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java @@ -19,19 +19,24 @@ package org.apache.flink.statefun.flink.core.httpfn; import java.net.URI; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec; import org.apache.flink.statefun.sdk.FunctionType; public final class HttpFunctionSpec implements FunctionSpec { + + private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); + private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; + private final FunctionType functionType; private final URI endpoint; private final List<String> states; private final Duration maxRequestDuration; private final int maxNumBatchRequests; - public HttpFunctionSpec( + private HttpFunctionSpec( FunctionType functionType, URI endpoint, List<String> states, @@ -44,6 +49,10 @@ public final class HttpFunctionSpec implements FunctionSpec { this.maxNumBatchRequests = maxNumBatchRequests; } + public static Builder builder(FunctionType functionType, URI endpoint) { + return new Builder(functionType, endpoint); + } + @Override public FunctionType functionType() { return functionType; @@ -74,4 +83,39 @@ public final class HttpFunctionSpec implements FunctionSpec { public int maxNumBatchRequests() { return maxNumBatchRequests; } + + public static final class Builder { + + private final FunctionType functionType; + private final URI endpoint; + + private final List<String> states = new ArrayList<>(); + private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT; + private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; + + private Builder(FunctionType functionType, URI endpoint) { + this.functionType = Objects.requireNonNull(functionType); + this.endpoint = Objects.requireNonNull(endpoint); + } + + public Builder withState(String stateName) { + this.states.add(stateName); + return this; + } + + public Builder withMaxRequestDuration(Duration duration) { + this.maxRequestDuration = Objects.requireNonNull(duration); + return this; + } + + public Builder withMaxNumBatchRequests(int maxNumBatchRequests) { + this.maxNumBatchRequests = maxNumBatchRequests; + return this; + } + + public HttpFunctionSpec build() { + return new HttpFunctionSpec( + functionType, endpoint, states, maxRequestDuration, maxNumBatchRequests); + } + } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java index 25f1981..7dc5173 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java @@ -29,6 +29,8 @@ import java.time.Duration; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Function; import java.util.stream.Collector; @@ -48,9 +50,6 @@ import org.apache.flink.util.TimeUtils; final class FunctionJsonEntity implements JsonEntity { - private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); - private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; - @Override public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode); @@ -83,12 +82,16 @@ final class FunctionJsonEntity implements JsonEntity { FunctionType functionType = functionType(functionNode); switch (kind) { case HTTP: - return new HttpFunctionSpec( - functionType, - functionUri(functionNode), - functionStates(functionNode), - maxRequestDuration(functionNode), - maxNumBatchRequests(functionNode)); + final HttpFunctionSpec.Builder specBuilder = + HttpFunctionSpec.builder(functionType, functionUri(functionNode)); + + for (String state : functionStates(functionNode)) { + specBuilder.withState(state); + } + optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests); + optionalMaxRequestDuration(functionNode).ifPresent(specBuilder::withMaxRequestDuration); + + return specBuilder.build(); case GRPC: return new GrpcFunctionSpec(functionType, functionAddress(functionNode)); default: @@ -100,16 +103,14 @@ final class FunctionJsonEntity implements JsonEntity { return Selectors.textListAt(functionNode, Pointers.Functions.FUNCTION_STATES); } - private static int maxNumBatchRequests(JsonNode functionNode) { + private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) { return Selectors.optionalIntegerAt( - functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS) - .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS); + functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS); } - private static Duration maxRequestDuration(JsonNode functionNode) { + private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) { return Selectors.optionalTextAt(functionNode, Pointers.Functions.FUNCTION_TIMEOUT) - .map(TimeUtils::parseDuration) - .orElse(DEFAULT_HTTP_TIMEOUT); + .map(TimeUtils::parseDuration); } private static FunctionType functionType(JsonNode functionNode) {
