This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch 24.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/24.0.0 by this push:
     new c8ab8b20bb Race in Task report/log streamer (#12931) (#12980)
c8ab8b20bb is described below

commit c8ab8b20bbeeac6f880e102e35e053867e7898a5
Author: Karan Kumar <[email protected]>
AuthorDate: Fri Aug 26 20:53:43 2022 +0530

    Race in Task report/log streamer (#12931) (#12980)
    
    * Fixing RACE in HTTP remote task Runner
    
    * Changes in the interface
    
    * Updating documentation
    
    * Adding test cases to SwitchingTaskLogStreamer
    
    * Adding more tests
---
 .../org/apache/druid/tasklogs/NoopTaskLogs.java    |   4 +-
 .../org/apache/druid/tasklogs/TaskLogStreamer.java |   9 +-
 .../apache/druid/storage/aliyun/OssTaskLogs.java   |  59 +++--
 .../druid/storage/aliyun/OssTaskLogsTest.java      | 165 ++++++++++----
 .../apache/druid/storage/azure/AzureTaskLogs.java  |  60 +++---
 .../druid/storage/azure/AzureTaskLogsTest.java     |  26 +--
 .../druid/storage/google/GoogleTaskLogs.java       |  48 ++---
 .../druid/storage/google/GoogleTaskLogsTest.java   |  14 +-
 .../druid/storage/hdfs/tasklog/HdfsTaskLogs.java   |  40 ++--
 .../indexing/common/tasklogs/HdfsTaskLogsTest.java |   2 +-
 .../org/apache/druid/storage/s3/S3TaskLogs.java    |  58 +++--
 .../apache/druid/storage/s3/S3TaskLogsTest.java    | 180 ++++++++++++----
 .../indexing/common/tasklogs/FileTaskLogs.java     |  27 +--
 .../common/tasklogs/SwitchingTaskLogStreamer.java  |  77 ++++++-
 .../common/tasklogs/TaskRunnerTaskLogStreamer.java |   6 +-
 .../druid/indexing/overlord/ForkingTaskRunner.java |  15 +-
 .../druid/indexing/overlord/RemoteTaskRunner.java  |  82 +++----
 .../indexing/overlord/ThreadingTaskRunner.java     |   4 +-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  81 +++----
 .../indexing/overlord/http/OverlordResource.java   |  10 +-
 .../druid/indexing/worker/http/WorkerResource.java |   6 +-
 .../indexing/common/tasklogs/FileTaskLogsTest.java |   6 +-
 .../tasklogs/SwitchingTaskLogStreamerTest.java     | 238 +++++++++++++++++++++
 .../indexing/overlord/RemoteTaskRunnerTest.java    |   2 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   6 +-
 25 files changed, 804 insertions(+), 421 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java b/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index a0dc877a30..13962d7c4d 100644
--- a/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -20,17 +20,17 @@
 package org.apache.druid.tasklogs;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.File;
+import java.io.InputStream;
 
 public class NoopTaskLogs implements TaskLogs
 {
   private final Logger log = new Logger(TaskLogs.class);
 
   @Override
-  public Optional<ByteSource> streamTaskLog(String taskid, long offset)
+  public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
     return Optional.absent();
   }
diff --git a/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java b/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
index e6451a6840..04add17ea5 100644
--- a/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++ b/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
@@ -20,10 +20,10 @@
 package org.apache.druid.tasklogs;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * Something that knows how to stream logs for tasks.
@@ -36,12 +36,11 @@ public interface TaskLogStreamer
    *
    * @param offset If zero, stream the entire log. If positive, attempt to read from this position onwards. If
    *               negative, attempt to read this many bytes from the end of the file (like <tt>tail -n</tt>).
-   *
-   * @return input supplier for this log, if available from this provider
+   * @return inputStream for this log, if available
    */
-  Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException;
+  Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException;
 
-  default Optional<ByteSource> streamTaskReports(final String taskid) throws IOException
+  default Optional<InputStream> streamTaskReports(final String taskid) throws IOException
   {
     return Optional.absent();
   }
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
index 515d85096e..91d02c3f5a 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
@@ -25,7 +25,6 @@ import com.aliyun.oss.model.GetObjectRequest;
 import com.aliyun.oss.model.ObjectMetadata;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
 import org.apache.druid.java.util.common.IOE;
@@ -66,54 +65,44 @@ public class OssTaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
+  public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid, "log");
     return streamTaskFile(offset, taskKey);
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
+  public Optional<InputStream> streamTaskReports(String taskid) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid, "report.json");
     return streamTaskFile(0, taskKey);
   }
 
-  private Optional<ByteSource> streamTaskFile(final long offset, String taskKey) throws IOException
+  private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
   {
     try {
       final ObjectMetadata objectMetadata = client.getObjectMetadata(config.getBucket(), taskKey);
-
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                final long start;
-                final long end = objectMetadata.getContentLength() - 1;
-
-                if (offset > 0 && offset < objectMetadata.getContentLength()) {
-                  start = offset;
-                } else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
-                  start = objectMetadata.getContentLength() + offset;
-                } else {
-                  start = 0;
-                }
-
-                final GetObjectRequest request = new GetObjectRequest(config.getBucket(), taskKey);
-                request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
-                request.setRange(start, end);
-
-                return client.getObject(request).getObjectContent();
-              }
-              catch (OSSException e) {
-                throw new IOException(e);
-              }
-            }
-          }
-      );
+      try {
+        final long start;
+        final long end = objectMetadata.getContentLength() - 1;
+
+        if (offset > 0 && offset < objectMetadata.getContentLength()) {
+          start = offset;
+        } else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
+          start = objectMetadata.getContentLength() + offset;
+        } else {
+          start = 0;
+        }
+
+        final GetObjectRequest request = new GetObjectRequest(config.getBucket(), taskKey);
+        request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
+        request.setRange(start, end);
+
+        return Optional.of(client.getObject(request).getObjectContent());
+      }
+      catch (OSSException e) {
+        throw new IOException(e);
+      }
     }
     catch (OSSException e) {
       if ("NoSuchKey".equals(e.getErrorCode())
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
index 1264a0fe9d..16b09866ec 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
@@ -23,11 +23,15 @@ import com.aliyun.oss.ClientException;
 import com.aliyun.oss.OSS;
 import com.aliyun.oss.model.AccessControlList;
 import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.GetObjectRequest;
 import com.aliyun.oss.model.Grant;
+import com.aliyun.oss.model.OSSObject;
 import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectMetadata;
 import com.aliyun.oss.model.Owner;
 import com.aliyun.oss.model.PutObjectRequest;
 import com.aliyun.oss.model.PutObjectResult;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -42,12 +46,19 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import javax.annotation.Nonnull;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @RunWith(EasyMockRunner.class)
 public class OssTaskLogsTest extends EasyMockSupport
@@ -65,6 +76,9 @@ public class OssTaskLogsTest extends EasyMockSupport
   private static final int MAX_KEYS = 1;
   private static final Exception RECOVERABLE_EXCEPTION = new ClientException(new IOException());
   private static final Exception NON_RECOVERABLE_EXCEPTION = new ClientException(new NullPointerException());
+  private static final String LOG_CONTENTS = "log_contents";
+  private static final String REPORT_CONTENTS = "report_contents";
+
 
   @Mock
   private CurrentTimeMillisSupplier timeSupplier;
@@ -113,12 +127,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(ossClient, timeSupplier);
 
-    OssTaskLogsConfig config = new OssTaskLogsConfig();
-    config.setBucket(TEST_BUCKET);
-    config.setPrefix(TEST_PREFIX);
-    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    OssTaskLogs taskLogs = getOssTaskLogs();
     taskLogs.killAll();
 
     EasyMock.verify(ossClient, timeSupplier);
@@ -147,12 +156,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(ossClient, timeSupplier);
 
-    OssTaskLogsConfig config = new OssTaskLogsConfig();
-    config.setBucket(TEST_BUCKET);
-    config.setPrefix(TEST_PREFIX);
-    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    OssTaskLogs taskLogs = getOssTaskLogs();
     taskLogs.killAll();
 
     EasyMock.verify(ossClient, timeSupplier);
@@ -181,12 +185,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
       EasyMock.replay(ossClient, timeSupplier);
 
-      OssTaskLogsConfig config = new OssTaskLogsConfig();
-      config.setBucket(TEST_BUCKET);
-      config.setPrefix(TEST_PREFIX);
-      OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-      inputDataConfig.setMaxListingLength(MAX_KEYS);
-      OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+      OssTaskLogs taskLogs = getOssTaskLogs();
       taskLogs.killAll();
     }
     catch (IOException e) {
@@ -217,12 +216,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(ossClient, timeSupplier);
 
-    OssTaskLogsConfig config = new OssTaskLogsConfig();
-    config.setBucket(TEST_BUCKET);
-    config.setPrefix(TEST_PREFIX);
-    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    OssTaskLogs taskLogs = getOssTaskLogs();
     taskLogs.killOlderThan(TIME_NOW);
 
     EasyMock.verify(ossClient, timeSupplier);
@@ -250,12 +244,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(ossClient, timeSupplier);
 
-    OssTaskLogsConfig config = new OssTaskLogsConfig();
-    config.setBucket(TEST_BUCKET);
-    config.setPrefix(TEST_PREFIX);
-    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    OssTaskLogs taskLogs = getOssTaskLogs();
     taskLogs.killOlderThan(TIME_NOW);
 
     EasyMock.verify(ossClient, timeSupplier);
@@ -283,12 +272,7 @@ public class OssTaskLogsTest extends EasyMockSupport
 
       EasyMock.replay(ossClient, timeSupplier);
 
-      OssTaskLogsConfig config = new OssTaskLogsConfig();
-      config.setBucket(TEST_BUCKET);
-      config.setPrefix(TEST_PREFIX);
-      OssInputDataConfig inputDataConfig = new OssInputDataConfig();
-      inputDataConfig.setMaxListingLength(MAX_KEYS);
-      OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+      OssTaskLogs taskLogs = getOssTaskLogs();
       taskLogs.killOlderThan(TIME_NOW);
     }
     catch (IOException e) {
@@ -300,6 +284,115 @@ public class OssTaskLogsTest extends EasyMockSupport
     EasyMock.verify(ossClient, timeSupplier);
   }
 
+  @Test
+  public void test_taskLog_fetch() throws IOException
+  {
+    EasyMock.reset(ossClient);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
+
+    OSSObject ossObject = new OSSObject();
+    ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+    EasyMock.replay(ossClient);
+
+    OssTaskLogs ossTaskLogs = getOssTaskLogs();
+    Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, 0);
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS, taskLogs);
+  }
+
+  @Test
+  public void test_taskLog_fetch_withRange() throws IOException
+  {
+    EasyMock.reset(ossClient);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
+
+    OSSObject ossObject = new OSSObject();
+    ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+    EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+    EasyMock.replay(ossClient);
+
+    OssTaskLogs ossTaskLogs = getOssTaskLogs();
+    Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, 1);
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+  }
+
+  @Test
+  public void test_taskLog_fetch_withNegativeRange() throws IOException
+  {
+    EasyMock.reset(ossClient);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
+
+    OSSObject ossObject = new OSSObject();
+    ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+    EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+    EasyMock.replay(ossClient);
+
+    OssTaskLogs ossTaskLogs = getOssTaskLogs();
+    Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+  }
+
+
+  @Test
+  public void test_taskReport_fetch() throws IOException
+  {
+    EasyMock.reset(ossClient);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(REPORT_CONTENTS.length());
+    EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
+
+    OSSObject ossObject = new OSSObject();
+    ossObject.setObjectContent(new ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+    EasyMock.replay(ossClient);
+
+    OssTaskLogs ossTaskLogs = getOssTaskLogs();
+    Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskReports(KEY_1);
+    String report = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(REPORT_CONTENTS, report);
+  }
+
+  @Nonnull
+  private OssTaskLogs getOssTaskLogs()
+  {
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setBucket(TEST_BUCKET);
+    config.setPrefix(TEST_PREFIX);
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    return taskLogs;
+  }
+
   private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
   {
     EasyMock.expect(ossClient.putObject(EasyMock.anyObject()))
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 46e081eea3..678a43e5db 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -20,7 +20,6 @@
 package org.apache.druid.storage.azure;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import com.microsoft.azure.storage.StorageException;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -100,56 +99,45 @@ public class AzureTaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
+  public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
   {
     return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
+  public Optional<InputStream> streamTaskReports(String taskid) throws IOException
   {
     return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
   }
 
-  private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
+  private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
+      throws IOException
   {
     final String container = config.getContainer();
-
     try {
       if (!azureStorage.getBlobExists(container, taskKey)) {
         return Optional.absent();
       }
-
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                final long start;
-                final long length = azureStorage.getBlobLength(container, taskKey);
-
-                if (offset > 0 && offset < length) {
-                  start = offset;
-                } else if (offset < 0 && (-1 * offset) < length) {
-                  start = length + offset;
-                } else {
-                  start = 0;
-                }
-
-                InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
-                stream.skip(start);
-
-                return stream;
-
-              }
-              catch (Exception e) {
-                throw new IOException(e);
-              }
-            }
-          }
-      );
+      try {
+        final long start;
+        final long length = azureStorage.getBlobLength(container, taskKey);
+
+        if (offset > 0 && offset < length) {
+          start = offset;
+        } else if (offset < 0 && (-1 * offset) < length) {
+          start = length + offset;
+        } else {
+          start = 0;
+        }
+
+        InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
+        stream.skip(start);
+
+        return Optional.of(stream);
+      }
+      catch (Exception e) {
+        throw new IOException(e);
+      }
     }
     catch (StorageException | URISyntaxException e) {
       throw new IOE(e, "Failed to stream logs from: %s", taskKey);
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 519a6e923f..5313c1d5f5 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.storage.azure;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteSource;
 import com.microsoft.azure.storage.StorageException;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -39,6 +38,7 @@ import org.junit.Test;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -191,10 +191,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, 0);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, 0);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), testLog);
 
     verifyAll();
@@ -214,10 +214,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, 5);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, 5);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), testLog.substring(5));
 
     verifyAll();
@@ -237,10 +237,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, -3);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, -3);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
 
     verifyAll();
@@ -260,10 +260,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), testLog);
 
     verifyAll();
@@ -279,10 +279,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
 
 
-    Assert.assertFalse(byteSource.isPresent());
+    Assert.assertFalse(stream.isPresent());
 
     verifyAll();
   }
@@ -301,10 +301,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
+    final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     verifyAll();
   }
 
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index d6d9ff569b..fcdee4039b 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -21,7 +21,6 @@ package org.apache.druid.storage.google;
 
 import com.google.api.client.http.InputStreamContent;
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
 import org.apache.druid.java.util.common.IOE;
@@ -103,20 +102,21 @@ public class GoogleTaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
+  public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid);
     return streamTaskFile(taskid, offset, taskKey);
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
+  public Optional<InputStream> streamTaskReports(String taskid) throws IOException
   {
     final String taskKey = getTaskReportKey(taskid);
     return streamTaskFile(taskid, 0, taskKey);
   }
 
-  private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
+  private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
+      throws IOException
   {
     try {
       if (!storage.exists(config.getBucket(), taskKey)) {
@@ -124,32 +124,22 @@ public class GoogleTaskLogs implements TaskLogs
       }
 
       final long length = storage.size(config.getBucket(), taskKey);
+      try {
+        final long start;
 
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                final long start;
-
-                if (offset > 0 && offset < length) {
-                  start = offset;
-                } else if (offset < 0 && (-1 * offset) < length) {
-                  start = length + offset;
-                } else {
-                  start = 0;
-                }
-
-                return new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start);
-              }
-              catch (Exception e) {
-                throw new IOException(e);
-              }
-            }
-          }
-      );
+        if (offset > 0 && offset < length) {
+          start = offset;
+        } else if (offset < 0 && (-1 * offset) < length) {
+          start = length + offset;
+        } else {
+          start = 0;
+        }
+
+        return Optional.of(new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start));
+      }
+      catch (Exception e) {
+        throw new IOException(e);
+      }
     }
     catch (IOException e) {
       throw new IOE(e, "Failed to stream logs from: %s", taskKey);
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
index c5807723b6..d8b7c61cfc 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
@@ -27,7 +27,6 @@ import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteSource;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
 import org.apache.druid.java.util.common.FileUtils;
@@ -42,6 +41,7 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -121,10 +121,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = 
googleTaskLogs.streamTaskLog(TASKID, 0);
+    final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, 
0);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), testLog);
 
     verifyAll();
@@ -144,10 +144,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = 
googleTaskLogs.streamTaskLog(TASKID, offset);
+    final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, 
offset);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), expectedLog);
 
     verifyAll();
@@ -168,10 +168,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
 
     replayAll();
 
-    final Optional<ByteSource> byteSource = 
googleTaskLogs.streamTaskLog(TASKID, offset);
+    final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, 
offset);
 
     final StringWriter writer = new StringWriter();
-    IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+    IOUtils.copy(stream.get(), writer, "UTF-8");
     Assert.assertEquals(writer.toString(), expectedLog);
 
     verifyAll();
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index 7289901823..6026a3e3cb 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -20,7 +20,6 @@
 package org.apache.druid.storage.hdfs.tasklog;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 import com.google.inject.Inject;
 import org.apache.druid.guice.Hdfs;
@@ -88,44 +87,35 @@ public class HdfsTaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskId, final long 
offset) throws IOException
+  public Optional<InputStream> streamTaskLog(final String taskId, final long 
offset) throws IOException
   {
     final Path path = getTaskLogFileFromId(taskId);
     return streamTaskFile(path, offset);
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskId) throws 
IOException
+  public Optional<InputStream> streamTaskReports(String taskId) throws 
IOException
   {
     final Path path = getTaskReportsFileFromId(taskId);
     return streamTaskFile(path, 0);
   }
 
-  private Optional<ByteSource> streamTaskFile(final Path path, final long 
offset) throws IOException
+  private Optional<InputStream> streamTaskFile(final Path path, final long 
offset) throws IOException
   {
     final FileSystem fs = path.getFileSystem(hadoopConfig);
     if (fs.exists(path)) {
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              log.info("Reading task log from: %s", path);
-              final long seekPos;
-              if (offset < 0) {
-                final FileStatus stat = fs.getFileStatus(path);
-                seekPos = Math.max(0, stat.getLen() + offset);
-              } else {
-                seekPos = offset;
-              }
-              final FSDataInputStream inputStream = fs.open(path);
-              inputStream.seek(seekPos);
-              log.info("Read task log from: %s (seek = %,d)", path, seekPos);
-              return inputStream;
-            }
-          }
-      );
+      log.info("Reading task log from: %s", path);
+      final long seekPos;
+      if (offset < 0) {
+        final FileStatus stat = fs.getFileStatus(path);
+        seekPos = Math.max(0, stat.getLen() + offset);
+      } else {
+        seekPos = offset;
+      }
+      final FSDataInputStream inputStream = fs.open(path);
+      inputStream.seek(seekPos);
+      log.info("Read task log from: %s (seek = %,d)", path, seekPos);
+      return Optional.of(inputStream);
     } else {
       return Optional.absent();
     }
diff --git 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index 49de4594be..9724cba694 100644
--- 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -115,6 +115,6 @@ public class HdfsTaskLogsTest
 
   private String readLog(TaskLogs taskLogs, String logFile, long offset) 
throws IOException
   {
-    return 
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, 
offset).get().openStream()));
+    return 
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, 
offset).get()));
   }
 }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 838b47174a..9319e8a5fc 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -25,7 +25,6 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
 import org.apache.druid.java.util.common.IOE;
@@ -65,54 +64,45 @@ public class S3TaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long 
offset) throws IOException
+  public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid, "log");
     return streamTaskFile(offset, taskKey);
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskid) throws 
IOException
+  public Optional<InputStream> streamTaskReports(String taskid) throws 
IOException
   {
     final String taskKey = getTaskLogKey(taskid, "report.json");
     return streamTaskFile(0, taskKey);
   }
 
-  private Optional<ByteSource> streamTaskFile(final long offset, String 
taskKey) throws IOException
+  private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey) throws IOException
   {
     try {
       final ObjectMetadata objectMetadata = 
service.getObjectMetadata(config.getS3Bucket(), taskKey);
 
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                final long start;
-                final long end = objectMetadata.getContentLength() - 1;
-
-                if (offset > 0 && offset < objectMetadata.getContentLength()) {
-                  start = offset;
-                } else if (offset < 0 && (-1 * offset) < 
objectMetadata.getContentLength()) {
-                  start = objectMetadata.getContentLength() + offset;
-                } else {
-                  start = 0;
-                }
-
-                final GetObjectRequest request = new 
GetObjectRequest(config.getS3Bucket(), taskKey)
-                    .withMatchingETagConstraint(objectMetadata.getETag())
-                    .withRange(start, end);
-
-                return service.getObject(request).getObjectContent();
-              }
-              catch (AmazonServiceException e) {
-                throw new IOException(e);
-              }
-            }
-          }
-      );
+      try {
+        final long start;
+        final long end = objectMetadata.getContentLength() - 1;
+
+        if (offset > 0 && offset < objectMetadata.getContentLength()) {
+          start = offset;
+        } else if (offset < 0 && (-1 * offset) < 
objectMetadata.getContentLength()) {
+          start = objectMetadata.getContentLength() + offset;
+        } else {
+          start = 0;
+        }
+
+        final GetObjectRequest request = new 
GetObjectRequest(config.getS3Bucket(), taskKey)
+            .withMatchingETagConstraint(objectMetadata.getETag())
+            .withRange(start, end);
+
+        return Optional.of(service.getObject(request).getObjectContent());
+      }
+      catch (AmazonServiceException e) {
+        throw new IOException(e);
+      }
     }
     catch (AmazonS3Exception e) {
       if (404 == e.getStatusCode()
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 502897f010..3b28ac07e6 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -22,12 +22,16 @@ package org.apache.druid.storage.s3;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.AccessControlList;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.Grant;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.Owner;
 import com.amazonaws.services.s3.model.Permission;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -42,10 +46,17 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import javax.annotation.Nonnull;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @RunWith(EasyMockRunner.class)
 public class S3TaskLogsTest extends EasyMockSupport
@@ -63,6 +74,8 @@ public class S3TaskLogsTest extends EasyMockSupport
   private static final int MAX_KEYS = 1;
   private static final Exception RECOVERABLE_EXCEPTION = new 
SdkClientException(new IOException());
   private static final Exception NON_RECOVERABLE_EXCEPTION = new 
SdkClientException(new NullPointerException());
+  private static final String LOG_CONTENTS = "log_contents";
+  private static final String REPORT_CONTENTS = "report_contents";
 
   @Mock
   private CurrentTimeMillisSupplier timeSupplier;
@@ -136,12 +149,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(s3Client, timeSupplier);
 
-    S3TaskLogsConfig config = new S3TaskLogsConfig();
-    config.setS3Bucket(TEST_BUCKET);
-    config.setS3Prefix(TEST_PREFIX);
-    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
     s3TaskLogs.killAll();
 
     EasyMock.verify(s3Client, timeSupplier);
@@ -174,12 +182,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(s3Client, timeSupplier);
 
-    S3TaskLogsConfig config = new S3TaskLogsConfig();
-    config.setS3Bucket(TEST_BUCKET);
-    config.setS3Prefix(TEST_PREFIX);
-    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
     s3TaskLogs.killAll();
 
     EasyMock.verify(s3Client, timeSupplier);
@@ -211,12 +214,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
       EasyMock.replay(s3Client, timeSupplier);
 
-      S3TaskLogsConfig config = new S3TaskLogsConfig();
-      config.setS3Bucket(TEST_BUCKET);
-      config.setS3Prefix(TEST_PREFIX);
-      S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-      inputDataConfig.setMaxListingLength(MAX_KEYS);
-      S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, 
inputDataConfig, timeSupplier);
+      S3TaskLogs s3TaskLogs = getS3TaskLogs();
       s3TaskLogs.killAll();
     }
     catch (IOException e) {
@@ -250,12 +248,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(s3Client, timeSupplier);
 
-    S3TaskLogsConfig config = new S3TaskLogsConfig();
-    config.setS3Bucket(TEST_BUCKET);
-    config.setS3Prefix(TEST_PREFIX);
-    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
     s3TaskLogs.killOlderThan(TIME_NOW);
 
     EasyMock.verify(s3Client, timeSupplier);
@@ -286,12 +279,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
     EasyMock.replay(s3Client, timeSupplier);
 
-    S3TaskLogsConfig config = new S3TaskLogsConfig();
-    config.setS3Bucket(TEST_BUCKET);
-    config.setS3Prefix(TEST_PREFIX);
-    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-    inputDataConfig.setMaxListingLength(MAX_KEYS);
-    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
     s3TaskLogs.killOlderThan(TIME_NOW);
 
     EasyMock.verify(s3Client, timeSupplier);
@@ -322,12 +310,7 @@ public class S3TaskLogsTest extends EasyMockSupport
 
       EasyMock.replay(s3Client, timeSupplier);
 
-      S3TaskLogsConfig config = new S3TaskLogsConfig();
-      config.setS3Bucket(TEST_BUCKET);
-      config.setS3Prefix(TEST_PREFIX);
-      S3InputDataConfig inputDataConfig = new S3InputDataConfig();
-      inputDataConfig.setMaxListingLength(MAX_KEYS);
-      S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, 
inputDataConfig, timeSupplier);
+      S3TaskLogs s3TaskLogs = getS3TaskLogs();
       s3TaskLogs.killOlderThan(TIME_NOW);
     }
     catch (IOException e) {
@@ -339,6 +322,131 @@ public class S3TaskLogsTest extends EasyMockSupport
     EasyMock.verify(s3Client, timeSupplier);
   }
 
+  @Test
+  public void test_taskLog_fetch() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(0, LOG_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.replay(s3Client);
+
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+    Optional<InputStream> inputStreamOptional = 
s3TaskLogs.streamTaskLog(KEY_1, 0);
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), 
StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS, taskLogs);
+  }
+
+  @Test
+  public void test_taskLog_fetch_withRange() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.replay(s3Client);
+
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+    Optional<InputStream> inputStreamOptional = 
s3TaskLogs.streamTaskLog(KEY_1, 1);
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), 
StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+  }
+
+  @Test
+  public void test_taskLog_fetch_withNegativeRange() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(LOG_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.replay(s3Client);
+
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+    Optional<InputStream> inputStreamOptional = 
s3TaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
+    String taskLogs = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), 
StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+  }
+
+
+  @Test
+  public void test_report_fetch() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(REPORT_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(0, REPORT_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.replay(s3Client);
+
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+    Optional<InputStream> inputStreamOptional = 
s3TaskLogs.streamTaskReports(KEY_1);
+    String report = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), 
StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(REPORT_CONTENTS, report);
+  }
+
+
+  @Nonnull
+  private S3TaskLogs getS3TaskLogs()
+  {
+    S3TaskLogsConfig config = new S3TaskLogsConfig();
+    config.setS3Bucket(TEST_BUCKET);
+    config.setS3Prefix(TEST_PREFIX);
+    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+    return s3TaskLogs;
+  }
+
   private List<Grant> testPushInternal(boolean disableAcl, String ownerId, 
String ownerDisplayName) throws Exception
   {
     EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
index 857d4c65bf..5d546da494 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.common.tasklogs;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
@@ -67,40 +66,22 @@ public class FileTaskLogs implements TaskLogs
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long 
offset)
+  public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
     final File file = fileForTask(taskid, "log");
     if (file.exists()) {
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              return LogUtils.streamFile(file, offset);
-            }
-          }
-      );
+      return Optional.of(LogUtils.streamFile(file, offset));
     } else {
       return Optional.absent();
     }
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(final String taskid)
+  public Optional<InputStream> streamTaskReports(final String taskid) throws 
IOException
   {
     final File file = fileForTask(taskid, "report.json");
     if (file.exists()) {
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              return LogUtils.streamFile(file, 0);
-            }
-          }
-      );
+      return Optional.of(LogUtils.streamFile(file, 0));
     } else {
       return Optional.absent();
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
index 78806d6eef..098b72854a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
@@ -21,11 +21,12 @@ package org.apache.druid.indexing.common.tasklogs;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 
 /**
@@ -33,37 +34,93 @@ import java.util.List;
  */
 public class SwitchingTaskLogStreamer implements TaskLogStreamer
 {
-  private final List<TaskLogStreamer> providers;
+  private final TaskLogStreamer taskRunnerTaskLogStreamer;
+  private final List<TaskLogStreamer> deepStorageStreamers;
 
   @Inject
-  public SwitchingTaskLogStreamer(List<TaskLogStreamer> providers)
+  public SwitchingTaskLogStreamer(
+      @Named("taskstreamer") TaskLogStreamer taskRunnerTaskLogStreamer,
+      List<TaskLogStreamer> deepStorageStreamer
+  )
   {
-    this.providers = ImmutableList.copyOf(providers);
+    this.taskRunnerTaskLogStreamer = taskRunnerTaskLogStreamer;
+    this.deepStorageStreamers = ImmutableList.copyOf(deepStorageStreamer);
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws 
IOException
+  public Optional<InputStream> streamTaskLog(String taskid, long offset) 
throws IOException
   {
-    for (TaskLogStreamer provider : providers) {
-      final Optional<ByteSource> stream = provider.streamTaskLog(taskid, 
offset);
+    IOException deferIOException = null;
+    try {
+      final Optional<InputStream> stream = 
taskRunnerTaskLogStreamer.streamTaskLog(taskid, offset);
       if (stream.isPresent()) {
         return stream;
       }
     }
+    catch (IOException e) {
+      // Defer the first IOException because of a race in the way tasks update 
their exit status in the overlord.
+      // The task may already have sent its log to deep storage while it is still 
running with its http chat handlers unregistered.
+      // In that case, hold on to the first IOException and try 
deep storage for the log. If the log is still not found, rethrow the deferred 
exception.
+      deferIOException = e;
+    }
 
+    for (TaskLogStreamer provider : deepStorageStreamers) {
+      try {
+        final Optional<InputStream> stream = provider.streamTaskLog(taskid, 
offset);
+        if (stream.isPresent()) {
+          return stream;
+        }
+      }
+      catch (IOException e) {
+        if (deferIOException != null) {
+          e.addSuppressed(deferIOException);
+        }
+        throw e;
+      }
+    }
+    // Could not find any InputStream. Throw deferred exception if exists
+    if (deferIOException != null) {
+      throw deferIOException;
+    }
     return Optional.absent();
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskid) throws 
IOException
+  public Optional<InputStream> streamTaskReports(String taskid) throws 
IOException
   {
-    for (TaskLogStreamer provider : providers) {
-      final Optional<ByteSource> stream = provider.streamTaskReports(taskid);
+    IOException deferIOException = null;
+
+    try {
+      final Optional<InputStream> stream = 
taskRunnerTaskLogStreamer.streamTaskReports(taskid);
       if (stream.isPresent()) {
         return stream;
       }
     }
+    catch (IOException e) {
+      // Defer the first IOException because of a race in the way tasks update 
their exit status in the overlord.
+      // The task may already have sent its report to deep storage while it is 
still running with its http chat handlers unregistered.
+      // In that case, hold on to the first IOException and try 
deep storage for the report. If the report is still not found, rethrow the deferred 
exception.
+      deferIOException = e;
+    }
 
+    for (TaskLogStreamer provider : deepStorageStreamers) {
+      try {
+        final Optional<InputStream> stream = 
provider.streamTaskReports(taskid);
+        if (stream.isPresent()) {
+          return stream;
+        }
+      }
+      catch (IOException e) {
+        if (deferIOException != null) {
+          e.addSuppressed(deferIOException);
+        }
+        throw e;
+      }
+    }
+    // Could not find any InputStream. Throw deferred exception if exists
+    if (deferIOException != null) {
+      throw deferIOException;
+    }
     return Optional.absent();
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
index 89b5d648cb..206dff097e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
@@ -20,13 +20,13 @@
 package org.apache.druid.indexing.common.tasklogs;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
 */
@@ -41,7 +41,7 @@ public class TaskRunnerTaskLogStreamer implements 
TaskLogStreamer
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws 
IOException
+  public Optional<InputStream> streamTaskLog(String taskid, long offset) 
throws IOException
   {
     final TaskRunner runner = taskMaster.getTaskRunner().orNull();
     if (runner instanceof TaskLogStreamer) {
@@ -52,7 +52,7 @@ public class TaskRunnerTaskLogStreamer implements 
TaskLogStreamer
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskId) throws 
IOException
+  public Optional<InputStream> streamTaskReports(String taskId) throws 
IOException
   {
     final TaskRunner runner = taskMaster.getTaskRunner().orNull();
     if (runner instanceof TaskLogStreamer) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index f07a859e77..98ef0739d2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteSink;
-import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.FileWriteMode;
 import com.google.common.io.Files;
@@ -652,7 +651,7 @@ public class ForkingTaskRunner
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskid, final long 
offset)
+  public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
     final ProcessHolder processHolder;
 
@@ -664,17 +663,7 @@ public class ForkingTaskRunner
         return Optional.absent();
       }
     }
-
-    return Optional.of(
-        new ByteSource()
-        {
-          @Override
-          public InputStream openStream() throws IOException
-          {
-            return LogUtils.streamFile(processHolder.logFile, offset);
-          }
-        }
-    );
+    return Optional.of(LogUtils.streamFile(processHolder.logFile, offset));
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 03654b86a1..42ff3c2467 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -611,7 +610,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(final String taskId, final long offset)
+  public Optional<InputStream> streamTaskLog(final String taskId, final long offset) throws IOException
   {
     final ZkWorker zkWorker = findWorkerRunningTask(taskId);
 
@@ -626,34 +625,26 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           taskId,
           Long.toString(offset)
       );
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                return httpClient.go(
-                    new Request(HttpMethod.GET, url),
-                    new InputStreamResponseHandler()
-                ).get();
-              }
-              catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-              catch (ExecutionException e) {
-                // Unwrap if possible
-                Throwables.propagateIfPossible(e.getCause(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          }
-      );
+      try {
+        return Optional.of(httpClient.go(
+            new Request(HttpMethod.GET, url),
+            new InputStreamResponseHandler()
+        ).get());
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
+        // Unwrap if possible
+        Throwables.propagateIfPossible(e.getCause(), IOException.class);
+        throw new RuntimeException(e);
+      }
     }
   }
 
+
   @Override
-  public Optional<ByteSource> streamTaskReports(final String taskId)
+  public Optional<InputStream> streamTaskReports(final String taskId) throws IOException
   {
     final ZkWorker zkWorker = findWorkerRunningTask(taskId);
 
@@ -681,31 +672,24 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         "/druid/worker/v1/chat/%s/liveReports",
         taskId
     );
-    return Optional.of(
-        new ByteSource()
-        {
-          @Override
-          public InputStream openStream() throws IOException
-          {
-            try {
-              return httpClient.go(
-                  new Request(HttpMethod.GET, url),
-                  new InputStreamResponseHandler()
-              ).get();
-            }
-            catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-            catch (ExecutionException e) {
-              // Unwrap if possible
-              Throwables.propagateIfPossible(e.getCause(), IOException.class);
-              throw new RuntimeException(e);
-            }
-          }
-        }
-    );
+
+    try {
+      return Optional.of(httpClient.go(
+          new Request(HttpMethod.GET, url),
+          new InputStreamResponseHandler()
+      ).get());
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (ExecutionException e) {
+      // Unwrap if possible
+      Throwables.propagateIfPossible(e.getCause(), IOException.class);
+      throw new RuntimeException(e);
+    }
   }
 
+
   /**
    * Adds a task to the pending queue.
    * {@link #runPendingTasks()} should be called to run the pending task.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 79a67b4141..db792b6b99 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -61,6 +60,7 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -128,7 +128,7 @@ public class ThreadingTaskRunner
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(String taskid, long offset)
+  public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
     // task logs will appear in the main indexer log, streaming individual task logs is not supported
     return Optional.absent();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 19279af327..245933eaf8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -962,7 +961,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
-  public Optional<ByteSource> streamTaskLog(String taskId, long offset)
+  public Optional<InputStream> streamTaskLog(String taskId, long offset) throws IOException
   {
     @SuppressWarnings("GuardedBy") // Read on tasks is safe
     HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@@ -982,34 +981,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           taskId,
           Long.toString(offset)
       );
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                return httpClient.go(
-                    new Request(HttpMethod.GET, url),
-                    new InputStreamResponseHandler()
-                ).get();
-              }
-              catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-              catch (ExecutionException e) {
-                // Unwrap if possible
-                Throwables.propagateIfPossible(e.getCause(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          }
-      );
+
+      try {
+        return Optional.of(httpClient.go(
+            new Request(HttpMethod.GET, url),
+            new InputStreamResponseHandler()
+        ).get());
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
+        // Unwrap if possible
+        Throwables.propagateIfPossible(e.getCause(), IOException.class);
+        throw new RuntimeException(e);
+      }
     }
   }
 
   @Override
-  public Optional<ByteSource> streamTaskReports(String taskId)
+  public Optional<InputStream> streamTaskReports(String taskId) throws IOException
   {
     @SuppressWarnings("GuardedBy") // Read on tasks is safe
     HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@@ -1035,29 +1026,21 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           "/druid/worker/v1/chat/%s/liveReports",
           taskId
       );
-      return Optional.of(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              try {
-                return httpClient.go(
-                    new Request(HttpMethod.GET, url),
-                    new InputStreamResponseHandler()
-                ).get();
-              }
-              catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-              catch (ExecutionException e) {
-                // Unwrap if possible
-                Throwables.propagateIfPossible(e.getCause(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          }
-      );
+
+      try {
+        return Optional.of(httpClient.go(
+            new Request(HttpMethod.GET, url),
+            new InputStreamResponseHandler()
+        ).get());
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
+        // Unwrap if possible
+        Throwables.propagateIfPossible(e.getCause(), IOException.class);
+        throw new RuntimeException(e);
+      }
     }
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index a111ebe787..a3a7f8de69 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.druid.audit.AuditEntry;
@@ -100,6 +99,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1008,9 +1008,9 @@ public class OverlordResource
   )
   {
     try {
-      final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
+      final Optional<InputStream> stream = taskLogStreamer.streamTaskLog(taskid, offset);
       if (stream.isPresent()) {
-        return Response.ok(stream.get().openStream()).build();
+        return Response.ok(stream.get()).build();
       } else {
         return Response.status(Response.Status.NOT_FOUND)
                        .entity(
@@ -1035,9 +1035,9 @@ public class OverlordResource
   )
   {
     try {
-      final Optional<ByteSource> stream = taskLogStreamer.streamTaskReports(taskid);
+      final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
       if (stream.isPresent()) {
-        return Response.ok(stream.get().openStream()).build();
+        return Response.ok(stream.get()).build();
       } else {
         return Response.status(Response.Status.NOT_FOUND)
                        .entity(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
index 935514f27b..42d145edb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
@@ -24,7 +24,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.sun.jersey.spi.container.ResourceFilters;
@@ -53,6 +52,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
+import java.io.InputStream;
 
 /**
  */
@@ -214,10 +214,10 @@ public class WorkerResource
                      .build();
     }
     try {
-      final Optional<ByteSource> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
+      final Optional<InputStream> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
 
       if (stream.isPresent()) {
-        return Response.ok(stream.get().openStream()).build();
+        return Response.ok(stream.get()).build();
       } else {
         return Response.status(Response.Status.NOT_FOUND).build();
       }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
index 74503d9b08..22c13e3464 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
@@ -63,7 +63,7 @@ public class FileTaskLogsTest
 
       final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
       for (Map.Entry<Long, String> entry : expected.entrySet()) {
-        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream());
+        final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get());
         final String string = StringUtils.fromUtf8(bytes);
         Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
       }
@@ -91,7 +91,7 @@ public class FileTaskLogsTest
 
     Assert.assertEquals(
         testReportString,
-        StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get().openStream()))
+        StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get()))
     );
   }
 
@@ -147,7 +147,7 @@ public class FileTaskLogsTest
 
   private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
   {
-    return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()));
+    return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get()));
   }
 
   private static class TestTaskReport implements TaskReport
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java
new file mode 100644
index 0000000000..afe9c62931
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class SwitchingTaskLogStreamerTest
+{
+  private static final String LOG = "LOG";
+  private static final String REPORT = "REPORT";
+  private static final String TASK_ID = "foo";
+
+  private final TaskLogStreamer streamer1 = new TestTaskLogStreamer(1);
+  private final TaskLogStreamer streamer2 = new TestTaskLogStreamer(2);
+  private final TaskLogStreamer emptyStreamer = new NoopTaskLogs();
+  private final TaskLogStreamer ioExceptionStreamer = new TaskLogStreamer()
+  {
+    @Override
+    public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
+    {
+      throw new IOE("expected log exception");
+    }
+
+    @Override
+    public Optional<InputStream> streamTaskReports(String taskid) throws IOException
+    {
+      throw new IOE("expected task exception");
+    }
+  };
+
+  @Test
+  public void foundInRemoteTasks() throws IOException
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        streamer1,
+        Arrays.asList(
+            streamer2,
+            emptyStreamer
+        )
+    );
+    Assert.assertEquals(
+        getLogString(1, TASK_ID, 1),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertEquals(
+        getReportString(1, TASK_ID),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+  }
+
+  @Test
+  public void foundInDeepStorage() throws IOException
+  {
+
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        emptyStreamer,
+        Arrays.asList(
+            streamer2,
+            emptyStreamer
+        )
+    );
+    Assert.assertEquals(
+        getLogString(2, TASK_ID, 1),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertEquals(
+        getReportString(2, TASK_ID),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+  }
+
+  @Test
+  public void exceptionInTaskStreamerButFoundInDeepStrorage() throws IOException
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        ioExceptionStreamer,
+        Arrays.asList(
+            streamer2,
+            emptyStreamer
+        )
+    );
+    Assert.assertEquals(
+        getLogString(2, TASK_ID, 1),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertEquals(
+        getReportString(2, TASK_ID),
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+  }
+
+
+  @Test
+  public void exceptionInDeepStrorage()
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        emptyStreamer,
+        Arrays.asList(
+            ioExceptionStreamer,
+            streamer2
+        )
+    );
+    Assert.assertThrows("expected log exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertThrows("expected report exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+  }
+
+  @Test
+  public void exceptionInRemoteTaskLogStreamerWithEmptyDeepStorage()
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        ioExceptionStreamer,
+        Collections.singletonList(
+            emptyStreamer
+        )
+    );
+    Assert.assertThrows("expected log exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertThrows("expected report exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+
+  }
+
+  @Test
+  public void exceptionEverywhere()
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        ioExceptionStreamer,
+        Collections.singletonList(
+            ioExceptionStreamer
+        )
+    );
+    Assert.assertThrows("expected log exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+    );
+
+    Assert.assertThrows("expected report exception", IOException.class, () ->
+        StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+    );
+  }
+
+  @Test
+  public void empty() throws IOException
+  {
+    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+        emptyStreamer,
+        Collections.singletonList(
+            emptyStreamer
+        )
+    );
+    Assert.assertFalse(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).isPresent());
+    Assert.assertFalse(switchingTaskLogStreamer.streamTaskReports(TASK_ID).isPresent());
+  }
+
+  private static String getLogString(int id, String taskid, long offset)
+  {
+    return StringUtils.format(
+        LOG + " with id %d, task %s and offset %d",
+        id,
+        taskid,
+        offset
+    );
+  }
+
+
+  private static String getReportString(int id, String taskid)
+  {
+    return StringUtils.format(
+        REPORT + " with id %d, task %s",
+        id,
+        taskid
+    );
+  }
+
+  private static class TestTaskLogStreamer implements TaskLogStreamer
+  {
+    private final int id;
+
+    public TestTaskLogStreamer(int id)
+    {
+      this.id = id;
+    }
+
+    @Override
+    public Optional<InputStream> streamTaskLog(String taskid, long offset)
+    {
+      return Optional.of(new ByteArrayInputStream(getLogString(id, taskid, offset).getBytes(StandardCharsets.UTF_8)));
+    }
+
+
+    @Override
+    public Optional<InputStream> streamTaskReports(String taskid)
+    {
+      return Optional.of(new ByteArrayInputStream(getReportString(id, taskid).getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 2be0cc6541..ea4c299485 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -1116,7 +1116,7 @@ public class RemoteTaskRunnerTest
     );
 
     // Stream task reports from a running task.
-    final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get().openStream();
+    final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get();
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ByteStreams.copy(in, baos);
     Assert.assertEquals(reportString, StringUtils.fromUtf8(baos.toByteArray()));
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 14ec99195b..3a45975bf9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -198,11 +198,15 @@ public class CliOverlord extends ServerRunnable
             )
                   .toProvider(
                       new ListProvider<TaskLogStreamer>()
-                          .add(TaskRunnerTaskLogStreamer.class)
                           .add(TaskLogs.class)
                   )
                   .in(LazySingleton.class);
 
+            binder.bind(TaskLogStreamer.class)
+                  .annotatedWith(Names.named("taskstreamer"))
+                  .to(TaskRunnerTaskLogStreamer.class)
+                  .in(LazySingleton.class);
+
             binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
             binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
             binder.bind(TaskLockbox.class).in(LazySingleton.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to