gianm commented on code in PR #13354: URL: https://github.com/apache/druid/pull/13354#discussion_r1027379730
########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java: ########## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.IndexTaskClient; +import org.apache.druid.indexing.common.RetryPolicy; +import org.apache.druid.indexing.common.TaskInfoProvider; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import 
org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.rpc.HttpResponseException; +import org.apache.druid.rpc.IgnoreHttpResponseHandler; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceClosedException; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.ServiceLocations; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.ServiceNotAvailableException; +import org.apache.druid.rpc.ServiceRetryPolicy; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; +import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Implementation of {@link SeekableStreamIndexTaskClient} based on {@link ServiceClient}. + * + * Used when {@link org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig#getChatAsync()} + * is true. 
+ */ +public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType> + implements SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> +{ + private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClientAsyncImpl.class); + + private final ServiceClientFactory serviceClientFactory; + private final TaskInfoProvider taskInfoProvider; + private final ObjectMapper jsonMapper; + private final Duration httpTimeout; + private final long httpRetries; + + // Used by getOffsetsWhenPaused, due to special retry logic. + private final ScheduledExecutorService retryExec; + + public SeekableStreamIndexTaskClientAsyncImpl( + final String dataSource, + final ServiceClientFactory serviceClientFactory, + final TaskInfoProvider taskInfoProvider, + final ObjectMapper jsonMapper, + final Duration httpTimeout, + final long httpRetries + ) + { + this.serviceClientFactory = serviceClientFactory; + this.taskInfoProvider = taskInfoProvider; + this.jsonMapper = jsonMapper; + this.httpTimeout = httpTimeout; + this.httpRetries = httpRetries; + this.retryExec = Execs.scheduledSingleThreaded( + StringUtils.format( + "%s-%s-%%d", + getClass().getSimpleName(), + StringUtils.encodeForFormat(dataSource) + ) + ); + } + + @Override + @SuppressWarnings("unchecked") + public ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> getCheckpointsAsync( + final String id, + final boolean retry + ) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/checkpoints")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + final TypeFactory factory = jsonMapper.getTypeFactory(); + return (TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>) + JacksonUtils.readValue( + jsonMapper, + r.getContent(), + factory.constructMapType( + TreeMap.class, + factory.constructType(Integer.class), + factory.constructMapType(Map.class, getPartitionType(), getSequenceType()) + ) + ); + }) + 
.onNotAvailable(e -> Either.value(new TreeMap<>())) + .retry(retry) + .go(); + } + + @Override + public ListenableFuture<Boolean> stopAsync(final String id, final boolean publish) + { + return makeRequest(id, new RequestBuilder(HttpMethod.POST, "/stop" + (publish ? "?publish=true" : ""))) + .onSuccess(r -> true) + .onHttpError(e -> Either.value(false)) + .onNotAvailable(e -> Either.value(false)) + .onClosed(e -> { + log.debug("Task [%s] couldn't be stopped because it is no longer running.", id); + return Either.value(true); + }) + .go(); + } + + @Override + public ListenableFuture<Boolean> resumeAsync(final String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.POST, "/resume")) + .onSuccess(r -> true) + .onException(e -> Either.value(false)) + .go(); + } + + @Override + public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getCurrentOffsetsAsync(String id, boolean retry) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/offsets/current")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> deserializeOffsetsMap(r.getContent())) + .onNotAvailable(e -> Either.value(Collections.emptyMap())) + .retry(retry) + .go(); + } + + @Override + public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/offsets/end")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> deserializeOffsetsMap(r.getContent())) + .onNotAvailable(e -> Either.value(Collections.emptyMap())) + .go(); + } + + @Override + public ListenableFuture<Boolean> setEndOffsetsAsync( + final String id, + final Map<PartitionIdType, SequenceOffsetType> endOffsets, + final boolean finalize + ) + { + final RequestBuilder requestBuilder = new RequestBuilder( + HttpMethod.POST, + StringUtils.format("/offsets/end?finish=%s", finalize) + ).jsonContent(jsonMapper, endOffsets); + + return makeRequest(id, requestBuilder) + 
.handler(IgnoreHttpResponseHandler.INSTANCE) + .onSuccess(r -> true) + .go(); + } + + @Override + public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(final String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/status")) + .handler(new BytesFullResponseHandler()) + .onSuccess( + r -> + JacksonUtils.readValue(jsonMapper, r.getContent(), SeekableStreamIndexTaskRunner.Status.class) + ) + .onNotAvailable(e -> Either.value(SeekableStreamIndexTaskRunner.Status.NOT_STARTED)) + .go(); + } + + @Override + public ListenableFuture<DateTime> getStartTimeAsync(String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/time/start")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + if (isNullOrEmpty(r.getContent())) { + return null; + } else { + return JacksonUtils.readValue(jsonMapper, r.getContent(), DateTime.class); + } + }) + .onNotAvailable(e -> Either.value(null)) + .go(); + } + + @Override + public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(String id) + { + final ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseFuture = + makeRequest(id, new RequestBuilder(HttpMethod.POST, "/pause")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + if (r.getStatus().equals(HttpResponseStatus.OK)) { + log.info("Task [%s] paused successfully", id); + return deserializeOffsetsMap(r.getContent()); + } else if (r.getStatus().equals(HttpResponseStatus.ACCEPTED)) { + // Return null, which triggers a loop later to wait for the task to enter PAUSED state. 
+ return null; + } else { + throw new ISE( + "Pause request for task [%s] failed with response [%s]", + id, + r.getStatus() + ); + } + }) + .onNotAvailable(e -> Either.value(Collections.emptyMap())) + .go(); + + return FutureUtils.transformAsync( + pauseFuture, + result -> { + if (result != null) { + return Futures.immediateFuture(result); + } else { + return getOffsetsWhenPaused(id, IndexTaskClient.makeRetryPolicyFactory(httpRetries).makeRetryPolicy()); + } + } + ); + } + + @Override + public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/rowStats")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + if (isNullOrEmpty(r.getContent())) { + log.warn("Got empty response when calling getMovingAverages, id[%s]", id); + return null; + } else { + return JacksonUtils.readValue(jsonMapper, r.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT); + } + }) + .onNotAvailable(e -> Either.value(Collections.emptyMap())) + .go(); + } + + @Override + public ListenableFuture<List<ParseExceptionReport>> getParseErrorsAsync(String id) + { + return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/unparseableEvents")) + .handler(new BytesFullResponseHandler()) + .onSuccess(r -> { + if (isNullOrEmpty(r.getContent())) { + log.warn("Got empty response when calling getParseErrors, id[%s]", id); + return null; + } else { + return JacksonUtils.readValue( + jsonMapper, + r.getContent(), + TYPE_REFERENCE_LIST_PARSE_EXCEPTION_REPORT + ); + } + }) + .onNotAvailable(e -> Either.value(Collections.emptyList())) + .go(); + } + + @Override + public void close() + { + retryExec.shutdownNow(); + } + + /** + * Create a {@link SeekableStreamRequestBuilder}. 
+ */ + private SeekableStreamRequestBuilder<Void, Void, Void> makeRequest( + String taskId, + RequestBuilder requestBuilder + ) + { + return new SeekableStreamRequestBuilder<>( + taskId, + requestBuilder, + IgnoreHttpResponseHandler.INSTANCE, + Function.identity() + ); + } + + /** + * Helper for deserializing offset maps. + */ + private Map<PartitionIdType, SequenceOffsetType> deserializeOffsetsMap(final byte[] content) + { + final MapType offsetsMapType = + jsonMapper.getTypeFactory().constructMapType(Map.class, getPartitionType(), getSequenceType()); + return JacksonUtils.readValue(jsonMapper, content, offsetsMapType); + } + + /** + * Helper for {@link #pauseAsync}. + * + * Calls {@link #getStatusAsync} in a loop until a task is paused, then calls {@link #getCurrentOffsetsAsync} to + * get the post-pause offsets for the task. + */ + private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getOffsetsWhenPaused( + final String taskId, + final RetryPolicy retryPolicy + ) + { + final ListenableFuture<SeekableStreamIndexTaskRunner.Status> statusFuture = getStatusAsync(taskId); + + return FutureUtils.transformAsync( + statusFuture, + status -> { + if (status == SeekableStreamIndexTaskRunner.Status.PAUSED) { + return getCurrentOffsetsAsync(taskId, true); + } else { + final Duration delay; + + synchronized (retryPolicy) { + delay = retryPolicy.getAndIncrementRetryDelay(); + } + + if (delay == null) { + return Futures.immediateFailedFuture( + new ISE( + "Task [%s] failed to change its status from [%s] to [%s], aborting", + taskId, + status, + SeekableStreamIndexTaskRunner.Status.PAUSED + ) + ); + } else { + final long sleepTime = delay.getMillis(); + final SettableFuture<Map<PartitionIdType, SequenceOffsetType>> retVal = SettableFuture.create(); + retryExec.schedule( + () -> + Futures.addCallback( + getOffsetsWhenPaused(taskId, retryPolicy), + new FutureCallback<Map<PartitionIdType, SequenceOffsetType>>() + { + @Override + public void onSuccess(@Nullable 
Map<PartitionIdType, SequenceOffsetType> result) + { + retVal.set(result); + } + + @Override + public void onFailure(Throwable t) + { + retVal.setException(t); + } + } + ), + sleepTime, + TimeUnit.MILLISECONDS + ); + + return retVal; + } + } + } + ); + } + + private static boolean isNullOrEmpty(@Nullable final byte[] content) + { + return content == null || content.length == 0; + } + + /** + * Helper for setting up each request's desired response, error handling, and retry behavior. + */ + private class SeekableStreamRequestBuilder<IntermediateType, FinalType, T> + { + private final String taskId; + private final RequestBuilder requestBuilder; + + private final List<Function<Throwable, Either<Throwable, T>>> exceptionMappers = new ArrayList<>(); + private HttpResponseHandler<IntermediateType, FinalType> responseHandler; + private Function<FinalType, T> responseTransformer; + private boolean retry = true; + + SeekableStreamRequestBuilder( + String taskId, + RequestBuilder requestBuilder, + HttpResponseHandler<IntermediateType, FinalType> responseHandler, + Function<FinalType, T> responseTransformer + ) + { + this.taskId = taskId; + this.requestBuilder = requestBuilder; + this.responseHandler = responseHandler; + this.responseTransformer = responseTransformer; + } + + /** + * Handler for requests. The result from this handler is fed into the transformer provided by {@link #onSuccess}. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public <NewIntermediateType, NewFinalType> SeekableStreamRequestBuilder<NewIntermediateType, NewFinalType, T> handler( + final HttpResponseHandler<NewIntermediateType, NewFinalType> handler + ) + { + this.responseHandler = (HttpResponseHandler) handler; + return (SeekableStreamRequestBuilder) this; + } + + /** + * Response mapping for successful requests. 
+ */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public <NewT> SeekableStreamRequestBuilder<IntermediateType, FinalType, NewT> onSuccess( + final Function<FinalType, NewT> responseTransformer + ) + { + this.responseTransformer = (Function) responseTransformer; + return (SeekableStreamRequestBuilder) this; + } + + /** + * Whether the request should be retried on failure. Default is true. + */ + public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> retry(boolean retry) + { + this.retry = retry; + return this; + } + + /** + * Error mapping for all exceptions. + */ + public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onException(final Function<Throwable, Either<Throwable, T>> fn) + { + exceptionMappers.add(fn); + return this; + } + + /** + * Error mapping for {@link HttpResponseException}, which occurs when a task returns a non-2xx HTTP code. + */ + public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onHttpError(final Function<HttpResponseException, Either<Throwable, T>> fn) + { + return onException(e -> { + if (e instanceof HttpResponseException) { + return fn.apply((HttpResponseException) e); + } else { + return Either.error(e); + } + }); + } + + /** + * Error mapping for {@link ServiceNotAvailableException}, which occurs when a task is not available. + */ + public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onNotAvailable(final Function<ServiceNotAvailableException, Either<Throwable, T>> fn) + { + return onException(e -> { + if (e instanceof ServiceNotAvailableException) { + return fn.apply((ServiceNotAvailableException) e); + } else { + return Either.error(e); + } + }); + } + + /** + * Error mapping for {@link ServiceClosedException}, which occurs when a task is not running. 
+ */ + public SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onClosed(final Function<ServiceClosedException, Either<Throwable, T>> fn) + { + return onException(e -> { + if (e instanceof ServiceClosedException) { + return fn.apply((ServiceClosedException) e); + } else { + return Either.error(e); + } + }); + } + + /** + * Issue the request. + */ + public ListenableFuture<T> go() + { + final ServiceClient client = makeClient(taskId, retry); + final SettableFuture<T> retVal = SettableFuture.create(); + + Futures.addCallback( + FutureUtils.transform( + client.asyncRequest(requestBuilder.timeout(httpTimeout), responseHandler), + responseTransformer + ), + new FutureCallback<T>() + { + @Override + public void onSuccess(@Nullable T result) + { + retVal.set(result); + } + + @Override + public void onFailure(Throwable t) + { + Either<Throwable, T> either = Either.error(t); + + for (final Function<Throwable, Either<Throwable, T>> exceptionMapper : exceptionMappers) { + if (!either.isError()) { + break; + } + + try { + final Either<Throwable, T> nextEither = exceptionMapper.apply(either.error()); + if (nextEither != null) { + either = nextEither; + } + } + catch (Throwable e) { + // Not expected: on-error function should never throw exceptions. Continue mapping. 
+ log.warn(e, "Failed to map exception encountered while contacting task [%s]", taskId); + } + } + + if (either.isError()) { + retVal.setException(either.error()); + } else { + retVal.set(either.valueOrThrow()); + } + } + } + ); + + return retVal; + } + + private ServiceClient makeClient(final String taskId, final boolean retry) + { + final ServiceRetryPolicy retryPolicy = makeRetryPolicy(taskId, retry); + final SeekableStreamTaskLocator locator = new SeekableStreamTaskLocator(taskInfoProvider, taskId); + return serviceClientFactory.makeClient(taskId, locator, retryPolicy); Review Comment: I did it this way because I didn't want to deal with figuring out how long to keep the clients for, and they're pretty cheap to create. The only state they have is the cache of redirects. But, tasks don't do redirects — only leadery things like Coordinators and Overlords do that. So I think it's OK. I added some comments: ``` // We're creating a new locator for each request and not closing it. This is OK, since SeekableStreamTaskLocator // is stateless, cheap to create, and its close() method does nothing. final SeekableStreamTaskLocator locator = new SeekableStreamTaskLocator(taskInfoProvider, taskId); // We're creating a new client for each request. This is OK, clients are cheap to create and do not contain // state that is important for us to retain across requests. (The main state they retain is preferred location // from prior redirects; but tasks don't do redirects.) return serviceClientFactory.makeClient(taskId, locator, retryPolicy); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
