This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 422b76b Fix IndexTaskClient to retry on ChannelException (#6649)
422b76b is described below
commit 422b76b33cf80dce62a7f2ac2fe9d80fa7a25244
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Nov 27 15:54:38 2018 -0800
Fix IndexTaskClient to retry on ChannelException (#6649)
* Fix IndexTaskClient to retry on ChannelException
* fix travis and add javadoc
* address comment
---
.../druid/indexing/common/IndexTaskClient.java | 170 ++++++++++++++-------
.../druid/indexing/common/IndexTaskClientTest.java | 153 +++++++++++++++++++
2 files changed, 264 insertions(+), 59 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index 92042a2..148dc6c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -50,10 +51,12 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
/**
* Abstract class to communicate with index tasks via HTTP. This class
provides interfaces to serialize/deserialize
@@ -228,6 +231,38 @@ public abstract class IndexTaskClient implements
AutoCloseable
);
}
+  /**
+   * Builds the HTTP request addressed to the given task at the given location.
+   *
+   * @param taskId             id of the target task; also sent in the task-id header
+   * @param location           where the task runs; its TLS port (https) is used whenever it is non-negative,
+   *                           otherwise its plain port (http)
+   * @param path               request path, already URL-encoded
+   * @param encodedQueryString query string, already URL-encoded, or null when there is none
+   * @param method             HTTP method of the request
+   * @param mediaType          content type of {@code content}; must be non-null when {@code content} is non-empty
+   * @param content            request body; an empty array means no body is attached
+   *
+   * @return the assembled request
+   *
+   * @throws MalformedURLException if no valid URL can be built from the location (e.g. an invalid port). This is
+   *                               propagated immediately rather than retried.
+   */
+  private Request createRequest(
+      String taskId,
+      TaskLocation location,
+      String path,
+      @Nullable String encodedQueryString,
+      HttpMethod method,
+      @Nullable String mediaType,
+      byte[] content
+  ) throws MalformedURLException
+  {
+    final boolean useTls = location.getTlsPort() >= 0;
+
+    final String file;
+    if (encodedQueryString == null) {
+      file = path;
+    } else {
+      file = StringUtils.format("%s?%s", path, encodedQueryString);
+    }
+
+    // Use the URL constructor, not URI, since the path is already encoded.
+    // An invalid location makes this throw MalformedURLException, which must reach the caller without a retry.
+    final URL serviceUrl = new URL(
+        useTls ? "https" : "http",
+        location.getHost(),
+        useTls ? location.getTlsPort() : location.getPort(),
+        file
+    );
+
+    final Request request = new Request(method, serviceUrl);
+    // Used to validate that we are talking to the correct worker.
+    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
+    if (content.length != 0) {
+      request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
+    }
+    return request;
+  }
+
/**
* Sends an HTTP request to the task of the specified {@code taskId} and
returns a response if it succeeded.
*/
@@ -244,67 +279,40 @@ public abstract class IndexTaskClient implements
AutoCloseable
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
while (true) {
- FullResponseHolder response = null;
- Request request = null;
- TaskLocation location = TaskLocation.unknown();
String path = StringUtils.format("%s/%s/%s", BASE_PATH,
StringUtils.urlEncode(taskId), encodedPathSuffix);
Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
if (!status.isPresent() || !status.get().isRunnable()) {
- throw new TaskNotRunnableException(StringUtils.format(
- "Aborting request because task [%s] is not runnable",
- taskId
- ));
+ throw new TaskNotRunnableException(
+ StringUtils.format(
+ "Aborting request because task [%s] is not runnable",
+ taskId
+ )
+ );
}
- String host = location.getHost();
- String scheme = "";
- int port = -1;
-
- try {
- location = taskInfoProvider.getTaskLocation(taskId);
- if (location.equals(TaskLocation.unknown())) {
- throw new NoTaskLocationException(StringUtils.format("No
TaskLocation available for task [%s]", taskId));
- }
+ final TaskLocation location = taskInfoProvider.getTaskLocation(taskId);
+ if (location.equals(TaskLocation.unknown())) {
+ throw new NoTaskLocationException(StringUtils.format("No TaskLocation
available for task [%s]", taskId));
+ }
- host = location.getHost();
- scheme = location.getTlsPort() >= 0 ? "https" : "http";
- port = location.getTlsPort() >= 0 ? location.getTlsPort() :
location.getPort();
+ final Request request = createRequest(
+ taskId,
+ location,
+ path,
+ encodedQueryString,
+ method,
+ mediaType,
+ content
+ );
+ FullResponseHolder response = null;
+ try {
// Netty throws some annoying exceptions if a connection can't be
opened, which happens relatively frequently
// for tasks that happen to still be starting up, so test the
connection first to keep the logs clean.
- checkConnection(host, port);
-
- try {
- // Use URL constructor, not URI, since the path is already encoded.
- final URL serviceUrl = new URL(
- scheme,
- host,
- port,
- encodedQueryString == null ? path : StringUtils.format("%s?%s",
path, encodedQueryString)
- );
- request = new Request(method, serviceUrl);
-
- // used to validate that we are talking to the correct worker
- request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
-
- if (content.length > 0) {
- request.setContent(Preconditions.checkNotNull(mediaType,
"mediaType"), content);
- }
+ checkConnection(request.getUrl().getHost(),
request.getUrl().getPort());
- log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString());
- response = httpClient.go(request, new
FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
- }
- catch (IOException | ChannelException ioce) {
- throw ioce;
- }
- catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ response = submitRequest(request);
int responseCode = response.getStatus().getCode();
if (responseCode / 100 == 2) {
@@ -339,15 +347,7 @@ public abstract class IndexTaskClient implements
AutoCloseable
} else {
delay = retryPolicy.getAndIncrementRetryDelay();
}
- String urlForLog = (request != null
- ? request.getUrl().toString()
- : StringUtils.nonStrictFormat(
- "%s://%s:%d%s",
- scheme,
- host,
- port,
- path
- ));
+ final String urlForLog = request.getUrl().toString();
if (!retry) {
// if retry=false, we probably aren't too concerned if the operation
doesn't succeed (i.e. the request was
// for informational purposes only) so don't log a scary stack trace
@@ -387,6 +387,58 @@ public abstract class IndexTaskClient implements
AutoCloseable
}
}
+ private FullResponseHolder submitRequest(Request request) throws
IOException, ChannelException
+ {
+ try {
+ log.debug("HTTP %s: %s", request.getMethod().getName(),
request.getUrl().toString());
+ return httpClient.go(request, new
FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
+ }
+ catch (Exception e) {
+ throw throwIfPossible(e);
+ }
+ }
+
+  /**
+   * Rethrows the given throwable, or the cause it wraps, when it is of a type the caller is expected to handle.
+   *
+   * <ul>
+   *   <li>If {@code t} is an {@link ExecutionException} with a cause, the cause is handled recursively by this
+   *   method. An ExecutionException without a cause is wrapped in a RuntimeException, which is returned.</li>
+   *   <li>If {@code t} is an {@link IOException} or a {@link ChannelException}, it is thrown as-is.</li>
+   *   <li>If {@code t} is an {@link InterruptedException}, the current thread's interrupt flag is restored and a
+   *   RuntimeException wrapping {@code t} is thrown.</li>
+   *   <li>If {@code t} is any other RuntimeException or an Error, it is thrown as-is.</li>
+   *   <li>Otherwise, {@code t} is wrapped in a RuntimeException, which is returned rather than thrown.</li>
+   * </ul>
+   *
+   * Returning the wrapper lets call sites write {@code throw throwIfPossible(e);}. Note that when {@code t} is an
+   * ExecutionException, the returned exception may wrap its cause rather than {@code t} itself.
+   *
+   * @param t the throwable to inspect; must not be null
+   *
+   * @return a RuntimeException for the caller to throw, when nothing was thrown directly
+   *
+   * @throws IOException      if {@code t}, or the cause of an ExecutionException, is an IOException
+   * @throws ChannelException if {@code t}, or the cause of an ExecutionException, is a ChannelException
+   */
+  private RuntimeException throwIfPossible(Throwable t) throws IOException, ChannelException
+  {
+    Preconditions.checkNotNull(t, "Throwable shouldn't be null");
+
+    if (t instanceof ExecutionException) {
+      if (t.getCause() != null) {
+        // Future.get() reports failures wrapped in ExecutionException; inspect the underlying cause instead.
+        return throwIfPossible(t.getCause());
+      } else {
+        return new RuntimeException(t);
+      }
+    }
+
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    }
+
+    if (t instanceof ChannelException) {
+      throw (ChannelException) t;
+    }
+
+    if (t instanceof InterruptedException) {
+      // Restore the interrupt flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(t);
+    }
+
+    // Throws t unchanged if it is a RuntimeException or an Error.
+    Throwables.propagateIfPossible(t);
+    return new RuntimeException(t);
+  }
+
@Override
public void close()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
new file mode 100644
index 0000000..a297db6
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.easymock.EasyMock;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.function.Function;
+
+public class IndexTaskClientTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final int numRetries = 2;
+
+  /**
+   * A task location with invalid ports cannot be turned into a URL. The request must fail with
+   * MalformedURLException instead of being retried.
+   */
+  @Test
+  public void failOnMalformedURLException() throws IOException
+  {
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(
+        EasyMock.createNiceMock(HttpClient.class),
+        id -> TaskLocation.create(id, -2, -2)
+    )) {
+      expectedException.expect(MalformedURLException.class);
+      expectedException.expectMessage("Invalid port number :-2");
+
+      indexTaskClient.submitRequestWithEmptyContent(
+          "taskId",
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+    }
+  }
+
+  /**
+   * The HTTP client fails twice with a ChannelException and then succeeds. The client must retry past both
+   * failures and return the successful response.
+   */
+  @Test
+  public void retryOnChannelException() throws IOException
+  {
+    final HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(Futures.immediateFailedFuture(new ChannelException("IndexTaskClientTest")))
+            .times(2);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    new FullResponseHolder(
+                        HttpResponseStatus.OK,
+                        new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
+                        new StringBuilder()
+                    )
+                )
+            )
+            .once();
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
+      final FullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
+          "taskId",
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+      Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
+    }
+    // Confirm every stubbed call (two ChannelException failures, then one success) was actually made.
+    EasyMock.verify(httpClient);
+  }
+
+  /**
+   * Builds a client whose tasks always report as running and whose locations come from the given function.
+   */
+  private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function<String, TaskLocation> taskLocationProvider)
+  {
+    final TaskInfoProvider taskInfoProvider = new TaskInfoProvider()
+    {
+      @Override
+      public TaskLocation getTaskLocation(String id)
+      {
+        return taskLocationProvider.apply(id);
+      }
+
+      @Override
+      public Optional<TaskStatus> getTaskStatus(String id)
+      {
+        return Optional.of(TaskStatus.running(id));
+      }
+    };
+    return new TestIndexTaskClient(
+        httpClient,
+        objectMapper,
+        taskInfoProvider,
+        new Duration(1000),
+        "indexTaskClientTest",
+        1,
+        numRetries
+    );
+  }
+
+  /**
+   * IndexTaskClient whose connection pre-check is a no-op, so tests never open real sockets.
+   */
+  private static class TestIndexTaskClient extends IndexTaskClient
+  {
+    private TestIndexTaskClient(
+        HttpClient httpClient,
+        ObjectMapper objectMapper,
+        TaskInfoProvider taskInfoProvider,
+        Duration httpTimeout,
+        String callerId,
+        int numThreads,
+        long numRetries
+    )
+    {
+      super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, numThreads, numRetries);
+    }
+
+    @Override
+    protected void checkConnection(String host, int port)
+    {
+      // do nothing
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]