This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fa4afd  URL encode datasources, task ids, authenticator names. (#5938)
9fa4afd is described below

commit 9fa4afdb8ec1bd1881b30076e4e2a503418e5386
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Sep 30 12:29:51 2018 -0700

    URL encode datasources, task ids, authenticator names. (#5938)
    
    * URL encode datasources, task ids, authenticator names.
    
    * Fix URL encoding for router forwarding servlets.
    
    * Fix log-with-offset API.
    
    * Fix test.
    
    * Test adjustments.
    
    * Task client fixes.
    
    * Remove unused import.
---
 .../druid/security/basic/CommonCacheNotifier.java  |  2 +-
 .../druid/indexing/common/IndexTaskClient.java     | 54 ++++++++++++++--------
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 36 ++++++---------
 .../druid/indexing/overlord/TaskRunnerUtils.java   | 27 ++++++++++-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  5 +-
 .../druid/indexing/overlord/hrtr/WorkerHolder.java | 23 ++-------
 .../indexing/overlord/RemoteTaskRunnerTest.java    |  4 +-
 .../indexing/overlord/TaskRunnerUtilsTest.java     | 37 +++++++--------
 .../indexer/AbstractITRealtimeIndexTaskTest.java   |  6 +--
 .../druid/tests/indexer/AbstractIndexerTest.java   |  8 +++-
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 11 ++---
 .../apache/druid/tests/indexer/ITIndexerTest.java  | 11 +++--
 .../druid/tests/indexer/ITParallelIndexTest.java   |  7 ++-
 .../druid/tests/indexer/ITUnionQueryTest.java      | 20 ++++----
 .../apache/druid/java/util/common/StringUtils.java | 15 +++++-
 .../server/AsyncManagementForwardingServlet.java   | 24 ++++------
 .../druid/server/AsyncQueryForwardingServlet.java  | 30 +++++-------
 .../java/org/apache/druid/server/JettyUtils.java   | 49 ++++++++++++++++++++
 .../druid/server/http/OverlordProxyServlet.java    | 30 ++++--------
 .../AsyncManagementForwardingServletTest.java      |  2 +-
 .../server/AsyncQueryForwardingServletTest.java    | 15 +++---
 .../org/apache/druid/server/JettyUtilsTest.java    | 34 ++++++--------
 .../server/http/OverlordProxyServletTest.java      |  6 ++-
 23 files changed, 248 insertions(+), 208 deletions(-)

diff --git 
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
 
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index 21a009b..33dd021 100644
--- 
a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++ 
b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -184,7 +184,7 @@ public class CommonCacheNotifier
           druidNode.getServiceScheme(),
           druidNode.getHost(),
           druidNode.getPortToUse(),
-          StringUtils.format(baseUrl, itemName)
+          StringUtils.format(baseUrl, StringUtils.urlEncode(itemName))
       );
     }
     catch (MalformedURLException mue) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
index 1ef152b..92042a2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
@@ -51,7 +51,7 @@ import javax.annotation.Nullable;
 import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.net.Socket;
-import java.net.URI;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Callable;
 
@@ -174,12 +174,12 @@ public abstract class IndexTaskClient implements 
AutoCloseable
   protected FullResponseHolder submitRequestWithEmptyContent(
       String taskId,
       HttpMethod method,
-      String pathSuffix,
-      @Nullable String query,
+      String encodedPathSuffix,
+      @Nullable String encodedQueryString,
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
   {
-    return submitRequest(taskId, null, method, pathSuffix, query, new byte[0], 
retry);
+    return submitRequest(taskId, null, method, encodedPathSuffix, 
encodedQueryString, new byte[0], retry);
   }
 
   /**
@@ -188,13 +188,21 @@ public abstract class IndexTaskClient implements 
AutoCloseable
   protected FullResponseHolder submitJsonRequest(
       String taskId,
       HttpMethod method,
-      String pathSuffix,
-      @Nullable String query,
+      String encodedPathSuffix,
+      @Nullable String encodedQueryString,
       byte[] content,
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
   {
-    return submitRequest(taskId, MediaType.APPLICATION_JSON, method, 
pathSuffix, query, content, retry);
+    return submitRequest(
+        taskId,
+        MediaType.APPLICATION_JSON,
+        method,
+        encodedPathSuffix,
+        encodedQueryString,
+        content,
+        retry
+    );
   }
 
   /**
@@ -203,13 +211,21 @@ public abstract class IndexTaskClient implements 
AutoCloseable
   protected FullResponseHolder submitSmileRequest(
       String taskId,
       HttpMethod method,
-      String pathSuffix,
-      @Nullable String query,
+      String encodedPathSuffix,
+      @Nullable String encodedQueryString,
       byte[] content,
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
   {
-    return submitRequest(taskId, SmileMediaTypes.APPLICATION_JACKSON_SMILE, 
method, pathSuffix, query, content, retry);
+    return submitRequest(
+        taskId,
+        SmileMediaTypes.APPLICATION_JACKSON_SMILE,
+        method,
+        encodedPathSuffix,
+        encodedQueryString,
+        content,
+        retry
+    );
   }
 
   /**
@@ -219,8 +235,8 @@ public abstract class IndexTaskClient implements 
AutoCloseable
       String taskId,
       @Nullable String mediaType, // nullable if content is empty
       HttpMethod method,
-      String pathSuffix,
-      @Nullable String query,
+      String encodedPathSuffix,
+      @Nullable String encodedQueryString,
       byte[] content,
       boolean retry
   ) throws IOException, ChannelException, NoTaskLocationException
@@ -231,7 +247,7 @@ public abstract class IndexTaskClient implements 
AutoCloseable
       FullResponseHolder response = null;
       Request request = null;
       TaskLocation location = TaskLocation.unknown();
-      String path = StringUtils.format("%s/%s/%s", BASE_PATH, taskId, 
pathSuffix);
+      String path = StringUtils.format("%s/%s/%s", BASE_PATH, 
StringUtils.urlEncode(taskId), encodedPathSuffix);
 
       Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
       if (!status.isPresent() || !status.get().isRunnable()) {
@@ -260,16 +276,14 @@ public abstract class IndexTaskClient implements 
AutoCloseable
         checkConnection(host, port);
 
         try {
-          URI serviceUri = new URI(
+          // Use URL constructor, not URI, since the path is already encoded.
+          final URL serviceUrl = new URL(
               scheme,
-              null,
               host,
               port,
-              path,
-              query,
-              null
+              encodedQueryString == null ? path : StringUtils.format("%s?%s", 
path, encodedQueryString)
           );
-          request = new Request(method, serviceUri.toURL());
+          request = new Request(method, serviceUrl);
 
           // used to validate that we are talking to the correct worker
           request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
@@ -278,7 +292,7 @@ public abstract class IndexTaskClient implements 
AutoCloseable
             request.setContent(Preconditions.checkNotNull(mediaType, 
"mediaType"), content);
           }
 
-          log.debug("HTTP %s: %s", method.getName(), serviceUri.toString());
+          log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString());
           response = httpClient.go(request, new 
FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
         }
         catch (IOException | ChannelException ioce) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 736cab6..16b75ed 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -42,6 +42,12 @@ import 
com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.curator.cache.PathChildrenCacheFactory;
@@ -75,12 +81,6 @@ import 
org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.tasklogs.TaskLogStreamer;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -91,7 +91,6 @@ import org.joda.time.Period;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -560,7 +559,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
       }
       URL url = null;
       try {
-        url = makeWorkerURL(zkWorker.getWorker(), 
StringUtils.format("/task/%s/shutdown", taskId));
+        url = TaskRunnerUtils.makeWorkerURL(zkWorker.getWorker(), 
"/druid/worker/v1/task/%s/shutdown", taskId);
         final StatusResponseHolder response = httpClient.go(
             new Request(HttpMethod.POST, url),
             RESPONSE_HANDLER,
@@ -598,7 +597,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
       return Optional.absent();
     } else {
       // Worker is still running this task
-      final URL url = makeWorkerURL(zkWorker.getWorker(), 
StringUtils.format("/task/%s/log?offset=%d", taskId, offset));
+      final URL url = TaskRunnerUtils.makeWorkerURL(
+          zkWorker.getWorker(),
+          "/druid/worker/v1/task/%s/log?offset=%d",
+          taskId,
+          offset
+      );
       return Optional.of(
           new ByteSource()
           {
@@ -625,18 +629,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
     }
   }
 
-  private URL makeWorkerURL(Worker worker, String path)
-  {
-    Preconditions.checkArgument(path.startsWith("/"), "path must start with 
'/': %s", path);
-
-    try {
-      return new URL(StringUtils.format("%s://%s/druid/worker/v1%s", 
worker.getScheme(), worker.getHost(), path));
-    }
-    catch (MalformedURLException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   /**
    * Adds a task to the pending queue
    */
@@ -816,7 +808,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
 
           if (immutableZkWorker != null &&
               
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(),
 task.getId())
-                == null) {
+              == null) {
             assignedWorker = 
zkWorkers.get(immutableZkWorker.getWorker().getHost());
           }
         }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
index 63a571f..176a3a1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
@@ -19,11 +19,20 @@
 
 package org.apache.druid.indexing.overlord;
 
-import org.apache.druid.java.util.emitter.EmittingLogger;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
 import java.util.concurrent.Executor;
 
 public class TaskRunnerUtils
@@ -89,4 +98,20 @@ public class TaskRunnerUtils
       }
     }
   }
+
+  public static URL makeWorkerURL(Worker worker, String pathFormat, Object... 
pathParams)
+  {
+    Preconditions.checkArgument(pathFormat.startsWith("/"), "path must start 
with '/': %s", pathFormat);
+    final String path = StringUtils.format(
+        pathFormat,
+        Arrays.stream(pathParams).map(s -> 
StringUtils.urlEncode(s.toString())).toArray()
+    );
+
+    try {
+      return new URI(StringUtils.format("%s://%s%s", worker.getScheme(), 
worker.getHost(), path)).toURL();
+    }
+    catch (URISyntaxException | MalformedURLException e) {
+      throw Throwables.propagate(e);
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index ae8cd0d..2047d53 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
@@ -64,7 +65,6 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -75,7 +75,6 @@ import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.tasklogs.TaskLogStreamer;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.KeeperException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Period;
@@ -858,7 +857,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
       return Optional.absent();
     } else {
       // Worker is still running this task
-      final URL url = WorkerHolder.makeWorkerURL(worker, 
StringUtils.format("/druid/worker/v1/task/%s/log?offset=%d", taskId, offset));
+      final URL url = TaskRunnerUtils.makeWorkerURL(worker, 
"/druid/worker/v1/task/%s/log?offset=%d", taskId, offset);
       return Optional.of(
           new ByteSource()
           {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index c289a7c..48c087c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -22,12 +22,11 @@ package org.apache.druid.indexing.overlord.hrtr;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.TaskRunnerUtils;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
 import org.apache.druid.indexing.worker.Worker;
@@ -35,7 +34,6 @@ import org.apache.druid.indexing.worker.WorkerHistoryItem;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.RetryUtils;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
@@ -47,7 +45,6 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -116,7 +113,7 @@ public class WorkerHolder
         smileMapper,
         httpClient,
         workersSyncExec,
-        makeWorkerURL(worker, "/"),
+        TaskRunnerUtils.makeWorkerURL(worker, "/"),
         "/druid-internal/v1/worker",
         WORKER_SYNC_RESP_TYPE_REF,
         config.getSyncRequestTimeout().toStandardDuration().getMillis(),
@@ -211,18 +208,6 @@ public class WorkerHolder
     this.continuouslyFailedTasksCount.incrementAndGet();
   }
 
-  public static URL makeWorkerURL(Worker worker, String path)
-  {
-    Preconditions.checkArgument(path.startsWith("/"), "path must start with 
'/': %s", path);
-
-    try {
-      return new URL(StringUtils.format("%s://%s%s", worker.getScheme(), 
worker.getHost(), path));
-    }
-    catch (MalformedURLException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   public boolean assignTask(Task task)
   {
     if (disabled.get()) {
@@ -234,7 +219,7 @@ public class WorkerHolder
       return false;
     }
 
-    URL url = makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask");
+    URL url = TaskRunnerUtils.makeWorkerURL(worker, 
"/druid-internal/v1/worker/assignTask");
     int numTries = config.getAssignRequestMaxRetries();
 
     try {
@@ -282,7 +267,7 @@ public class WorkerHolder
 
   public void shutdownTask(String taskId)
   {
-    URL url = makeWorkerURL(worker, 
StringUtils.format("/druid/worker/v1/task/%s/shutdown", taskId));
+    final URL url = TaskRunnerUtils.makeWorkerURL(worker, 
"/druid/worker/v1/task/%s/shutdown", taskId);
 
     try {
       RetryUtils.retry(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 920d401..1616bb4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -79,7 +79,7 @@ public class RemoteTaskRunnerTest
     jsonMapper = rtrTestUtils.getObjectMapper();
     cf = rtrTestUtils.getCuratorFramework();
 
-    task = TestTasks.unending("task");
+    task = TestTasks.unending("task id with spaces");
   }
 
   @After
@@ -308,7 +308,7 @@ public class RemoteTaskRunnerTest
 
     Assert.assertTrue(workerRunningTask(task.getId()));
 
-    
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals("task"));
+    
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals(task.getId()));
 
     cf.delete().forPath(joiner.join(statusPath, task.getId()));
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
similarity index 51%
copy from 
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
copy to 
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
index 618084d..7c02946 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
@@ -17,31 +17,26 @@
  * under the License.
  */
 
-package org.apache.druid.tests.indexer;
+package org.apache.druid.indexing.overlord;
 
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
+import org.apache.druid.indexing.worker.Worker;
+import org.junit.Assert;
+import org.junit.Test;
 
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITParallelIndexTest extends AbstractITBatchIndexTest
-{
-  private static String INDEX_TASK = 
"/indexer/wikipedia_parallel_index_task.json";
-  private static String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_parallel_index_queries.json";
-  private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
+import java.net.URL;
 
+public class TaskRunnerUtilsTest
+{
   @Test
-  public void testIndexData() throws Exception
+  public void testMakeWorkerURL()
   {
-    try {
-      doIndexTestTest(
-          INDEX_DATASOURCE,
-          INDEX_TASK,
-          INDEX_QUERIES_RESOURCE
-      );
-    }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
+    final URL url = TaskRunnerUtils.makeWorkerURL(
+        new Worker("https", "1.2.3.4:8290", "1.2.3.4", 1, "0"),
+        "/druid/worker/v1/task/%s/log",
+        "foo bar&"
+    );
+    
Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log";, 
url.toString());
+    Assert.assertEquals("1.2.3.4:8290", url.getAuthority());
+    Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath());
   }
 }
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
index 0995dba..fb9681f 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -34,6 +34,7 @@ import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import java.io.Closeable;
 import java.io.InputStream;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -80,7 +81,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends 
AbstractIndexerTes
   void doTest()
   {
     LOG.info("Starting test: ITRealtimeIndexTaskTest");
-    try {
+    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
       // the task will run for 3 minutes and then shutdown itself
       String task = setShutOffTime(
           getTaskAsString(getTaskResource()),
@@ -153,9 +154,6 @@ public abstract class AbstractITRealtimeIndexTaskTest 
extends AbstractIndexerTes
     catch (Exception e) {
       throw Throwables.propagate(e);
     }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
   }
 
   String setShutOffTime(String taskAsString, DateTime time)
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index a689567..d59c383 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.testing.utils.TestQueryHelper;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.Interval;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -53,6 +54,11 @@ public abstract class AbstractIndexerTest
   @Inject
   protected TestQueryHelper queryHelper;
 
+  protected Closeable unloader(final String dataSource)
+  {
+    return () -> unloadAndKillData(dataSource);
+  }
+
   protected void unloadAndKillData(final String dataSource)
   {
     List<String> intervals = coordinator.getSegmentIntervals(dataSource);
@@ -68,7 +74,7 @@ public abstract class AbstractIndexerTest
     unloadAndKillData(dataSource, first, last);
   }
 
-  protected void unloadAndKillData(final String dataSource, String start, 
String end)
+  private void unloadAndKillData(final String dataSource, String start, String 
end)
   {
     // Wait for any existing index tasks to complete before disabling the 
datasource otherwise
     // realtime tasks can get stuck waiting for handoff. 
https://github.com/apache/incubator-druid/issues/1729
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 2f9078a..0901d39 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.testing.utils.RetryUtil;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.Closeable;
 import java.util.List;
 
 @Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -47,7 +48,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     if (intervalsBeforeCompaction.contains(compactedInterval)) {
       throw new ISE("Containing a segment for the compacted interval[%s] 
before compaction", compactedInterval);
     }
-    try {
+    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
       compactData(false);
 
@@ -59,9 +60,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
       intervalsBeforeCompaction.sort(null);
       checkCompactionIntervals(intervalsBeforeCompaction);
     }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
   }
 
   @Test
@@ -70,7 +68,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     loadData();
     final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(INDEX_DATASOURCE);
     intervalsBeforeCompaction.sort(null);
-    try {
+    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
       compactData(true);
 
@@ -80,9 +78,6 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
       checkCompactionIntervals(intervalsBeforeCompaction);
     }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
   }
 
   private void loadData() throws Exception
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 2a7bc64..63681e2 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.Closeable;
+
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITIndexerTest extends AbstractITBatchIndexTest
 {
@@ -35,7 +37,10 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   @Test
   public void testIndexData() throws Exception
   {
-    try {
+    try (
+        final Closeable indexCloseable = unloader(INDEX_DATASOURCE);
+        final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE)
+    ) {
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
@@ -47,9 +52,5 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
           INDEX_QUERIES_RESOURCE
       );
     }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-      unloadAndKillData(REINDEX_DATASOURCE);
-    }
   }
 }
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
index 618084d..b844acd 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
@@ -23,6 +23,8 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.Closeable;
+
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITParallelIndexTest extends AbstractITBatchIndexTest
 {
@@ -33,15 +35,12 @@ public class ITParallelIndexTest extends 
AbstractITBatchIndexTest
   @Test
   public void testIndexData() throws Exception
   {
-    try {
+    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
           INDEX_QUERIES_RESOURCE
       );
     }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
   }
 }
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
index a1bfb4c..dc5017f 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
@@ -20,12 +20,12 @@
 package org.apache.druid.tests.indexer;
 
 import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Throwables;
 import com.google.inject.Inject;
 import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
 import org.apache.druid.curator.discovery.ServerDiscoverySelector;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
@@ -43,6 +43,7 @@ import org.joda.time.DateTime;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -70,10 +71,13 @@ public class ITUnionQueryTest extends AbstractIndexerTest
   IntegrationTestingConfig config;
 
   @Test
-  public void testUnionQuery()
+  public void testUnionQuery() throws IOException
   {
     final int numTasks = 3;
-
+    final Closer closer = Closer.create();
+    for (int i = 0; i < numTasks; i++) {
+      closer.register(unloader(UNION_DATASOURCE + i));
+    }
     try {
       // Load 4 datasources with same dimensions
       String task = setShutOffTime(
@@ -143,16 +147,12 @@ public class ITUnionQueryTest extends AbstractIndexerTest
       this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
 
     }
-    catch (Exception e) {
-      LOG.error(e, "Error while testing");
-      throw Throwables.propagate(e);
+    catch (Throwable e) {
+      throw closer.rethrow(e);
     }
     finally {
-      for (int i = 0; i < numTasks; i++) {
-        unloadAndKillData(UNION_DATASOURCE + i);
-      }
+      closer.close();
     }
-
   }
 
   private String setShutOffTime(String taskAsString, DateTime time)
diff --git 
a/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java 
b/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index 12a47ab..b69f81e 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
 
 import javax.annotation.Nullable;
 import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -161,6 +162,16 @@ public class StringUtils
     return s;
   }
 
+  public static String urlEncode(String s)
+  {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    }
+    catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static String removeChar(String s, char c, int firstOccurranceIndex)
   {
     StringBuilder sb = new StringBuilder(s.length() - 1);
@@ -180,6 +191,7 @@ public class StringUtils
    * irrelevant to null handling of the data.
    *
    * @param string the string to test and possibly return
+   *
    * @return {@code string} itself if it is non-null; {@code ""} if it is null
    */
   public static String nullToEmptyNonDruidDataString(@Nullable String string)
@@ -195,8 +207,9 @@ public class StringUtils
    * irrelevant to null handling of the data.
    *
    * @param string the string to test and possibly return
+   *
    * @return {@code string} itself if it is nonempty; {@code null} if it is
-   *     empty or null
+   * empty or null
    */
   @Nullable
   public static String emptyToNullNonDruidDataString(@Nullable String string)
diff --git 
a/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java
 
b/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java
index 6454da2..3bd31c5 100644
--- 
a/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java
+++ 
b/server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java
@@ -20,7 +20,6 @@
 package org.apache.druid.server;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -33,7 +32,6 @@ import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.security.AuthConfig;
-import org.apache.http.client.utils.URIBuilder;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.proxy.AsyncProxyServlet;
@@ -42,7 +40,6 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 public class AsyncManagementForwardingServlet extends AsyncProxyServlet
@@ -143,18 +140,15 @@ public class AsyncManagementForwardingServlet extends 
AsyncProxyServlet
   @Override
   protected String rewriteTarget(HttpServletRequest request)
   {
-    try {
-      return new URIBuilder((String) request.getAttribute(BASE_URI_ATTRIBUTE))
-          .setPath(request.getAttribute(MODIFIED_PATH_ATTRIBUTE) != null ?
-                   (String) request.getAttribute(MODIFIED_PATH_ATTRIBUTE) : 
request.getRequestURI())
-          .setQuery(request.getQueryString()) // No need to encode-decode 
queryString, it is already encoded
-          .build()
-          .toString();
-    }
-    catch (URISyntaxException e) {
-      log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
-      throw Throwables.propagate(e);
-    }
+    final String encodedPath = request.getAttribute(MODIFIED_PATH_ATTRIBUTE) 
!= null
+                               ? (String) 
request.getAttribute(MODIFIED_PATH_ATTRIBUTE)
+                               : request.getRequestURI();
+
+    return JettyUtils.concatenateForRewrite(
+        (String) request.getAttribute(BASE_URI_ATTRIBUTE),
+        encodedPath,
+        request.getQueryString()
+    );
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java 
b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index fa41ada..8546d84 100644
--- 
a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ 
b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -22,6 +22,7 @@ package org.apache.druid.server;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
@@ -32,6 +33,7 @@ import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -48,7 +50,6 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
-import org.apache.http.client.utils.URIBuilder;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
@@ -64,8 +65,6 @@ import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -365,29 +364,22 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
         request,
         (String) request.getAttribute(SCHEME_ATTRIBUTE),
         (String) request.getAttribute(HOST_ATTRIBUTE)
-    ).toString();
+    );
   }
 
-  protected URI rewriteURI(HttpServletRequest request, String scheme, String 
host)
+  protected String rewriteURI(HttpServletRequest request, String scheme, 
String host)
   {
     return makeURI(scheme, host, request.getRequestURI(), 
request.getQueryString());
   }
 
-  protected static URI makeURI(String scheme, String host, String requestURI, 
String rawQueryString)
+  @VisibleForTesting
+  static String makeURI(String scheme, String host, String requestURI, String 
rawQueryString)
   {
-    try {
-      return new URIBuilder()
-          .setScheme(scheme)
-          .setHost(host)
-          .setPath(requestURI)
-          // No need to encode-decode queryString, it is already encoded
-          .setQuery(rawQueryString)
-          .build();
-    }
-    catch (URISyntaxException e) {
-      log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
-      throw Throwables.propagate(e);
-    }
+    return JettyUtils.concatenateForRewrite(
+        StringUtils.format("%s://%s", scheme, host),
+        requestURI,
+        rawQueryString
+    );
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/JettyUtils.java 
b/server/src/main/java/org/apache/druid/server/JettyUtils.java
new file mode 100644
index 0000000..9374fb0
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/JettyUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import javax.annotation.Nullable;
+
+public class JettyUtils
+{
+  /**
+   * Concatenate URI parts, in a way that is useful for proxy servlets.
+   *
+   * @param base               base part of the uri, like http://example.com 
(no trailing slash)
+   * @param encodedPath        encoded path, like you would get from 
HttpServletRequest's getRequestURI
+   * @param encodedQueryString encoded query string, like you would get from 
HttpServletRequest's getQueryString
+   */
+  public static String concatenateForRewrite(
+      final String base,
+      final String encodedPath,
+      @Nullable final String encodedQueryString
+  )
+  {
+    // Query string and path are already encoded, no need for anything fancy 
beyond string concatenation.
+
+    final StringBuilder url = new StringBuilder(base).append(encodedPath);
+
+    if (encodedQueryString != null) {
+      url.append("?").append(encodedQueryString);
+    }
+
+    return url.toString();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java 
b/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java
index 0b4a12d..8ed6ac0 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/OverlordProxyServlet.java
@@ -19,14 +19,13 @@
 
 package org.apache.druid.server.http;
 
-import com.google.common.base.Throwables;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.server.JettyUtils;
 import org.apache.druid.server.security.AuthConfig;
 import com.google.inject.Provider;
 import org.eclipse.jetty.client.HttpClient;
@@ -36,8 +35,6 @@ import org.eclipse.jetty.proxy.ProxyServlet;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 /**
  * A Proxy servlet that proxies requests to the overlord.
@@ -63,23 +60,16 @@ public class OverlordProxyServlet extends ProxyServlet
   @Override
   protected String rewriteTarget(HttpServletRequest request)
   {
-    try {
-      final String overlordLeader = druidLeaderClient.findCurrentLeader();
-      if (overlordLeader == null) {
-        throw new ISE("Can't find Overlord leader.");
-      }
-
-      String location = StringUtils.format("%s%s", overlordLeader, 
request.getRequestURI());
-
-      if (request.getQueryString() != null) {
-        location = StringUtils.format("%s?%s", location, 
request.getQueryString());
-      }
-
-      return new URI(location).toString();
-    }
-    catch (URISyntaxException e) {
-      throw Throwables.propagate(e);
+    final String overlordLeader = druidLeaderClient.findCurrentLeader();
+    if (overlordLeader == null) {
+      throw new ISE("Can't find Overlord leader.");
     }
+
+    return JettyUtils.concatenateForRewrite(
+        overlordLeader,
+        request.getRequestURI(),
+        request.getQueryString()
+    );
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java
 
b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java
index fa6d540..a2f4261 100644
--- 
a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java
@@ -302,7 +302,7 @@ public class AsyncManagementForwardingServletTest extends 
BaseJettyTest
     overlordExpectedRequest.headers = ImmutableMap.of("Authorization", "Basic 
bXl1c2VyOm15cGFzc3dvcmQ=");
 
     HttpURLConnection connection = ((HttpURLConnection)
-        new URL(StringUtils.format("http://localhost:%d/proxy/overlord/%s";, 
port, overlordExpectedRequest.path))
+        new URL(StringUtils.format("http://localhost:%d/proxy/overlord%s";, 
port, overlordExpectedRequest.path))
             .openConnection());
     connection.setRequestMethod(overlordExpectedRequest.method);
 
diff --git 
a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
 
b/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 68082c3..49cb2a9 100644
--- 
a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -348,13 +348,13 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
           )
           {
             @Override
-            protected URI rewriteURI(HttpServletRequest request, String 
scheme, String host)
+            protected String rewriteURI(HttpServletRequest request, String 
scheme, String host)
             {
               String uri = super.rewriteURI(request, scheme, host).toString();
               if (uri.contains("/druid/v2")) {
-                return URI.create(uri.replace("/druid/v2", "/default"));
+                return URI.create(uri.replace("/druid/v2", 
"/default")).toString();
               }
-              return URI.create(uri.replace("/proxy", ""));
+              return URI.create(uri.replace("/proxy", "")).toString();
             }
           });
       //NOTE: explicit maxThreads to workaround 
https://tickets.puppetlabs.com/browse/TK-152
@@ -378,7 +378,7 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
 
     // test params
     Assert.assertEquals(
-        new URI("http://localhost:1234/some/path?param=1";),
+        "http://localhost:1234/some/path?param=1";,
         AsyncQueryForwardingServlet.makeURI("http", "localhost:1234", 
"/some/path", "param=1")
     );
 
@@ -391,20 +391,19 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
             HostAndPort.fromParts("2a00:1450:4007:805::1007", 1234).toString(),
             "/some/path",
             "param=1&param2=%E2%82%AC"
-        ).toASCIIString()
+        )
     );
 
     // test null query
     Assert.assertEquals(
-        new URI("http://localhost/";),
+        "http://localhost/";,
         AsyncQueryForwardingServlet.makeURI("http", "localhost", "/", null)
     );
 
     // Test reWrite Encoded interval with timezone info
     // decoded parameters 1900-01-01T00:00:00.000+01.00 -> 
1900-01-01T00:00:00.000+01:00
     Assert.assertEquals(
-        new URI(
-            
"http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00";),
+        
"http://localhost:1234/some/path?intervals=1900-01-01T00%3A00%3A00.000%2B01%3A00%2F3000-01-01T00%3A00%3A00.000%2B01%3A00";,
         AsyncQueryForwardingServlet.makeURI(
             "http",
             "localhost:1234",
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
 b/server/src/test/java/org/apache/druid/server/JettyUtilsTest.java
similarity index 51%
copy from 
integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
copy to server/src/test/java/org/apache/druid/server/JettyUtilsTest.java
index 618084d..bd6d86f 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ b/server/src/test/java/org/apache/druid/server/JettyUtilsTest.java
@@ -17,31 +17,23 @@
  * under the License.
  */
 
-package org.apache.druid.tests.indexer;
+package org.apache.druid.server;
 
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
+import org.junit.Assert;
+import org.junit.Test;
 
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITParallelIndexTest extends AbstractITBatchIndexTest
+public class JettyUtilsTest
 {
-  private static String INDEX_TASK = 
"/indexer/wikipedia_parallel_index_task.json";
-  private static String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_parallel_index_queries.json";
-  private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
-
   @Test
-  public void testIndexData() throws Exception
+  public void testConcatenateForRewrite()
   {
-    try {
-      doIndexTestTest(
-          INDEX_DATASOURCE,
-          INDEX_TASK,
-          INDEX_QUERIES_RESOURCE
-      );
-    }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
+    Assert.assertEquals(
+        "http://example.com/foo%20bar?q=baz%20qux";,
+        JettyUtils.concatenateForRewrite(
+            "http://example.com";,
+            "/foo%20bar",
+            "q=baz%20qux"
+        )
+    );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java
 
b/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java
index dd364b7..7e99d82 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/OverlordProxyServletTest.java
@@ -37,12 +37,14 @@ public class OverlordProxyServletTest
 
     HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class);
     
EasyMock.expect(request.getQueryString()).andReturn("param1=test&param2=test2").anyTimes();
-    
EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes();
+
+    // %3A is a colon; test to make sure urlencoded paths work right.
+    
EasyMock.expect(request.getRequestURI()).andReturn("/druid/over%3Alord/worker").anyTimes();
 
     EasyMock.replay(druidLeaderClient, request);
 
     URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient, null, 
null).rewriteTarget(request));
-    
Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test&param2=test2";,
 uri.toString());
+    
Assert.assertEquals("https://overlord:port/druid/over%3Alord/worker?param1=test&param2=test2";,
 uri.toString());
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to