kfaraz commented on code in PR #13354:
URL: https://github.com/apache/druid/pull/13354#discussion_r1024181978
##########
core/src/main/java/org/apache/druid/common/guava/FutureUtils.java:
##########
@@ -106,6 +110,57 @@ public static <T, R> ListenableFuture<R> transform(final ListenableFuture<T> fut
return Futures.transform(future, fn::apply);
}
+ /**
+ * Like {@link Futures#transform(ListenableFuture, AsyncFunction)}, but works better with lambdas due to not having
+ * overloads.
+ *
+ * One can write {@code FutureUtils.transform(future, v -> ...)} instead of
+ * {@code Futures.transform(future, (Function<? super T, ?>) v -> ...)}
+ */
+ public static <T, R> ListenableFuture<R> transformAsync(final ListenableFuture<T> future, final AsyncFunction<T, R> fn)
+ {
+ return Futures.transform(future, fn);
+ }
+
+ /**
+ * Like {@link Futures#successfulAsList}, but returns {@link Either} instead of using {@code} null in case of error.
Review Comment:
```suggestion
* Like {@link Futures#successfulAsList}, but returns {@link Either} instead of using {@code null} in case of error.
```
##########
core/src/main/java/org/apache/druid/common/guava/FutureUtils.java:
##########
@@ -106,6 +110,57 @@ public static <T, R> ListenableFuture<R> transform(final ListenableFuture<T> fut
return Futures.transform(future, fn::apply);
}
+ /**
+ * Like {@link Futures#transform(ListenableFuture, AsyncFunction)}, but works better with lambdas due to not having
+ * overloads.
+ *
+ * One can write {@code FutureUtils.transform(future, v -> ...)} instead of
Review Comment:
```suggestion
* One can write {@code FutureUtils.transformAsync(future, v -> ...)} instead of
```
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java:
##########
@@ -169,6 +174,23 @@ public Integer getWorkerThreads()
return workerThreads;
}
+ @Override
+ public boolean getChatAsync()
+ {
+ if (chatAsync != null) {
+ return chatAsync;
+ } else {
+ return DEFAULT_ASYNC;
+ }
+ }
+
+ @JsonProperty("chatAsync")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ Boolean getChatAsyncConfigured()
Review Comment:
Do we want to retain the boxed value so that we get the desired behaviour
when we switch the default value of `chatAsync` to `true`?
Style-wise, I think it would be simpler, and more consistent with the other
fields, to name this method `getChatAsync()` and mark it as `@JsonProperty`,
with the other method simply being called `chatAsync`.
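To make the first question concrete, here is a minimal sketch in plain Java (no Jackson) of what keeping the boxed value buys. `ChatAsyncSetting`, `getConfigured` and `resolve` are hypothetical names used only for illustration; they are not part of this PR.

```java
// Hypothetical stand-in for the tuning config field; not the class in this PR.
final class ChatAsyncSetting
{
  // Boxed on purpose: null means the user never set chatAsync.
  private final Boolean configured;

  ChatAsyncSetting(Boolean configured)
  {
    this.configured = configured;
  }

  // The value that would be written back out. It stays null when unset,
  // so a default is never baked into a stored spec.
  Boolean getConfigured()
  {
    return configured;
  }

  // Effective value, resolved against whichever default is in force.
  boolean resolve(boolean defaultValue)
  {
    return configured != null ? configured : defaultValue;
  }
}
```

With this shape, `new ChatAsyncSetting(null).resolve(true)` follows a flipped default, while `new ChatAsyncSetting(false).resolve(true)` keeps an explicit opt-out.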
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1948,24 +1942,58 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
}
}
- List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+ List<Either<Throwable, Boolean>> results = coalesceAndAwait(futures);
+
+ final List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
for (int i = 0; i < results.size(); i++) {
- if (results.get(i) == null) {
- String taskId = futureTaskIds.get(i);
+ String taskId = futureTaskIds.get(i);
+ if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
+ } else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) {
+ // "return false" above means that we want to stop the task.
+ stopFutures.add(stopTask(taskId, false));
}
}
log.debug("Found [%d] seekablestream indexing tasks for dataSource [%s]", taskCount, dataSource);
- // make sure the checkpoints are consistent with each other and with the metadata store
+ if (!stopFutures.isEmpty()) {
+ coalesceAndAwait(stopFutures);
+ }
+ // make sure the checkpoints are consistent with each other and with the metadata store
verifyAndMergeCheckpoints(taskGroupsToVerify.values());
// A pause from the previous Overlord's supervisor, immediately before leader change,
// can lead to tasks being in a state where they are active but do not read.
resumeAllActivelyReadingTasks();
}
+ private ListenableFuture<Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>> getStatusAndPossiblyEndOffsets(
+ final String taskId
+ )
+ {
+ return Futures.transform(
+ taskClient.getStatusAsync(taskId),
+ new AsyncFunction<SeekableStreamIndexTaskRunner.Status, Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>>()
Review Comment:
Suggestion: Maybe use the new `FutureUtils.transformAsync` here, and create a
new class for the `Pair<Status, Map>`, to make this more readable.
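A rough sketch of the shape this could take. It uses `java.util.concurrent.CompletableFuture` as a stand-in for Guava's `ListenableFuture` so that it is self-contained, with `thenCompose` playing the role of the async transform. Every name below (`SketchStatus`, `StatusAndEndOffsets`, `SketchTaskClient`, `StatusLookupSketch`) is hypothetical, and the condition for fetching end offsets is invented for illustration because the method body is not visible in this hunk.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in for SeekableStreamIndexTaskRunner.Status; only two values are modeled.
enum SketchStatus { READING, PUBLISHING }

// Hypothetical named holder that would replace Pair<Status, Map<...>>.
final class StatusAndEndOffsets<P, S>
{
  private final SketchStatus status;
  private final Map<P, S> endOffsets;

  StatusAndEndOffsets(SketchStatus status, Map<P, S> endOffsets)
  {
    this.status = status;
    this.endOffsets = endOffsets;
  }

  SketchStatus getStatus() { return status; }
  Map<P, S> getEndOffsets() { return endOffsets; }
}

// Hypothetical slice of the task client, using CompletableFuture instead of ListenableFuture.
interface SketchTaskClient
{
  CompletableFuture<SketchStatus> getStatusAsync(String taskId);
  CompletableFuture<Map<Integer, Long>> getEndOffsetsAsync(String taskId);
}

final class StatusLookupSketch
{
  static CompletableFuture<StatusAndEndOffsets<Integer, Long>> getStatusAndPossiblyEndOffsets(
      final SketchTaskClient client,
      final String taskId
  )
  {
    // The lambda returns another future, which thenCompose flattens into the result.
    return client.getStatusAsync(taskId).thenCompose(status -> {
      if (status == SketchStatus.PUBLISHING) {
        // Assumed condition, for illustration only.
        return client.getEndOffsetsAsync(taskId)
                     .thenApply(offsets -> new StatusAndEndOffsets<>(status, offsets));
      } else {
        final Map<Integer, Long> none = Collections.emptyMap();
        return CompletableFuture.completedFuture(new StatusAndEndOffsets<>(status, none));
      }
    });
  }
}
```

The lambda replaces the anonymous `AsyncFunction`, and the named holder keeps the nested generics out of the method signature and the call sites.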
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IndexTaskClient;
+import org.apache.druid.indexing.common.RetryPolicy;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.IgnoreHttpResponseHandler;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceClosedException;
+import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.ServiceLocations;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.ServiceNotAvailableException;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Implementation of {@link SeekableStreamIndexTaskClient} based on {@link ServiceClient}.
+ *
+ * Used when {@link org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig#getChatAsync()}
+ * is true.
+ */
+public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>
+ implements SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType>
+{
+ private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClientAsyncImpl.class);
+
+ private final ServiceClientFactory serviceClientFactory;
+ private final TaskInfoProvider taskInfoProvider;
+ private final ObjectMapper jsonMapper;
+ private final Duration httpTimeout;
+ private final long httpRetries;
+
+ // Used by getOffsetsWhenPaused, due to special retry logic.
+ private final ScheduledExecutorService retryExec;
+
+ public SeekableStreamIndexTaskClientAsyncImpl(
+ final String dataSource,
+ final ServiceClientFactory serviceClientFactory,
+ final TaskInfoProvider taskInfoProvider,
+ final ObjectMapper jsonMapper,
+ final Duration httpTimeout,
+ final long httpRetries
+ )
+ {
+ this.serviceClientFactory = serviceClientFactory;
+ this.taskInfoProvider = taskInfoProvider;
+ this.jsonMapper = jsonMapper;
+ this.httpTimeout = httpTimeout;
+ this.httpRetries = httpRetries;
+ this.retryExec = Execs.scheduledSingleThreaded(
+ StringUtils.format(
+ "%s-%s-%%d",
+ getClass().getSimpleName(),
+ StringUtils.encodeForFormat(dataSource)
+ )
+ );
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public ListenableFuture<TreeMap<Integer, Map<PartitionIdType,
SequenceOffsetType>>> getCheckpointsAsync(
+ final String id,
+ final boolean retry
+ )
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/checkpoints"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> {
+ final TypeFactory factory = jsonMapper.getTypeFactory();
+ return (TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>)
+ JacksonUtils.readValue(
+ jsonMapper,
+ r.getContent(),
+ factory.constructMapType(
+ TreeMap.class,
+ factory.constructType(Integer.class),
+ factory.constructMapType(Map.class, getPartitionType(),
getSequenceType())
+ )
+ );
+ })
+ .onNotAvailable(e -> Either.value(new TreeMap<>()))
+ .retry(retry)
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> stopAsync(final String id, final boolean
publish)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.POST, "/stop" +
(publish ? "?publish=true" : "")))
+ .onSuccess(r -> true)
+ .onHttpError(e -> Either.value(false))
+ .onNotAvailable(e -> Either.value(false))
+ .onClosed(e -> {
+ log.debug("Task [%s] couldn't be stopped because it is no longer running.", id);
+ return Either.value(true);
+ })
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> resumeAsync(final String id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.POST, "/resume"))
+ .onSuccess(r -> true)
+ .onException(e -> Either.value(false))
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
getCurrentOffsetsAsync(String id, boolean retry)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET,
"/offsets/current"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> deserializeOffsetsMap(r.getContent()))
+ .onNotAvailable(e -> Either.value(Collections.emptyMap()))
+ .retry(retry)
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
getEndOffsetsAsync(String id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/offsets/end"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> deserializeOffsetsMap(r.getContent()))
+ .onNotAvailable(e -> Either.value(Collections.emptyMap()))
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Boolean> setEndOffsetsAsync(
+ final String id,
+ final Map<PartitionIdType, SequenceOffsetType> endOffsets,
+ final boolean finalize
+ )
+ {
+ final RequestBuilder requestBuilder = new RequestBuilder(
+ HttpMethod.POST,
+ StringUtils.format("/offsets/end?finish=%s", finalize)
+ ).jsonContent(jsonMapper, endOffsets);
+
+ return makeRequest(id, requestBuilder)
+ .handler(IgnoreHttpResponseHandler.INSTANCE)
+ .onSuccess(r -> true)
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<SeekableStreamIndexTaskRunner.Status>
getStatusAsync(final String id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/status"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(
+ r ->
+ JacksonUtils.readValue(jsonMapper, r.getContent(),
SeekableStreamIndexTaskRunner.Status.class)
+ )
+ .onNotAvailable(e ->
Either.value(SeekableStreamIndexTaskRunner.Status.NOT_STARTED))
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<DateTime> getStartTimeAsync(String id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/time/start"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> {
+ if (isNullOrEmpty(r.getContent())) {
+ return null;
+ } else {
+ return JacksonUtils.readValue(jsonMapper, r.getContent(),
DateTime.class);
+ }
+ })
+ .onNotAvailable(e -> Either.value(null))
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
pauseAsync(String id)
+ {
+ final ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
pauseFuture =
+ makeRequest(id, new RequestBuilder(HttpMethod.POST, "/pause"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> {
+ if (r.getStatus().equals(HttpResponseStatus.OK)) {
+ log.info("Task [%s] paused successfully", id);
+ return deserializeOffsetsMap(r.getContent());
+ } else if (r.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
+ // Return null, which triggers a loop later to wait for the task to enter PAUSED state.
+ return null;
+ } else {
+ throw new ISE(
+ "Pause request for task [%s] failed with response [%s]",
+ id,
+ r.getStatus()
+ );
+ }
+ })
+ .onNotAvailable(e -> Either.value(Collections.emptyMap()))
+ .go();
+
+ return FutureUtils.transformAsync(
+ pauseFuture,
+ result -> {
+ if (result != null) {
+ return Futures.immediateFuture(result);
+ } else {
+ return getOffsetsWhenPaused(id,
IndexTaskClient.makeRetryPolicyFactory(httpRetries).makeRetryPolicy());
+ }
+ }
+ );
+ }
+
+ @Override
+ public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String
id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/rowStats"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> {
+ if (isNullOrEmpty(r.getContent())) {
+ log.warn("Got empty response when calling getMovingAverages, id[%s]", id);
+ return null;
+ } else {
+ return JacksonUtils.readValue(jsonMapper, r.getContent(),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
+ }
+ })
+ .onNotAvailable(e -> Either.value(Collections.emptyMap()))
+ .go();
+ }
+
+ @Override
+ public ListenableFuture<List<ParseExceptionReport>>
getParseErrorsAsync(String id)
+ {
+ return makeRequest(id, new RequestBuilder(HttpMethod.GET,
"/unparseableEvents"))
+ .handler(new BytesFullResponseHandler())
+ .onSuccess(r -> {
+ if (isNullOrEmpty(r.getContent())) {
+ log.warn("Got empty response when calling getParseErrors, id[%s]", id);
+ return null;
+ } else {
+ return JacksonUtils.readValue(
+ jsonMapper,
+ r.getContent(),
+ TYPE_REFERENCE_LIST_PARSE_EXCEPTION_REPORT
+ );
+ }
+ })
+ .onNotAvailable(e -> Either.value(Collections.emptyList()))
+ .go();
+ }
+
+ @Override
+ public void close()
+ {
+ retryExec.shutdownNow();
+ }
+
+ /**
+ * Create a {@link SeekableStreamRequestBuilder}.
+ */
+ private SeekableStreamRequestBuilder<Void, Void, Void> makeRequest(
+ String taskId,
+ RequestBuilder requestBuilder
+ )
+ {
+ return new SeekableStreamRequestBuilder<>(
+ taskId,
+ requestBuilder,
+ IgnoreHttpResponseHandler.INSTANCE,
+ Function.identity()
+ );
+ }
+
+ /**
+ * Helper for deserializing offset maps.
+ */
+ private Map<PartitionIdType, SequenceOffsetType> deserializeOffsetsMap(final
byte[] content)
+ {
+ final MapType offsetsMapType =
+ jsonMapper.getTypeFactory().constructMapType(Map.class,
getPartitionType(), getSequenceType());
+ return JacksonUtils.readValue(jsonMapper, content, offsetsMapType);
+ }
+
+ /**
+ * Helper for {@link #pauseAsync}.
+ *
+ * Calls {@link #getStatusAsync} in a loop until a task is paused, then calls {@link #getCurrentOffsetsAsync} to
+ * get the post-pause offsets for the task.
+ */
+ private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
getOffsetsWhenPaused(
+ final String taskId,
+ final RetryPolicy retryPolicy
+ )
+ {
+ final ListenableFuture<SeekableStreamIndexTaskRunner.Status> statusFuture
= getStatusAsync(taskId);
+
+ return FutureUtils.transformAsync(
+ statusFuture,
+ status -> {
+ if (status == SeekableStreamIndexTaskRunner.Status.PAUSED) {
+ return getCurrentOffsetsAsync(taskId, true);
+ } else {
+ final Duration delay;
+
+ synchronized (retryPolicy) {
+ delay = retryPolicy.getAndIncrementRetryDelay();
+ }
+
+ if (delay == null) {
+ return Futures.immediateFailedFuture(
+ new ISE(
+ "Task [%s] failed to change its status from [%s] to [%s], aborting",
+ taskId,
+ status,
+ SeekableStreamIndexTaskRunner.Status.PAUSED
+ )
+ );
+ } else {
+ final long sleepTime = delay.getMillis();
+ final SettableFuture<Map<PartitionIdType, SequenceOffsetType>>
retVal = SettableFuture.create();
+ retryExec.schedule(
+ () ->
+ Futures.addCallback(
+ getOffsetsWhenPaused(taskId, retryPolicy),
+ new FutureCallback<Map<PartitionIdType,
SequenceOffsetType>>()
+ {
+ @Override
+ public void onSuccess(@Nullable
Map<PartitionIdType, SequenceOffsetType> result)
+ {
+ retVal.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ retVal.setException(t);
+ }
+ }
+ ),
+ sleepTime,
+ TimeUnit.MILLISECONDS
+ );
+
+ return retVal;
+ }
+ }
+ }
+ );
+ }
+
+ private static boolean isNullOrEmpty(@Nullable final byte[] content)
+ {
+ return content == null || content.length == 0;
+ }
+
+ /**
+ * Helper for setting up each request's desired response, error handling, and retry behavior.
+ */
+ private class SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
+ {
+ private final String taskId;
+ private final RequestBuilder requestBuilder;
+
+ private final List<Function<Throwable, Either<Throwable, T>>>
exceptionMappers = new ArrayList<>();
+ private HttpResponseHandler<IntermediateType, FinalType> responseHandler;
+ private Function<FinalType, T> responseTransformer;
+ private boolean retry = true;
+
+ SeekableStreamRequestBuilder(
+ String taskId,
+ RequestBuilder requestBuilder,
+ HttpResponseHandler<IntermediateType, FinalType> responseHandler,
+ Function<FinalType, T> responseTransformer
+ )
+ {
+ this.taskId = taskId;
+ this.requestBuilder = requestBuilder;
+ this.responseHandler = responseHandler;
+ this.responseTransformer = responseTransformer;
+ }
+
+ /**
+ * Handler for requests. The result from this handler is fed into the transformer provided by {@link #onSuccess}.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <NewIntermediateType, NewFinalType>
SeekableStreamRequestBuilder<NewIntermediateType, NewFinalType, T> handler(
+ final HttpResponseHandler<NewIntermediateType, NewFinalType> handler
+ )
+ {
+ this.responseHandler = (HttpResponseHandler) handler;
+ return (SeekableStreamRequestBuilder) this;
+ }
+
+ /**
+ * Response mapping for successful requests.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <NewT> SeekableStreamRequestBuilder<IntermediateType, FinalType,
NewT> onSuccess(
+ final Function<FinalType, NewT> responseTransformer
+ )
+ {
+ this.responseTransformer = (Function) responseTransformer;
+ return (SeekableStreamRequestBuilder) this;
+ }
+
+ /**
+ * Whether the request should be retried on failure. Default is true.
+ */
+ public SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
retry(boolean retry)
+ {
+ this.retry = retry;
+ return this;
+ }
+
+ /**
+ * Error mapping for all exceptions.
+ */
+ public SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
onException(final Function<Throwable, Either<Throwable, T>> fn)
+ {
+ exceptionMappers.add(fn);
+ return this;
+ }
+
+ /**
+ * Error mapping for {@link HttpResponseException}, which occurs when a task returns a non-2xx HTTP code.
+ */
+ public SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
onHttpError(final Function<HttpResponseException, Either<Throwable, T>> fn)
+ {
+ return onException(e -> {
+ if (e instanceof HttpResponseException) {
+ return fn.apply((HttpResponseException) e);
+ } else {
+ return Either.error(e);
+ }
+ });
+ }
+
+ /**
+ * Error mapping for {@link ServiceNotAvailableException}, which occurs when a task is not available.
+ */
+ public SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
onNotAvailable(final Function<ServiceNotAvailableException, Either<Throwable,
T>> fn)
+ {
+ return onException(e -> {
+ if (e instanceof ServiceNotAvailableException) {
+ return fn.apply((ServiceNotAvailableException) e);
+ } else {
+ return Either.error(e);
+ }
+ });
+ }
+
+ /**
+ * Error mapping for {@link ServiceClosedException}, which occurs when a task is not running.
+ */
+ public SeekableStreamRequestBuilder<IntermediateType, FinalType, T>
onClosed(final Function<ServiceClosedException, Either<Throwable, T>> fn)
+ {
+ return onException(e -> {
+ if (e instanceof ServiceClosedException) {
+ return fn.apply((ServiceClosedException) e);
+ } else {
+ return Either.error(e);
+ }
+ });
+ }
+
+ /**
+ * Issue the request.
+ */
+ public ListenableFuture<T> go()
+ {
+ final ServiceClient client = makeClient(taskId, retry);
+ final SettableFuture<T> retVal = SettableFuture.create();
+
+ Futures.addCallback(
+ FutureUtils.transform(
+ client.asyncRequest(requestBuilder.timeout(httpTimeout),
responseHandler),
+ responseTransformer
+ ),
+ new FutureCallback<T>()
+ {
+ @Override
+ public void onSuccess(@Nullable T result)
+ {
+ retVal.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ Either<Throwable, T> either = Either.error(t);
+
+ for (final Function<Throwable, Either<Throwable, T>>
exceptionMapper : exceptionMappers) {
+ if (!either.isError()) {
+ break;
+ }
+
+ try {
+ final Either<Throwable, T> nextEither =
exceptionMapper.apply(either.error());
+ if (nextEither != null) {
+ either = nextEither;
+ }
+ }
+ catch (Throwable e) {
+ // Not expected: on-error function should never throw exceptions. Continue mapping.
+ log.warn(e, "Failed to map exception encountered while contacting task [%s]", taskId);
+ }
+ }
+
+ if (either.isError()) {
+ retVal.setException(either.error());
+ } else {
+ retVal.set(either.valueOrThrow());
+ }
+ }
+ }
+ );
+
+ return retVal;
+ }
+
+ private ServiceClient makeClient(final String taskId, final boolean retry)
+ {
+ final ServiceRetryPolicy retryPolicy = makeRetryPolicy(taskId, retry);
+ final SeekableStreamTaskLocator locator = new SeekableStreamTaskLocator(taskInfoProvider, taskId);
+ return serviceClientFactory.makeClient(taskId, locator, retryPolicy);
Review Comment:
Would this create a new client on every call? Would it make sense to cache
this to help with the preferred location in case of redirects?
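For illustration, a minimal sketch of what such a cache might look like. It assumes the client keeps useful per-instance state (such as a preferred location), which I have not verified. `TaskClientCacheSketch`, `clientFor` and `forget` are hypothetical names, and the type parameter `C` stands in for `ServiceClient`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

// Hypothetical cache; C stands in for ServiceClient.
final class TaskClientCacheSketch<C>
{
  // Keyed by (taskId, retry) because makeClient builds its retry policy from both.
  private final ConcurrentMap<Map.Entry<String, Boolean>, C> clients = new ConcurrentHashMap<>();
  private final BiFunction<String, Boolean, C> factory;

  TaskClientCacheSketch(BiFunction<String, Boolean, C> factory)
  {
    this.factory = factory;
  }

  // Returns the cached client, creating it at most once per key.
  C clientFor(final String taskId, final boolean retry)
  {
    return clients.computeIfAbsent(
        new SimpleImmutableEntry<>(taskId, retry),
        key -> factory.apply(key.getKey(), key.getValue())
    );
  }

  // Drops every cached client for a task, e.g. once the task is known to have exited.
  void forget(final String taskId)
  {
    clients.keySet().removeIf(key -> key.getKey().equals(taskId));
  }
}
```

An unbounded map like this would need some eviction hook (here `forget`) tied to the task lifecycle; otherwise entries for finished tasks would accumulate.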
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]