This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch 24.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/24.0.0 by this push:
new c8ab8b20bb Race in Task report/log streamer (#12931) (#12980)
c8ab8b20bb is described below
commit c8ab8b20bbeeac6f880e102e35e053867e7898a5
Author: Karan Kumar <[email protected]>
AuthorDate: Fri Aug 26 20:53:43 2022 +0530
Race in Task report/log streamer (#12931) (#12980)
* Fixing RACE in HTTP remote task Runner
* Changes in the interface
* Updating documentation
* Adding test cases to SwitchingTaskLogStreamer
* Adding more tests
---
.../org/apache/druid/tasklogs/NoopTaskLogs.java | 4 +-
.../org/apache/druid/tasklogs/TaskLogStreamer.java | 9 +-
.../apache/druid/storage/aliyun/OssTaskLogs.java | 59 +++--
.../druid/storage/aliyun/OssTaskLogsTest.java | 165 ++++++++++----
.../apache/druid/storage/azure/AzureTaskLogs.java | 60 +++---
.../druid/storage/azure/AzureTaskLogsTest.java | 26 +--
.../druid/storage/google/GoogleTaskLogs.java | 48 ++---
.../druid/storage/google/GoogleTaskLogsTest.java | 14 +-
.../druid/storage/hdfs/tasklog/HdfsTaskLogs.java | 40 ++--
.../indexing/common/tasklogs/HdfsTaskLogsTest.java | 2 +-
.../org/apache/druid/storage/s3/S3TaskLogs.java | 58 +++--
.../apache/druid/storage/s3/S3TaskLogsTest.java | 180 ++++++++++++----
.../indexing/common/tasklogs/FileTaskLogs.java | 27 +--
.../common/tasklogs/SwitchingTaskLogStreamer.java | 77 ++++++-
.../common/tasklogs/TaskRunnerTaskLogStreamer.java | 6 +-
.../druid/indexing/overlord/ForkingTaskRunner.java | 15 +-
.../druid/indexing/overlord/RemoteTaskRunner.java | 82 +++----
.../indexing/overlord/ThreadingTaskRunner.java | 4 +-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 81 +++----
.../indexing/overlord/http/OverlordResource.java | 10 +-
.../druid/indexing/worker/http/WorkerResource.java | 6 +-
.../indexing/common/tasklogs/FileTaskLogsTest.java | 6 +-
.../tasklogs/SwitchingTaskLogStreamerTest.java | 238 +++++++++++++++++++++
.../indexing/overlord/RemoteTaskRunnerTest.java | 2 +-
.../java/org/apache/druid/cli/CliOverlord.java | 6 +-
25 files changed, 804 insertions(+), 421 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
b/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index a0dc877a30..13962d7c4d 100644
--- a/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/core/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -20,17 +20,17 @@
package org.apache.druid.tasklogs;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.File;
+import java.io.InputStream;
public class NoopTaskLogs implements TaskLogs
{
private final Logger log = new Logger(TaskLogs.class);
@Override
- public Optional<ByteSource> streamTaskLog(String taskid, long offset)
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
{
return Optional.absent();
}
diff --git a/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
b/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
index e6451a6840..04add17ea5 100644
--- a/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++ b/core/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
@@ -20,10 +20,10 @@
package org.apache.druid.tasklogs;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import org.apache.druid.guice.annotations.ExtensionPoint;
import java.io.IOException;
+import java.io.InputStream;
/**
* Something that knows how to stream logs for tasks.
@@ -36,12 +36,11 @@ public interface TaskLogStreamer
*
* @param offset If zero, stream the entire log. If positive, attempt to
read from this position onwards. If
* negative, attempt to read this many bytes from the end of
the file (like <tt>tail -n</tt>).
- *
- * @return input supplier for this log, if available from this provider
+ * @return inputStream for this log, if available
*/
- Optional<ByteSource> streamTaskLog(String taskid, long offset) throws
IOException;
+ Optional<InputStream> streamTaskLog(String taskid, long offset) throws
IOException;
- default Optional<ByteSource> streamTaskReports(final String taskid) throws
IOException
+ default Optional<InputStream> streamTaskReports(final String taskid) throws
IOException
{
return Optional.absent();
}
diff --git
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
index 515d85096e..91d02c3f5a 100644
---
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
+++
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
@@ -25,7 +25,6 @@ import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@@ -66,54 +65,44 @@ public class OssTaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset) throws IOException
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskid) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
}
- private Optional<ByteSource> streamTaskFile(final long offset, String
taskKey) throws IOException
+ private Optional<InputStream> streamTaskFile(final long offset, String
taskKey) throws IOException
{
try {
final ObjectMetadata objectMetadata =
client.getObjectMetadata(config.getBucket(), taskKey);
-
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- final long start;
- final long end = objectMetadata.getContentLength() - 1;
-
- if (offset > 0 && offset < objectMetadata.getContentLength()) {
- start = offset;
- } else if (offset < 0 && (-1 * offset) <
objectMetadata.getContentLength()) {
- start = objectMetadata.getContentLength() + offset;
- } else {
- start = 0;
- }
-
- final GetObjectRequest request = new
GetObjectRequest(config.getBucket(), taskKey);
-
request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
- request.setRange(start, end);
-
- return client.getObject(request).getObjectContent();
- }
- catch (OSSException e) {
- throw new IOException(e);
- }
- }
- }
- );
+ try {
+ final long start;
+ final long end = objectMetadata.getContentLength() - 1;
+
+ if (offset > 0 && offset < objectMetadata.getContentLength()) {
+ start = offset;
+ } else if (offset < 0 && (-1 * offset) <
objectMetadata.getContentLength()) {
+ start = objectMetadata.getContentLength() + offset;
+ } else {
+ start = 0;
+ }
+
+ final GetObjectRequest request = new
GetObjectRequest(config.getBucket(), taskKey);
+
request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
+ request.setRange(start, end);
+
+ return Optional.of(client.getObject(request).getObjectContent());
+ }
+ catch (OSSException e) {
+ throw new IOException(e);
+ }
}
catch (OSSException e) {
if ("NoSuchKey".equals(e.getErrorCode())
diff --git
a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
index 1264a0fe9d..16b09866ec 100644
---
a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
+++
b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
@@ -23,11 +23,15 @@ import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.model.AccessControlList;
import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.Grant;
+import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.Owner;
import com.aliyun.oss.model.PutObjectRequest;
import com.aliyun.oss.model.PutObjectResult;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -42,12 +46,19 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
+import javax.annotation.Nonnull;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
@RunWith(EasyMockRunner.class)
public class OssTaskLogsTest extends EasyMockSupport
@@ -65,6 +76,9 @@ public class OssTaskLogsTest extends EasyMockSupport
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new
ClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new
ClientException(new NullPointerException());
+ private static final String LOG_CONTENTS = "log_contents";
+ private static final String REPORT_CONTENTS = "report_contents";
+
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@@ -113,12 +127,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig,
timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
EasyMock.verify(ossClient, timeSupplier);
@@ -147,12 +156,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig,
timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
EasyMock.verify(ossClient, timeSupplier);
@@ -181,12 +185,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config,
inputDataConfig, timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
}
catch (IOException e) {
@@ -217,12 +216,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig,
timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(ossClient, timeSupplier);
@@ -250,12 +244,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig,
timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(ossClient, timeSupplier);
@@ -283,12 +272,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
- OssTaskLogsConfig config = new OssTaskLogsConfig();
- config.setBucket(TEST_BUCKET);
- config.setPrefix(TEST_PREFIX);
- OssInputDataConfig inputDataConfig = new OssInputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config,
inputDataConfig, timeSupplier);
+ OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
@@ -300,6 +284,115 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.verify(ossClient, timeSupplier);
}
+ @Test
+ public void test_taskLog_fetch() throws IOException
+ {
+ EasyMock.reset(ossClient);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ OSSObject ossObject = new OSSObject();
+ ossObject.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+
EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+ EasyMock.replay(ossClient);
+
+ OssTaskLogs ossTaskLogs = getOssTaskLogs();
+ Optional<InputStream> inputStreamOptional =
ossTaskLogs.streamTaskLog(KEY_1, 0);
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS, taskLogs);
+ }
+
+ @Test
+ public void test_taskLog_fetch_withRange() throws IOException
+ {
+ EasyMock.reset(ossClient);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ OSSObject ossObject = new OSSObject();
+ ossObject.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+
EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+ EasyMock.replay(ossClient);
+
+ OssTaskLogs ossTaskLogs = getOssTaskLogs();
+ Optional<InputStream> inputStreamOptional =
ossTaskLogs.streamTaskLog(KEY_1, 1);
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+ }
+
+ @Test
+ public void test_taskLog_fetch_withNegativeRange() throws IOException
+ {
+ EasyMock.reset(ossClient);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ OSSObject ossObject = new OSSObject();
+ ossObject.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+
EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+ EasyMock.replay(ossClient);
+
+ OssTaskLogs ossTaskLogs = getOssTaskLogs();
+ Optional<InputStream> inputStreamOptional =
ossTaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+ }
+
+
+ @Test
+ public void test_taskReport_fetch() throws IOException
+ {
+ EasyMock.reset(ossClient);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(REPORT_CONTENTS.length());
+ EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ OSSObject ossObject = new OSSObject();
+ ossObject.setObjectContent(new
ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+
EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
+ EasyMock.replay(ossClient);
+
+ OssTaskLogs ossTaskLogs = getOssTaskLogs();
+ Optional<InputStream> inputStreamOptional =
ossTaskLogs.streamTaskReports(KEY_1);
+ String report = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(REPORT_CONTENTS, report);
+ }
+
+ @Nonnull
+ private OssTaskLogs getOssTaskLogs()
+ {
+ OssTaskLogsConfig config = new OssTaskLogsConfig();
+ config.setBucket(TEST_BUCKET);
+ config.setPrefix(TEST_PREFIX);
+ OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+ inputDataConfig.setMaxListingLength(MAX_KEYS);
+ OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig,
timeSupplier);
+ return taskLogs;
+ }
+
private List<Grant> testPushInternal(boolean disableAcl, String ownerId,
String ownerDisplayName) throws Exception
{
EasyMock.expect(ossClient.putObject(EasyMock.anyObject()))
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 46e081eea3..678a43e5db 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -20,7 +20,6 @@
package org.apache.druid.storage.azure;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -100,56 +99,45 @@ public class AzureTaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset) throws IOException
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskid) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
}
- private Optional<ByteSource> streamTaskFile(final String taskid, final long
offset, String taskKey) throws IOException
+ private Optional<InputStream> streamTaskFile(final String taskid, final long
offset, String taskKey)
+ throws IOException
{
final String container = config.getContainer();
-
try {
if (!azureStorage.getBlobExists(container, taskKey)) {
return Optional.absent();
}
-
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- final long start;
- final long length = azureStorage.getBlobLength(container,
taskKey);
-
- if (offset > 0 && offset < length) {
- start = offset;
- } else if (offset < 0 && (-1 * offset) < length) {
- start = length + offset;
- } else {
- start = 0;
- }
-
- InputStream stream =
azureStorage.getBlobInputStream(container, taskKey);
- stream.skip(start);
-
- return stream;
-
- }
- catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
- );
+ try {
+ final long start;
+ final long length = azureStorage.getBlobLength(container, taskKey);
+
+ if (offset > 0 && offset < length) {
+ start = offset;
+ } else if (offset < 0 && (-1 * offset) < length) {
+ start = length + offset;
+ } else {
+ start = 0;
+ }
+
+ InputStream stream = azureStorage.getBlobInputStream(container,
taskKey);
+ stream.skip(start);
+
+ return Optional.of(stream);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
}
catch (StorageException | URISyntaxException e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 519a6e923f..5313c1d5f5 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.storage.azure;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteSource;
import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -39,6 +38,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
@@ -191,10 +191,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskLog(TASK_ID, 0);
+ final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID,
0);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@@ -214,10 +214,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskLog(TASK_ID, 5);
+ final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID,
5);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(5));
verifyAll();
@@ -237,10 +237,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskLog(TASK_ID, -3);
+ final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID,
-3);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(testLog.length()
- 3));
verifyAll();
@@ -260,10 +260,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskReports(TASK_ID);
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskReports(TASK_ID);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@@ -279,10 +279,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
- Assert.assertFalse(byteSource.isPresent());
+ Assert.assertFalse(stream.isPresent());
verifyAll();
}
@@ -301,10 +301,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
azureTaskLogs.streamTaskReports(TASK_ID);
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskReports(TASK_ID);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
verifyAll();
}
diff --git
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index d6d9ff569b..fcdee4039b 100644
---
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -21,7 +21,6 @@ package org.apache.druid.storage.google;
import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@@ -103,20 +102,21 @@ public class GoogleTaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset) throws IOException
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
return streamTaskFile(taskid, offset, taskKey);
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskid) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
final String taskKey = getTaskReportKey(taskid);
return streamTaskFile(taskid, 0, taskKey);
}
- private Optional<ByteSource> streamTaskFile(final String taskid, final long
offset, String taskKey) throws IOException
+ private Optional<InputStream> streamTaskFile(final String taskid, final long
offset, String taskKey)
+ throws IOException
{
try {
if (!storage.exists(config.getBucket(), taskKey)) {
@@ -124,32 +124,22 @@ public class GoogleTaskLogs implements TaskLogs
}
final long length = storage.size(config.getBucket(), taskKey);
+ try {
+ final long start;
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- final long start;
-
- if (offset > 0 && offset < length) {
- start = offset;
- } else if (offset < 0 && (-1 * offset) < length) {
- start = length + offset;
- } else {
- start = 0;
- }
-
- return new GoogleByteSource(storage, config.getBucket(),
taskKey).openStream(start);
- }
- catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
- );
+ if (offset > 0 && offset < length) {
+ start = offset;
+ } else if (offset < 0 && (-1 * offset) < length) {
+ start = length + offset;
+ } else {
+ start = 0;
+ }
+
+ return Optional.of(new GoogleByteSource(storage, config.getBucket(),
taskKey).openStream(start));
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
}
catch (IOException e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);
diff --git
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
index c5807723b6..d8b7c61cfc 100644
---
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
+++
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
@@ -27,7 +27,6 @@ import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteSource;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils;
@@ -42,6 +41,7 @@ import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -121,10 +121,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
googleTaskLogs.streamTaskLog(TASKID, 0);
+ final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID,
0);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@@ -144,10 +144,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
googleTaskLogs.streamTaskLog(TASKID, offset);
+ final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID,
offset);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), expectedLog);
verifyAll();
@@ -168,10 +168,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
- final Optional<ByteSource> byteSource =
googleTaskLogs.streamTaskLog(TASKID, offset);
+ final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID,
offset);
final StringWriter writer = new StringWriter();
- IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
+ IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), expectedLog);
verifyAll();
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index 7289901823..6026a3e3cb 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -20,7 +20,6 @@
package org.apache.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
@@ -88,44 +87,35 @@ public class HdfsTaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskId, final long
offset) throws IOException
+ public Optional<InputStream> streamTaskLog(final String taskId, final long
offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
return streamTaskFile(path, offset);
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskId) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskId) throws
IOException
{
final Path path = getTaskReportsFileFromId(taskId);
return streamTaskFile(path, 0);
}
- private Optional<ByteSource> streamTaskFile(final Path path, final long
offset) throws IOException
+ private Optional<InputStream> streamTaskFile(final Path path, final long
offset) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
if (fs.exists(path)) {
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- log.info("Reading task log from: %s", path);
- final long seekPos;
- if (offset < 0) {
- final FileStatus stat = fs.getFileStatus(path);
- seekPos = Math.max(0, stat.getLen() + offset);
- } else {
- seekPos = offset;
- }
- final FSDataInputStream inputStream = fs.open(path);
- inputStream.seek(seekPos);
- log.info("Read task log from: %s (seek = %,d)", path, seekPos);
- return inputStream;
- }
- }
- );
+ log.info("Reading task log from: %s", path);
+ final long seekPos;
+ if (offset < 0) {
+ final FileStatus stat = fs.getFileStatus(path);
+ seekPos = Math.max(0, stat.getLen() + offset);
+ } else {
+ seekPos = offset;
+ }
+ final FSDataInputStream inputStream = fs.open(path);
+ inputStream.seek(seekPos);
+ log.info("Read task log from: %s (seek = %,d)", path, seekPos);
+ return Optional.of(inputStream);
} else {
return Optional.absent();
}
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index 49de4594be..9724cba694 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -115,6 +115,6 @@ public class HdfsTaskLogsTest
private String readLog(TaskLogs taskLogs, String logFile, long offset)
throws IOException
{
- return
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile,
offset).get().openStream()));
+ return
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile,
offset).get()));
}
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 838b47174a..9319e8a5fc 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -25,7 +25,6 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@@ -65,54 +64,45 @@ public class S3TaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset) throws IOException
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskid) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
}
- private Optional<ByteSource> streamTaskFile(final long offset, String
taskKey) throws IOException
+ private Optional<InputStream> streamTaskFile(final long offset, String
taskKey) throws IOException
{
try {
final ObjectMetadata objectMetadata =
service.getObjectMetadata(config.getS3Bucket(), taskKey);
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- final long start;
- final long end = objectMetadata.getContentLength() - 1;
-
- if (offset > 0 && offset < objectMetadata.getContentLength()) {
- start = offset;
- } else if (offset < 0 && (-1 * offset) <
objectMetadata.getContentLength()) {
- start = objectMetadata.getContentLength() + offset;
- } else {
- start = 0;
- }
-
- final GetObjectRequest request = new
GetObjectRequest(config.getS3Bucket(), taskKey)
- .withMatchingETagConstraint(objectMetadata.getETag())
- .withRange(start, end);
-
- return service.getObject(request).getObjectContent();
- }
- catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
- }
- );
+ try {
+ final long start;
+ final long end = objectMetadata.getContentLength() - 1;
+
+ if (offset > 0 && offset < objectMetadata.getContentLength()) {
+ start = offset;
+ } else if (offset < 0 && (-1 * offset) <
objectMetadata.getContentLength()) {
+ start = objectMetadata.getContentLength() + offset;
+ } else {
+ start = 0;
+ }
+
+ final GetObjectRequest request = new
GetObjectRequest(config.getS3Bucket(), taskKey)
+ .withMatchingETagConstraint(objectMetadata.getETag())
+ .withRange(start, end);
+
+ return Optional.of(service.getObject(request).getObjectContent());
+ }
+ catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
}
catch (AmazonS3Exception e) {
if (404 == e.getStatusCode()
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 502897f010..3b28ac07e6 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -22,12 +22,16 @@ package org.apache.druid.storage.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.Grant;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@@ -42,10 +46,17 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
+import javax.annotation.Nonnull;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.stream.Collectors;
@RunWith(EasyMockRunner.class)
public class S3TaskLogsTest extends EasyMockSupport
@@ -63,6 +74,8 @@ public class S3TaskLogsTest extends EasyMockSupport
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new
SdkClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new
SdkClientException(new NullPointerException());
+ private static final String LOG_CONTENTS = "log_contents";
+ private static final String REPORT_CONTENTS = "report_contents";
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@@ -136,12 +149,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
@@ -174,12 +182,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
@@ -211,12 +214,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config,
inputDataConfig, timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
}
catch (IOException e) {
@@ -250,12 +248,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
@@ -286,12 +279,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
@@ -322,12 +310,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
- S3TaskLogsConfig config = new S3TaskLogsConfig();
- config.setS3Bucket(TEST_BUCKET);
- config.setS3Prefix(TEST_PREFIX);
- S3InputDataConfig inputDataConfig = new S3InputDataConfig();
- inputDataConfig.setMaxListingLength(MAX_KEYS);
- S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config,
inputDataConfig, timeSupplier);
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
@@ -339,6 +322,131 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.verify(s3Client, timeSupplier);
}
+ @Test
+ public void test_taskLog_fetch() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(0, LOG_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.replay(s3Client);
+
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+ Optional<InputStream> inputStreamOptional =
s3TaskLogs.streamTaskLog(KEY_1, 0);
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS, taskLogs);
+ }
+
+ @Test
+ public void test_taskLog_fetch_withRange() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.replay(s3Client);
+
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+ Optional<InputStream> inputStreamOptional =
s3TaskLogs.streamTaskLog(KEY_1, 1);
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+ }
+
+ @Test
+ public void test_taskLog_fetch_withNegativeRange() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(LOG_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.replay(s3Client);
+
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+ Optional<InputStream> inputStreamOptional =
s3TaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
+ String taskLogs = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
+ }
+
+
+ @Test
+ public void test_report_fetch() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(REPORT_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(0, REPORT_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.replay(s3Client);
+
+ S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+ Optional<InputStream> inputStreamOptional =
s3TaskLogs.streamTaskReports(KEY_1);
+ String report = new BufferedReader(
+ new InputStreamReader(inputStreamOptional.get(),
StandardCharsets.UTF_8))
+ .lines()
+ .collect(Collectors.joining("\n"));
+
+ Assert.assertEquals(REPORT_CONTENTS, report);
+ }
+
+
+ @Nonnull
+ private S3TaskLogs getS3TaskLogs()
+ {
+ S3TaskLogsConfig config = new S3TaskLogsConfig();
+ config.setS3Bucket(TEST_BUCKET);
+ config.setS3Prefix(TEST_PREFIX);
+ S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+ inputDataConfig.setMaxListingLength(MAX_KEYS);
+ S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig,
timeSupplier);
+ return s3TaskLogs;
+ }
+
private List<Grant> testPushInternal(boolean disableAcl, String ownerId,
String ownerDisplayName) throws Exception
{
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
index 857d4c65bf..5d546da494 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
@@ -67,40 +66,22 @@ public class FileTaskLogs implements TaskLogs
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset)
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final File file = fileForTask(taskid, "log");
if (file.exists()) {
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- return LogUtils.streamFile(file, offset);
- }
- }
- );
+ return Optional.of(LogUtils.streamFile(file, offset));
} else {
return Optional.absent();
}
}
@Override
- public Optional<ByteSource> streamTaskReports(final String taskid)
+ public Optional<InputStream> streamTaskReports(final String taskid) throws
IOException
{
final File file = fileForTask(taskid, "report.json");
if (file.exists()) {
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- return LogUtils.streamFile(file, 0);
- }
- }
- );
+ return Optional.of(LogUtils.streamFile(file, 0));
} else {
return Optional.absent();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
index 78806d6eef..098b72854a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java
@@ -21,11 +21,12 @@ package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
import org.apache.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
+import java.io.InputStream;
import java.util.List;
/**
@@ -33,37 +34,93 @@ import java.util.List;
*/
public class SwitchingTaskLogStreamer implements TaskLogStreamer
{
- private final List<TaskLogStreamer> providers;
+ private final TaskLogStreamer taskRunnerTaskLogStreamer;
+ private final List<TaskLogStreamer> deepStorageStreamers;
@Inject
- public SwitchingTaskLogStreamer(List<TaskLogStreamer> providers)
+ public SwitchingTaskLogStreamer(
+ @Named("taskstreamer") TaskLogStreamer taskRunnerTaskLogStreamer,
+ List<TaskLogStreamer> deepStorageStreamer
+ )
{
- this.providers = ImmutableList.copyOf(providers);
+ this.taskRunnerTaskLogStreamer = taskRunnerTaskLogStreamer;
+ this.deepStorageStreamers = ImmutableList.copyOf(deepStorageStreamer);
}
@Override
- public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws
IOException
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
throws IOException
{
- for (TaskLogStreamer provider : providers) {
- final Optional<ByteSource> stream = provider.streamTaskLog(taskid,
offset);
+ IOException deferIOException = null;
+ try {
+ final Optional<InputStream> stream =
taskRunnerTaskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return stream;
}
}
+ catch (IOException e) {
+ // Defer the first IOException: there is a race in the way tasks update their exit status in the overlord.
+ // A task may already have sent its log to deep storage while still running with its HTTP chat handlers unregistered.
+ // In that case, hold on to the first IOException and try deep storage for the log. If the log is still not found, rethrow the deferred exception.
+ deferIOException = e;
+ }
+ for (TaskLogStreamer provider : deepStorageStreamers) {
+ try {
+ final Optional<InputStream> stream = provider.streamTaskLog(taskid,
offset);
+ if (stream.isPresent()) {
+ return stream;
+ }
+ }
+ catch (IOException e) {
+ if (deferIOException != null) {
+ e.addSuppressed(deferIOException);
+ }
+ throw e;
+ }
+ }
+ // Could not find any InputStream. Throw the deferred exception, if one exists.
+ if (deferIOException != null) {
+ throw deferIOException;
+ }
return Optional.absent();
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskid) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
- for (TaskLogStreamer provider : providers) {
- final Optional<ByteSource> stream = provider.streamTaskReports(taskid);
+ IOException deferIOException = null;
+
+ try {
+ final Optional<InputStream> stream =
taskRunnerTaskLogStreamer.streamTaskReports(taskid);
if (stream.isPresent()) {
return stream;
}
}
+ catch (IOException e) {
+ // Defer the first IOException: there is a race in the way tasks update their exit status in the overlord.
+ // A task may already have sent its report to deep storage while still running with its HTTP chat handlers unregistered.
+ // In that case, hold on to the first IOException and try deep storage for the report. If the report is still not found, rethrow the deferred exception.
+ deferIOException = e;
+ }
+ for (TaskLogStreamer provider : deepStorageStreamers) {
+ try {
+ final Optional<InputStream> stream =
provider.streamTaskReports(taskid);
+ if (stream.isPresent()) {
+ return stream;
+ }
+ }
+ catch (IOException e) {
+ if (deferIOException != null) {
+ e.addSuppressed(deferIOException);
+ }
+ throw e;
+ }
+ }
+ // Could not find any InputStream. Throw the deferred exception, if one exists.
+ if (deferIOException != null) {
+ throw deferIOException;
+ }
return Optional.absent();
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
index 89b5d648cb..206dff097e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java
@@ -20,13 +20,13 @@
package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
+import java.io.InputStream;
/**
*/
@@ -41,7 +41,7 @@ public class TaskRunnerTaskLogStreamer implements
TaskLogStreamer
}
@Override
- public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws
IOException
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {
@@ -52,7 +52,7 @@ public class TaskRunnerTaskLogStreamer implements
TaskLogStreamer
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskId) throws
IOException
+ public Optional<InputStream> streamTaskReports(String taskId) throws
IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index f07a859e77..98ef0739d2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSink;
-import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
@@ -652,7 +651,7 @@ public class ForkingTaskRunner
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskid, final long
offset)
+ public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final ProcessHolder processHolder;
@@ -664,17 +663,7 @@ public class ForkingTaskRunner
return Optional.absent();
}
}
-
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- return LogUtils.streamFile(processHolder.logFile, offset);
- }
- }
- );
+ return Optional.of(LogUtils.streamFile(processHolder.logFile, offset));
}
/**
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 03654b86a1..42ff3c2467 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -611,7 +610,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
}
@Override
- public Optional<ByteSource> streamTaskLog(final String taskId, final long
offset)
+ public Optional<InputStream> streamTaskLog(final String taskId, final long
offset) throws IOException
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@@ -626,34 +625,26 @@ public class RemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
taskId,
Long.toString(offset)
);
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- return httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(),
IOException.class);
- throw new RuntimeException(e);
- }
- }
- }
- );
+ try {
+ return Optional.of(httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamResponseHandler()
+ ).get());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ // Unwrap if possible
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
+ }
}
}
+
@Override
- public Optional<ByteSource> streamTaskReports(final String taskId)
+ public Optional<InputStream> streamTaskReports(final String taskId) throws
IOException
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@@ -681,31 +672,24 @@ public class RemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- return httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- }
- );
+
+ try {
+ return Optional.of(httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamResponseHandler()
+ ).get());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ // Unwrap if possible
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
+ }
}
+
/**
* Adds a task to the pending queue.
* {@link #runPendingTasks()} should be called to run the pending task.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 79a67b4141..db792b6b99 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -61,6 +60,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -128,7 +128,7 @@ public class ThreadingTaskRunner
}
@Override
- public Optional<ByteSource> streamTaskLog(String taskid, long offset)
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
{
// task logs will appear in the main indexer log, streaming individual
task logs is not supported
return Optional.absent();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 19279af327..245933eaf8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -962,7 +961,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
}
@Override
- public Optional<ByteSource> streamTaskLog(String taskId, long offset)
+ public Optional<InputStream> streamTaskLog(String taskId, long offset)
throws IOException
{
@SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@@ -982,34 +981,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId,
Long.toString(offset)
);
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- return httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- }
- );
+
+ try {
+ return Optional.of(httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamResponseHandler()
+ ).get());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ // Unwrap if possible
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
+ }
}
}
@Override
- public Optional<ByteSource> streamTaskReports(String taskId)
+ public Optional<InputStream> streamTaskReports(String taskId) throws IOException
{
@SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@@ -1035,29 +1026,21 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
- return Optional.of(
- new ByteSource()
- {
- @Override
- public InputStream openStream() throws IOException
- {
- try {
- return httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get();
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- }
- );
+
+ try {
+ return Optional.of(httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamResponseHandler()
+ ).get());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ // Unwrap if possible
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index a111ebe787..a3a7f8de69 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
@@ -100,6 +99,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1008,9 +1008,9 @@ public class OverlordResource
)
{
try {
- final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
+ final Optional<InputStream> stream = taskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
- return Response.ok(stream.get().openStream()).build();
+ return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(
@@ -1035,9 +1035,9 @@ public class OverlordResource
)
{
try {
- final Optional<ByteSource> stream = taskLogStreamer.streamTaskReports(taskid);
+ final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
if (stream.isPresent()) {
- return Response.ok(stream.get().openStream()).build();
+ return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
index 935514f27b..42d145edb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java
@@ -24,7 +24,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.sun.jersey.spi.container.ResourceFilters;
@@ -53,6 +52,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.io.InputStream;
/**
*/
@@ -214,10 +214,10 @@ public class WorkerResource
.build();
}
try {
- final Optional<ByteSource> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
+ final Optional<InputStream> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
if (stream.isPresent()) {
- return Response.ok(stream.get().openStream()).build();
+ return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
index 74503d9b08..22c13e3464 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
@@ -63,7 +63,7 @@ public class FileTaskLogsTest
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
- final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream());
+ final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get());
final String string = StringUtils.fromUtf8(bytes);
Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
@@ -91,7 +91,7 @@ public class FileTaskLogsTest
Assert.assertEquals(
testReportString,
- StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get().openStream()))
+ StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get()))
);
}
@@ -147,7 +147,7 @@ public class FileTaskLogsTest
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
{
- return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()));
+ return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get()));
}
private static class TestTaskReport implements TaskReport
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java
new file mode 100644
index 0000000000..afe9c62931
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/SwitchingTaskLogStreamerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class SwitchingTaskLogStreamerTest
+{
+ private static final String LOG = "LOG";
+ private static final String REPORT = "REPORT";
+ private static final String TASK_ID = "foo";
+
+ private final TaskLogStreamer streamer1 = new TestTaskLogStreamer(1);
+ private final TaskLogStreamer streamer2 = new TestTaskLogStreamer(2);
+ private final TaskLogStreamer emptyStreamer = new NoopTaskLogs();
+ private final TaskLogStreamer ioExceptionStreamer = new TaskLogStreamer()
+ {
+ @Override
+ public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
+ {
+ throw new IOE("expected log exception");
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
+ {
+ throw new IOE("expected task exception");
+ }
+ };
+
+ @Test
+ public void foundInRemoteTasks() throws IOException
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ streamer1,
+ Arrays.asList(
+ streamer2,
+ emptyStreamer
+ )
+ );
+ Assert.assertEquals(
+ getLogString(1, TASK_ID, 1),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertEquals(
+ getReportString(1, TASK_ID),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+ }
+
+ @Test
+ public void foundInDeepStorage() throws IOException
+ {
+
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ emptyStreamer,
+ Arrays.asList(
+ streamer2,
+ emptyStreamer
+ )
+ );
+ Assert.assertEquals(
+ getLogString(2, TASK_ID, 1),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertEquals(
+ getReportString(2, TASK_ID),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+ }
+
+ @Test
+ public void exceptionInTaskStreamerButFoundInDeepStrorage() throws IOException
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ ioExceptionStreamer,
+ Arrays.asList(
+ streamer2,
+ emptyStreamer
+ )
+ );
+ Assert.assertEquals(
+ getLogString(2, TASK_ID, 1),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertEquals(
+ getReportString(2, TASK_ID),
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+ }
+
+
+ @Test
+ public void exceptionInDeepStrorage()
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ emptyStreamer,
+ Arrays.asList(
+ ioExceptionStreamer,
+ streamer2
+ )
+ );
+ Assert.assertThrows("expected log exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertThrows("expected report exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+ }
+
+ @Test
+ public void exceptionInRemoteTaskLogStreamerWithEmptyDeepStorage()
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ ioExceptionStreamer,
+ Collections.singletonList(
+ emptyStreamer
+ )
+ );
+ Assert.assertThrows("expected log exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertThrows("expected report exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+
+ }
+
+ @Test
+ public void exceptionEverywhere()
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ ioExceptionStreamer,
+ Collections.singletonList(
+ ioExceptionStreamer
+ )
+ );
+ Assert.assertThrows("expected log exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).get()))
+ );
+
+ Assert.assertThrows("expected report exception", IOException.class, () ->
+ StringUtils.fromUtf8(ByteStreams.toByteArray(switchingTaskLogStreamer.streamTaskReports(TASK_ID).get()))
+ );
+ }
+
+ @Test
+ public void empty() throws IOException
+ {
+ SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
+ emptyStreamer,
+ Collections.singletonList(
+ emptyStreamer
+ )
+ );
+ Assert.assertFalse(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).isPresent());
+ Assert.assertFalse(switchingTaskLogStreamer.streamTaskReports(TASK_ID).isPresent());
+ }
+
+ private static String getLogString(int id, String taskid, long offset)
+ {
+ return StringUtils.format(
+ LOG + " with id %d, task %s and offset %d",
+ id,
+ taskid,
+ offset
+ );
+ }
+
+
+ private static String getReportString(int id, String taskid)
+ {
+ return StringUtils.format(
+ REPORT + " with id %d, task %s",
+ id,
+ taskid
+ );
+ }
+
+ private static class TestTaskLogStreamer implements TaskLogStreamer
+ {
+ private final int id;
+
+ public TestTaskLogStreamer(int id)
+ {
+ this.id = id;
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
+ {
+ return Optional.of(new ByteArrayInputStream(getLogString(id, taskid, offset).getBytes(StandardCharsets.UTF_8)));
+ }
+
+
+ @Override
+ public Optional<InputStream> streamTaskReports(String taskid)
+ {
+ return Optional.of(new ByteArrayInputStream(getReportString(id, taskid).getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 2be0cc6541..ea4c299485 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -1116,7 +1116,7 @@ public class RemoteTaskRunnerTest
);
// Stream task reports from a running task.
- final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get().openStream();
+ final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteStreams.copy(in, baos);
Assert.assertEquals(reportString, StringUtils.fromUtf8(baos.toByteArray()));
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 14ec99195b..3a45975bf9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -198,11 +198,15 @@ public class CliOverlord extends ServerRunnable
)
.toProvider(
new ListProvider<TaskLogStreamer>()
- .add(TaskRunnerTaskLogStreamer.class)
.add(TaskLogs.class)
)
.in(LazySingleton.class);
+ binder.bind(TaskLogStreamer.class)
+ .annotatedWith(Names.named("taskstreamer"))
+ .to(TaskRunnerTaskLogStreamer.class)
+ .in(LazySingleton.class);
+
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]