This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new dd03f7061a7 Implement task payload management with azure storage
(#15695)
dd03f7061a7 is described below
commit dd03f7061a722d111491390a9a3da22ecab0c3c1
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Jan 18 13:21:36 2024 -0500
Implement task payload management with azure storage (#15695)
* Implement task payload management with azure storage
* Remove unnecessary parameter
---
.../apache/druid/storage/azure/AzureTaskLogs.java | 27 +++++-
.../druid/storage/azure/AzureTaskLogsTest.java | 97 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 4 deletions(-)
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index edec449c43d..74e928edc67 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -99,25 +99,39 @@ public class AzureTaskLogs implements TaskLogs
}
}
+ @Override
+ public void pushTaskPayload(String taskid, File taskPayloadFile)
+ {
+ final String taskKey = getTaskPayloadKey(taskid);
+ log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile,
taskKey);
+ pushTaskFile(taskPayloadFile, taskKey);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskPayload(String taskid) throws
IOException
+ {
+ return streamTaskFile(0, getTaskPayloadKey(taskid));
+ }
+
@Override
public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
- return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
+ return streamTaskFile(offset, getTaskLogKey(taskid));
}
@Override
public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
- return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
+ return streamTaskFile(0, getTaskReportsKey(taskid));
}
@Override
public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
{
- return streamTaskFile(taskid, 0, getTaskStatusKey(taskid));
+ return streamTaskFile(0, getTaskStatusKey(taskid));
}
- private Optional<InputStream> streamTaskFile(final String taskid, final long
offset, String taskKey)
+ private Optional<InputStream> streamTaskFile(final long offset, String
taskKey)
throws IOException
{
final String container = config.getContainer();
@@ -166,6 +180,11 @@ public class AzureTaskLogs implements TaskLogs
return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid);
}
+ private String getTaskPayloadKey(String taskid)
+ {
+ return StringUtils.format("%s/%s/task.json", config.getPrefix(), taskid);
+ }
+
@Override
public void killAll() throws IOException
{
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 3ccb5fd448e..3ce9d8fb3b2 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -181,6 +181,29 @@ public class AzureTaskLogsTest extends EasyMockSupport
}
}
+ @Test
+ public void test_PushTaskPayload_uploadsBlob() throws Exception
+ {
+ final File tmpDir = FileUtils.createTempDir();
+
+ try {
+ final File taskFile = new File(tmpDir, "task.json");
+
+
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
+ azureStorage.uploadBlockBlob(taskFile, CONTAINER, PREFIX + "/" + TASK_ID
+ "/task.json", MAX_TRIES);
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ azureTaskLogs.pushTaskPayload(TASK_ID, taskFile);
+
+ verifyAll();
+ }
+ finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
+
@Test(expected = RuntimeException.class)
public void test_PushTaskReports_exception_rethrowsException() throws
Exception
{
@@ -422,6 +445,80 @@ public class AzureTaskLogsTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void test_streamTaskPayload_blobExists_succeeds() throws Exception
+ {
+ final String taskPayload = "{}";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER,
blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER,
blobPath)).andReturn((long) taskPayload.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER,
blobPath)).andReturn(
+ new
ByteArrayInputStream(taskPayload.getBytes(StandardCharsets.UTF_8)));
+
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskPayload(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(stream.get(), writer, "UTF-8");
+ Assert.assertEquals(writer.toString(), taskPayload);
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_streamTaskPayload_blobDoesNotExist_returnsAbsent() throws
Exception
+ {
+ final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/task.json";
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER,
blobPath)).andReturn(false);
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskPayload(TASK_ID_NOT_FOUND);
+
+
+ Assert.assertFalse(stream.isPresent());
+
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void
test_streamTaskPayload_exceptionWhenGettingStream_throwsException() throws
Exception
+ {
+ final String taskPayload = "{}";
+
+ final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER,
blobPath)).andReturn(true);
+ EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER,
blobPath)).andReturn((long) taskPayload.length());
+ EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER,
blobPath)).andThrow(
+ new BlobStorageException("", null, null));
+
+
+ replayAll();
+
+ final Optional<InputStream> stream =
azureTaskLogs.streamTaskPayload(TASK_ID);
+
+ final StringWriter writer = new StringWriter();
+ IOUtils.copy(stream.get(), writer, "UTF-8");
+ verifyAll();
+ }
+
+ @Test(expected = IOException.class)
+ public void
test_streamTaskPayload_exceptionWhenCheckingBlobExistence_throwsException()
throws Exception
+ {
+ final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+ EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER,
blobPath)).andThrow(new BlobStorageException("", null, null));
+
+ replayAll();
+
+ azureTaskLogs.streamTaskPayload(TASK_ID);
+
+ verifyAll();
+ }
+
+
@Test
public void test_killAll_noException_deletesAllTaskLogs() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]