This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 94def7745a6ee934150db31e19bf7e7665713ba2 Author: Igal Shilman <igalshil...@gmail.com> AuthorDate: Tue Feb 11 16:07:18 2020 +0100 [FLINK-15956] Add an initial HttpFunction implemention --- .../statefun/flink/core/common/PolyglotUtil.java | 53 ++++++ .../statefun/flink/core/httpfn/HttpFunction.java | 192 ++++++++++----------- .../flink/core/httpfn/HttpFunctionProvider.java | 12 +- .../statefun/flink/core/httpfn/OkHttpUtils.java | 51 +++++- .../src/test/resources/bar-module/module.yaml | 2 + 5 files changed, 198 insertions(+), 112 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java new file mode 100644 index 0000000..c901e77 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.flink.core.common; + +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import java.io.IOException; +import java.io.InputStream; +import javax.annotation.Nonnull; +import org.apache.flink.statefun.flink.core.polyglot.generated.Address; +import org.apache.flink.statefun.sdk.FunctionType; + +public final class PolyglotUtil { + private PolyglotUtil() {} + + public static <M extends Message> M parseProtobufOrThrow(Parser<M> parser, InputStream input) { + try { + return parser.parseFrom(input); + } catch (IOException e) { + throw new IllegalStateException("Unable to parse a Protobuf message", e); + } + } + + public static Address sdkAddressToPolyglotAddress( + @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) { + return Address.newBuilder() + .setNamespace(sdkAddress.type().namespace()) + .setType(sdkAddress.type().name()) + .setId(sdkAddress.id()) + .build(); + } + + public static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress(Address address) { + return new org.apache.flink.statefun.sdk.Address( + new FunctionType(address.getNamespace(), address.getType()), address.getId()); + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java index 51f0405..506081a 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunction.java @@ -18,42 +18,38 @@ package org.apache.flink.statefun.flink.core.httpfn; -import org.apache.flink.statefun.flink.core.polyglot.generated.Address; +import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.parseProtobufOrThrow; +import static 
org.apache.flink.statefun.flink.core.common.PolyglotUtil.polyglotAddressToSdkAddress; +import static org.apache.flink.statefun.flink.core.common.PolyglotUtil.sdkAddressToPolyglotAddress; +import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.MEDIA_TYPE_BINARY; +import static org.apache.flink.statefun.flink.core.httpfn.OkHttpUtils.responseBody; +import static org.apache.flink.util.Preconditions.checkState; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse; import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation.Builder; import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest; +import org.apache.flink.statefun.sdk.Address; import org.apache.flink.statefun.sdk.AsyncOperationResult; import org.apache.flink.statefun.sdk.Context; -import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.annotations.Persisted; import org.apache.flink.statefun.sdk.state.PersistedTable; import org.apache.flink.statefun.sdk.state.PersistedValue; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; - -import java.io.IOException; -import java.io.InputStream; 
-import java.util.Objects; - -import javax.annotation.Nonnull; - -import static org.apache.flink.util.Preconditions.checkState; - final class HttpFunction implements StatefulFunction { - private static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream"); - private final HttpFunctionSpec functionSpec; private final OkHttpClient client; private final HttpUrl url; @@ -78,17 +74,18 @@ final class HttpFunction implements StatefulFunction { @Override public void invoke(Context context, Object input) { - if (input instanceof AsyncOperationResult) { - @SuppressWarnings("unchecked") - AsyncOperationResult<Any, Response> result = (AsyncOperationResult<Any, Response>) input; - onAsyncResult(context, result); - } else { + if (!(input instanceof AsyncOperationResult)) { onRequest(context, (Any) input); + return; } + @SuppressWarnings("unchecked") + AsyncOperationResult<ToFunction, Response> result = + (AsyncOperationResult<ToFunction, Response>) input; + onAsyncResult(context, result); } - private void onRequest(Context context, Any input) { - Invocation.Builder invocationBuilder = invocationBuilder(context, input); + private void onRequest(Context context, Any message) { + Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message); if (hasInFlightRpc.getOrDefault(Boolean.FALSE)) { addToOrCreateBatch(invocationBuilder); return; @@ -97,14 +94,23 @@ final class HttpFunction implements StatefulFunction { sendToFunction(context, invocationBuilder); } - private void onAsyncResult(Context context, AsyncOperationResult<Any, Response> asyncResult) { + private void addToOrCreateBatch(Invocation.Builder invocationBuilder) { + InvocationBatchRequest current = batch.get(); + InvocationBatchRequest.Builder next = + (current == null) ? 
InvocationBatchRequest.newBuilder() : current.toBuilder(); + next.addInvocations(invocationBuilder); + batch.set(next.build()); + } + + private void onAsyncResult( + Context context, AsyncOperationResult<ToFunction, Response> asyncResult) { if (asyncResult.unknown()) { - Any originalMessage = asyncResult.metadata(); - Invocation.Builder invocationBuilder = invocationBuilder(context, originalMessage); - sendToFunction(context, invocationBuilder); + ToFunction batch = asyncResult.metadata(); + sendToFunction(context, batch); return; } - InvocationResponse invocationResult = unpackInvocationResultOrThrow(asyncResult); + InvocationResponse invocationResult = + unpackInvocationResultOrThrow(context.self(), asyncResult); handleInvocationResponse(context, invocationResult); InvocationBatchRequest nextBatch = batch.get(); if (nextBatch == null) { @@ -123,6 +129,27 @@ final class HttpFunction implements StatefulFunction { context.send(to, message); } + handleStateMutations(invocationResult); + } + + // -------------------------------------------------------------------------------- + // State Management + // -------------------------------------------------------------------------------- + + private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) { + for (String stateName : functionSpec.states()) { + ToFunction.PersistedValue.Builder valueBuilder = + ToFunction.PersistedValue.newBuilder().setStateName(stateName); + + byte[] stateValue = managedStates.get(stateName); + if (stateValue != null) { + valueBuilder.setStateValue(ByteString.copyFrom(stateValue)); + } + batchBuilder.addState(valueBuilder); + } + } + + private void handleStateMutations(InvocationResponse invocationResult) { for (FromFunction.PersistedValueMutation mutate : invocationResult.getStateMutationsList()) { final String stateName = mutate.getStateName(); switch (mutate.getMutationType()) { @@ -140,104 +167,65 @@ final class HttpFunction implements StatefulFunction { } } - private void 
addToOrCreateBatch(Builder invocationBuilder) { - batch.updateAndGet( - existingBatch -> { - final InvocationBatchRequest.Builder builder = - existingBatch != null - ? existingBatch.toBuilder() - : InvocationBatchRequest.newBuilder(); - return builder.addInvocations(invocationBuilder).build(); - }); - } - // -------------------------------------------------------------------------------- - // Utilities + // Send Message to Remote Function // -------------------------------------------------------------------------------- - private static Builder invocationBuilder(Context context, Any input) { + + /** + * Returns an {@link Invocation.Builder} set with the input {@code message} and the caller + * information (if present). + */ + private static Invocation.Builder singeInvocationBuilder(Context context, Any message) { Invocation.Builder invocationBuilder = Invocation.newBuilder(); if (context.caller() != null) { invocationBuilder.setCaller(sdkAddressToPolyglotAddress(context.caller())); } - invocationBuilder.setArgument(input); + invocationBuilder.setArgument(message); return invocationBuilder; } - private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) { - for (String stateName : functionSpec.states()) { - ToFunction.PersistedValue.Builder valueBuilder = - ToFunction.PersistedValue.newBuilder().setStateName(stateName); - - byte[] stateValue = managedStates.get(stateName); - if (stateValue != null) { - valueBuilder.setStateValue(ByteString.copyFrom(stateValue)); - } - batchBuilder.addState(valueBuilder); - } - } - + /** + * Sends a {@link InvocationBatchRequest} to the remote function consisting of a single + * invocation represented by {@code invocationBuilder}.
+ */ private void sendToFunction(Context context, Invocation.Builder invocationBuilder) { InvocationBatchRequest.Builder batchBuilder = InvocationBatchRequest.newBuilder(); batchBuilder.addInvocations(invocationBuilder); sendToFunction(context, batchBuilder); } + /** Sends a {@link InvocationBatchRequest} to the remote function. */ private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) { batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self())); addStates(batchBuilder); + ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build(); + sendToFunction(context, toFunction); + } + + private void sendToFunction(Context context, ToFunction toFunction) { Request request = new Request.Builder() .url(url) - .post( - RequestBody.create( - MEDIA_TYPE_BINARY, - ToFunction.newBuilder().setInvocation(batchBuilder).build().toByteArray())) + .post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray())) .build(); - OkHttpUtils.call(client, request); + + CompletableFuture<Response> responseFuture = OkHttpUtils.call(client, request); + context.registerAsyncOperation(toFunction, responseFuture); } - private static InvocationResponse unpackInvocationResultOrThrow( - AsyncOperationResult<Any, Response> asyncResult) { - checkState(asyncResult.failure() || asyncResult.successful()); + private InvocationResponse unpackInvocationResultOrThrow( + Address self, AsyncOperationResult<?, Response> asyncResult) { + checkState(!asyncResult.unknown()); if (asyncResult.failure()) { - throw new IllegalStateException("", asyncResult.throwable()); + throw new IllegalStateException( + "Failure forwarding a message to a remote function " + self, asyncResult.throwable()); } - Response httpResponse = asyncResult.value(); - checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code()); - checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)"); - checkState( - 
Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY), - "Wrong HTTP content-type %s", - httpResponse.body().contentType()); - InputStream httpResponseBody = httpResponse.body().byteStream(); - FromFunction fromFunction = parseProtobufOrThrow(httpResponseBody); + InputStream httpResponseBody = responseBody(asyncResult.value()); + FromFunction fromFunction = parseProtobufOrThrow(FromFunction.parser(), httpResponseBody); checkState( fromFunction.hasInvocationResult(), "The received HTTP payload does not contain an InvocationResult, but rather [%s]", fromFunction); return fromFunction.getInvocationResult(); } - - private static FromFunction parseProtobufOrThrow(InputStream input) { - try { - return FromFunction.parseFrom(input); - } catch (IOException e) { - throw new IllegalStateException("Unable to parse a Protobuf message", e); - } - } - - private static Address sdkAddressToPolyglotAddress( - @Nonnull org.apache.flink.statefun.sdk.Address sdkAddress) { - return Address.newBuilder() - .setNamespace(sdkAddress.type().namespace()) - .setType(sdkAddress.type().name()) - .setId(sdkAddress.id()) - .build(); - } - - private static org.apache.flink.statefun.sdk.Address polyglotAddressToSdkAddress( - Address address) { - return new org.apache.flink.statefun.sdk.Address( - new FunctionType(address.getNamespace(), address.getType()), address.getId()); - } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java index a546865..be027e0 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java @@ -18,20 +18,22 @@ package org.apache.flink.statefun.flink.core.httpfn; +import 
java.util.Map; +import okhttp3.OkHttpClient; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; -import okhttp3.OkHttpClient; - -import java.util.Map; - public class HttpFunctionProvider implements StatefulFunctionProvider { private final Map<FunctionType, HttpFunctionSpec> supportedTypes; private final OkHttpClient client; public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) { this.supportedTypes = supportedTypes; - this.client = new OkHttpClient(); + final long timeoutMs = 30_000; + // TODO: add various timeouts to HttpFunctionSpec + this.client = + OkHttpUtils.newClient( + timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java index 1839c36..100c932 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java @@ -16,25 +16,66 @@ package org.apache.flink.statefun.flink.core.httpfn; +import static org.apache.flink.util.Preconditions.checkState; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import okhttp3.Call; import okhttp3.Callback; +import okhttp3.ConnectionPool; +import okhttp3.Dispatcher; +import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import java.io.IOException; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; - final class OkHttpUtils { private OkHttpUtils() {} - public static CompletableFuture<Response> 
call(OkHttpClient client, Request request) { + static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream"); + + static CompletableFuture<Response> call(OkHttpClient client, Request request) { CompletableFuture<Response> future = new CompletableFuture<>(); client.newCall(request).enqueue(new CompletableFutureCallback(future)); return future; } + static InputStream responseBody(Response httpResponse) { + checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code()); + checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)"); + checkState( + Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY), + "Wrong HTTP content-type %s", + httpResponse.body().contentType()); + return httpResponse.body().byteStream(); + } + + static OkHttpClient newClient( + long readTimeoutMillis, + long writeTimeoutMillis, + long callTimeout, + long connectionTimeInMillis, + int maxRequestsPerHost, + int maxRequests) { + Dispatcher dispatcher = new Dispatcher(); + dispatcher.setMaxRequestsPerHost(maxRequestsPerHost); + dispatcher.setMaxRequests(maxRequests); + + return new OkHttpClient.Builder() + .connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS) + .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS) + .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS) + .callTimeout(callTimeout, TimeUnit.MILLISECONDS) + .dispatcher(dispatcher) + .connectionPool(new ConnectionPool()) + .followRedirects(true) + .followSslRedirects(true) + .build(); + } + @SuppressWarnings("NullableProblems") private static final class CompletableFutureCallback implements Callback { private final CompletableFuture<Response> future; diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml index c659fe9..2ef6956 100644 --- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml 
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml @@ -31,6 +31,8 @@ module: type: com.foo/world spec: endpoint: localhost:5959/statefun + states: + - seen_count routers: - router: meta: