This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 69abd0984f8b3acb11306a38676c346e3a189c60 Author: Igal Shilman <igalshil...@gmail.com> AuthorDate: Thu Feb 13 14:56:17 2020 +0100 [FLINK-15956] Use ExponentialBackoff during retries --- .../flink/core/httpfn/HttpFunctionProvider.java | 6 +- .../statefun/flink/core/httpfn/OkHttpUtils.java | 70 ++++--------- .../flink/core/httpfn/RetryingCallback.java | 113 +++++++++++++++++++++ 3 files changed, 136 insertions(+), 53 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java index be027e0..3d94109 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java @@ -29,11 +29,7 @@ public class HttpFunctionProvider implements StatefulFunctionProvider { public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) { this.supportedTypes = supportedTypes; - final long timeoutMs = 30_000; - // TODO: add various timeouts to HttpFunctionSpec - this.client = - OkHttpUtils.newClient( - timeoutMs, timeoutMs, 2 * timeoutMs, timeoutMs, Integer.MAX_VALUE, Integer.MAX_VALUE); + this.client = OkHttpUtils.newClient(); } @Override diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java index 100c932..c5fb4be 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/OkHttpUtils.java @@ -18,13 +18,11 @@ package 
org.apache.flink.statefun.flink.core.httpfn; import static org.apache.flink.util.Preconditions.checkState; -import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import okhttp3.Call; -import okhttp3.Callback; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; import okhttp3.MediaType; @@ -35,63 +33,39 @@ import okhttp3.Response; final class OkHttpUtils { private OkHttpUtils() {} - static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream"); - - static CompletableFuture<Response> call(OkHttpClient client, Request request) { - CompletableFuture<Response> future = new CompletableFuture<>(); - client.newCall(request).enqueue(new CompletableFutureCallback(future)); - return future; - } + private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(2); - static InputStream responseBody(Response httpResponse) { - checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code()); - checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)"); - checkState( - Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY), - "Wrong HTTP content-type %s", - httpResponse.body().contentType()); - return httpResponse.body().byteStream(); - } + static final MediaType MEDIA_TYPE_BINARY = MediaType.parse("application/octet-stream"); - static OkHttpClient newClient( - long readTimeoutMillis, - long writeTimeoutMillis, - long callTimeout, - long connectionTimeInMillis, - int maxRequestsPerHost, - int maxRequests) { + static OkHttpClient newClient() { Dispatcher dispatcher = new Dispatcher(); - dispatcher.setMaxRequestsPerHost(maxRequestsPerHost); - dispatcher.setMaxRequests(maxRequests); + dispatcher.setMaxRequestsPerHost(Integer.MAX_VALUE); + dispatcher.setMaxRequests(Integer.MAX_VALUE); return new OkHttpClient.Builder() - 
.connectTimeout(connectionTimeInMillis, TimeUnit.MILLISECONDS) - .writeTimeout(writeTimeoutMillis, TimeUnit.MILLISECONDS) - .readTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS) - .callTimeout(callTimeout, TimeUnit.MILLISECONDS) + .callTimeout(DEFAULT_CALL_TIMEOUT) .dispatcher(dispatcher) .connectionPool(new ConnectionPool()) .followRedirects(true) .followSslRedirects(true) + .retryOnConnectionFailure(true) .build(); } - @SuppressWarnings("NullableProblems") - private static final class CompletableFutureCallback implements Callback { - private final CompletableFuture<Response> future; - - public CompletableFutureCallback(CompletableFuture<Response> future) { - this.future = Objects.requireNonNull(future); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } + static CompletableFuture<Response> call(OkHttpClient client, Request request) { + Call newCall = client.newCall(request); + RetryingCallback callback = new RetryingCallback(newCall.timeout()); + newCall.enqueue(callback); + return callback.future(); + } - @Override - public void onResponse(Call call, Response response) { - future.complete(response); - } + static InputStream responseBody(Response httpResponse) { + checkState(httpResponse.isSuccessful(), "Unexpected HTTP status code %s", httpResponse.code()); + checkState(httpResponse.body() != null, "Unexpected empty HTTP response (no body)"); + checkState( + Objects.equals(httpResponse.body().contentType(), MEDIA_TYPE_BINARY), + "Wrong HTTP content-type %s", + httpResponse.body().contentType()); + return httpResponse.body().byteStream(); } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java new file mode 100644 index 0000000..f8a4a8a --- /dev/null +++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core.httpfn; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Response; +import okio.Timeout; +import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff; +import org.apache.flink.util.function.RunnableWithException; + +@SuppressWarnings("NullableProblems") +final class RetryingCallback implements Callback { + private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10); + + private static final Set<Integer> RETRYABLE_HTTP_CODES = + new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500)); + + private final CompletableFuture<Response> resultFuture; + private final BoundedExponentialBackoff backoff; + + RetryingCallback(Timeout timeout) { + this.resultFuture = new CompletableFuture<>(); + this.backoff = new 
BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout)); + } + + CompletableFuture<Response> future() { + return resultFuture; + } + + @Override + public void onFailure(Call call, IOException cause) { + tryWithFuture(() -> onFailureUnsafe(call, cause)); + } + + @Override + public void onResponse(Call call, Response response) { + tryWithFuture(() -> onResponseUnsafe(call, response)); + } + + private void onFailureUnsafe(Call call, IOException cause) { + if (!retryAfterApplyingBackoff(call)) { + throw new IllegalStateException( + "Maximal request time has elapsed. Last cause is attached", cause); + } + } + + private void onResponseUnsafe(Call call, Response response) { + if (response.isSuccessful()) { + resultFuture.complete(response); + return; + } + if (!RETRYABLE_HTTP_CODES.contains(response.code())) { + throw new IllegalStateException("Non successful HTTP response code " + response.code()); + } + if (!retryAfterApplyingBackoff(call)) { + throw new IllegalStateException( + "Maximal request time has elapsed. Last known error is: invalid HTTP response code " + + response.code()); + } + } + + /** + * Retries the original call (by enqueueing a clone of it with this callback), after applying backoff. NOTE(review): onResponseUnsafe never closes a non-successful {@link Response}, neither before retrying nor before throwing; OkHttp requires response bodies to be closed, otherwise the pooled connection leaks - consider calling response.close() on those paths. Whether backoff.applyNow() blocks (e.g. sleeps on the OkHttp callback thread) is not visible here - confirm. + * + * @return true if backoff.applyNow() allowed a retry and a clone of the call was enqueued, false otherwise. + */ + private boolean retryAfterApplyingBackoff(Call call) { + if (backoff.applyNow()) { + call.clone().enqueue(this); + return true; + } + return false; + } + + /** + * Executes the runnable; if it throws anything (any {@link Throwable}), {@link #resultFuture} is completed + * exceptionally with that throwable instead of the exception propagating to the caller. + */ + private void tryWithFuture(RunnableWithException runnable) { + try { + runnable.run(); + } catch (Throwable t) { + resultFuture.completeExceptionally(t); + } + } + + private static Duration duration(Timeout timeout) { + return Duration.ofNanos(timeout.timeoutNanos()); + } +}