This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 422b76b  Fix IndexTaskClient to retry on ChannelException (#6649)
422b76b is described below

commit 422b76b33cf80dce62a7f2ac2fe9d80fa7a25244
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Nov 27 15:54:38 2018 -0800

    Fix IndexTaskClient to retry on ChannelException (#6649)
    
    * Fix IndexTaskClient to retry on ChannelException
    
    * fix travis and add javadoc
    
    * address comment
---
 .../druid/indexing/common/IndexTaskClient.java     | 170 ++++++++++++++-------
 .../druid/indexing/common/IndexTaskClientTest.java | 153 +++++++++++++++++++
 2 files changed, 264 insertions(+), 59 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index 92042a2..148dc6c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -50,10 +51,12 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.net.Socket;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Abstract class to communicate with index tasks via HTTP. This class 
provides interfaces to serialize/deserialize
@@ -228,6 +231,38 @@ public abstract class IndexTaskClient implements 
AutoCloseable
     );
   }
 
+  private Request createRequest(
+      String taskId,
+      TaskLocation location,
+      String path,
+      @Nullable String encodedQueryString,
+      HttpMethod method,
+      @Nullable String mediaType,
+      byte[] content
+  ) throws MalformedURLException
+  {
+    final String host = location.getHost();
+    final String scheme = location.getTlsPort() >= 0 ? "https" : "http";
+    final int port = location.getTlsPort() >= 0 ? location.getTlsPort() : 
location.getPort();
+
+    // Use URL constructor, not URI, since the path is already encoded.
+    // The below line can throw a MalformedURLException, and this method 
should return immediately without retry.
+    final URL serviceUrl = new URL(
+        scheme,
+        host,
+        port,
+        encodedQueryString == null ? path : StringUtils.format("%s?%s", path, 
encodedQueryString)
+    );
+
+    final Request request = new Request(method, serviceUrl);
+    // used to validate that we are talking to the correct worker
+    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
+    if (content.length > 0) {
+      request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), 
content);
+    }
+    return request;
+  }
+
   /**
    * Sends an HTTP request to the task of the specified {@code taskId} and 
returns a response if it succeeded.
    */
@@ -244,67 +279,40 @@ public abstract class IndexTaskClient implements 
AutoCloseable
     final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
 
     while (true) {
-      FullResponseHolder response = null;
-      Request request = null;
-      TaskLocation location = TaskLocation.unknown();
       String path = StringUtils.format("%s/%s/%s", BASE_PATH, 
StringUtils.urlEncode(taskId), encodedPathSuffix);
 
       Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
       if (!status.isPresent() || !status.get().isRunnable()) {
-        throw new TaskNotRunnableException(StringUtils.format(
-            "Aborting request because task [%s] is not runnable",
-            taskId
-        ));
+        throw new TaskNotRunnableException(
+            StringUtils.format(
+                "Aborting request because task [%s] is not runnable",
+                taskId
+            )
+        );
       }
 
-      String host = location.getHost();
-      String scheme = "";
-      int port = -1;
-
-      try {
-        location = taskInfoProvider.getTaskLocation(taskId);
-        if (location.equals(TaskLocation.unknown())) {
-          throw new NoTaskLocationException(StringUtils.format("No 
TaskLocation available for task [%s]", taskId));
-        }
+      final TaskLocation location = taskInfoProvider.getTaskLocation(taskId);
+      if (location.equals(TaskLocation.unknown())) {
+        throw new NoTaskLocationException(StringUtils.format("No TaskLocation 
available for task [%s]", taskId));
+      }
 
-        host = location.getHost();
-        scheme = location.getTlsPort() >= 0 ? "https" : "http";
-        port = location.getTlsPort() >= 0 ? location.getTlsPort() : 
location.getPort();
+      final Request request = createRequest(
+          taskId,
+          location,
+          path,
+          encodedQueryString,
+          method,
+          mediaType,
+          content
+      );
 
+      FullResponseHolder response = null;
+      try {
         // Netty throws some annoying exceptions if a connection can't be 
opened, which happens relatively frequently
         // for tasks that happen to still be starting up, so test the 
connection first to keep the logs clean.
-        checkConnection(host, port);
-
-        try {
-          // Use URL constructor, not URI, since the path is already encoded.
-          final URL serviceUrl = new URL(
-              scheme,
-              host,
-              port,
-              encodedQueryString == null ? path : StringUtils.format("%s?%s", 
path, encodedQueryString)
-          );
-          request = new Request(method, serviceUrl);
-
-          // used to validate that we are talking to the correct worker
-          request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
-
-          if (content.length > 0) {
-            request.setContent(Preconditions.checkNotNull(mediaType, 
"mediaType"), content);
-          }
+        checkConnection(request.getUrl().getHost(), 
request.getUrl().getPort());
 
-          log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString());
-          response = httpClient.go(request, new 
FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
-        }
-        catch (IOException | ChannelException ioce) {
-          throw ioce;
-        }
-        catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(ie);
-        }
-        catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+        response = submitRequest(request);
 
         int responseCode = response.getStatus().getCode();
         if (responseCode / 100 == 2) {
@@ -339,15 +347,7 @@ public abstract class IndexTaskClient implements 
AutoCloseable
         } else {
           delay = retryPolicy.getAndIncrementRetryDelay();
         }
-        String urlForLog = (request != null
-                            ? request.getUrl().toString()
-                            : StringUtils.nonStrictFormat(
-                                "%s://%s:%d%s",
-                                scheme,
-                                host,
-                                port,
-                                path
-                            ));
+        final String urlForLog = request.getUrl().toString();
         if (!retry) {
           // if retry=false, we probably aren't too concerned if the operation 
doesn't succeed (i.e. the request was
           // for informational purposes only) so don't log a scary stack trace
@@ -387,6 +387,58 @@ public abstract class IndexTaskClient implements 
AutoCloseable
     }
   }
 
+  private FullResponseHolder submitRequest(Request request) throws 
IOException, ChannelException
+  {
+    try {
+      log.debug("HTTP %s: %s", request.getMethod().getName(), 
request.getUrl().toString());
+      return httpClient.go(request, new 
FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
+    }
+    catch (Exception e) {
+      throw throwIfPossible(e);
+    }
+  }
+
+  /**
+   * Throws if it's possible to throw the given Throwable.
+   *
+   * - The input throwable shouldn't be null.
+   * - If Throwable is an {@link ExecutionException}, this calls itself 
recursively with the cause of ExecutionException.
+   * - If Throwable is an {@link IOException} or a {@link ChannelException}, 
this simply throws it.
+   * - If Throwable is an {@link InterruptedException}, this interrupts the 
current thread and throws a RuntimeException
+   *   wrapping the InterruptedException
+   * - Otherwise, this simply returns the given Throwable.
+   *
+   * Note that if the given Throwable is an ExecutionException, this can return 
the cause of ExecutionException.
+   */
+  private RuntimeException throwIfPossible(Throwable t) throws IOException, 
ChannelException
+  {
+    Preconditions.checkNotNull(t, "Throwable shoulnd't null");
+
+    if (t instanceof ExecutionException) {
+      if (t.getCause() != null) {
+        return throwIfPossible(t.getCause());
+      } else {
+        return new RuntimeException(t);
+      }
+    }
+
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    }
+
+    if (t instanceof ChannelException) {
+      throw (ChannelException) t;
+    }
+
+    if (t instanceof InterruptedException) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(t);
+    }
+
+    Throwables.propagateIfPossible(t);
+    return new RuntimeException(t);
+  }
+
   @Override
   public void close()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
new file mode 100644
index 0000000..a297db6
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.easymock.EasyMock;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.function.Function;
+
+public class IndexTaskClientTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final int numRetries = 2;
+
+  @Test
+  public void failOnMalformedURLException() throws IOException
+  {
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(
+        EasyMock.createNiceMock(HttpClient.class),
+        id -> TaskLocation.create(id, -2, -2)
+    )) {
+      expectedException.expect(MalformedURLException.class);
+      expectedException.expectMessage("Invalid port number :-2");
+
+      indexTaskClient.submitRequestWithEmptyContent(
+          "taskId",
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+    }
+  }
+
+  @Test
+  public void retryOnChannelException() throws IOException
+  {
+    final HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject()))
+            .andReturn(Futures.immediateFailedFuture(new 
ChannelException("IndexTaskClientTest")))
+            .times(2);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject()))
+            .andReturn(
+                Futures.immediateFuture(
+                    new FullResponseHolder(
+                        HttpResponseStatus.OK,
+                        new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK),
+                        new StringBuilder()
+                    )
+                )
+            )
+            .once();
+    EasyMock.replay(httpClient);
+    try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id 
-> TaskLocation.create(id, 8000, -1))) {
+      final FullResponseHolder response = 
indexTaskClient.submitRequestWithEmptyContent(
+          "taskId",
+          HttpMethod.GET,
+          "test",
+          null,
+          true
+      );
+      Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
+    }
+  } 
+
+  private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, 
Function<String, TaskLocation> taskLocationProvider)
+  {
+    final TaskInfoProvider taskInfoProvider = new TaskInfoProvider()
+    {
+      @Override
+      public TaskLocation getTaskLocation(String id)
+      {
+        return taskLocationProvider.apply(id);
+      }
+
+      @Override
+      public Optional<TaskStatus> getTaskStatus(String id)
+      {
+        return Optional.of(TaskStatus.running(id));
+      }
+    };
+    return new TestIndexTaskClient(
+        httpClient,
+        objectMapper,
+        taskInfoProvider,
+        new Duration(1000),
+        "indexTaskClientTest",
+        1,
+        numRetries
+    );
+  }
+
+  private static class TestIndexTaskClient extends IndexTaskClient
+  {
+    private TestIndexTaskClient(
+        HttpClient httpClient,
+        ObjectMapper objectMapper,
+        TaskInfoProvider taskInfoProvider,
+        Duration httpTimeout,
+        String callerId,
+        int numThreads,
+        long numRetries
+    )
+    {
+      super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, 
numThreads, numRetries);
+    }
+
+    @Override
+    protected void checkConnection(String host, int port)
+    {
+      // do nothing
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to