This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new dd03f7061a7 Implement task payload management with azure storage 
(#15695)
dd03f7061a7 is described below

commit dd03f7061a722d111491390a9a3da22ecab0c3c1
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Jan 18 13:21:36 2024 -0500

    Implement task payload management with azure storage (#15695)
    
    * Implement task payload manageent with azure storage
    
    * Remove unnecessary parameter
---
 .../apache/druid/storage/azure/AzureTaskLogs.java  | 27 +++++-
 .../druid/storage/azure/AzureTaskLogsTest.java     | 97 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 4 deletions(-)

diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index edec449c43d..74e928edc67 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -99,25 +99,39 @@ public class AzureTaskLogs implements TaskLogs
     }
   }
 
+  @Override
+  public void pushTaskPayload(String taskid, File taskPayloadFile)
+  {
+    final String taskKey = getTaskPayloadKey(taskid);
+    log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, 
taskKey);
+    pushTaskFile(taskPayloadFile, taskKey);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskid) throws 
IOException
+  {
+    return streamTaskFile(0, getTaskPayloadKey(taskid));
+  }
+
   @Override
   public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
-    return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
+    return streamTaskFile(offset, getTaskLogKey(taskid));
   }
 
   @Override
   public Optional<InputStream> streamTaskReports(String taskid) throws 
IOException
   {
-    return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
+    return streamTaskFile(0, getTaskReportsKey(taskid));
   }
 
   @Override
   public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
   {
-    return streamTaskFile(taskid, 0, getTaskStatusKey(taskid));
+    return streamTaskFile(0, getTaskStatusKey(taskid));
   }
 
-  private Optional<InputStream> streamTaskFile(final String taskid, final long 
offset, String taskKey)
+  private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey)
       throws IOException
   {
     final String container = config.getContainer();
@@ -166,6 +180,11 @@ public class AzureTaskLogs implements TaskLogs
     return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid);
   }
 
+  private String getTaskPayloadKey(String taskid)
+  {
+    return StringUtils.format("%s/%s/task.json", config.getPrefix(), taskid);
+  }
+
   @Override
   public void killAll() throws IOException
   {
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 3ccb5fd448e..3ce9d8fb3b2 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -181,6 +181,29 @@ public class AzureTaskLogsTest extends EasyMockSupport
     }
   }
 
+  @Test
+  public void test_PushTaskPayload_uploadsBlob() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File taskFile = new File(tmpDir, "task.json");
+
+      
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
+      azureStorage.uploadBlockBlob(taskFile, CONTAINER, PREFIX + "/" + TASK_ID 
+ "/task.json", MAX_TRIES);
+      EasyMock.expectLastCall();
+
+      replayAll();
+
+      azureTaskLogs.pushTaskPayload(TASK_ID, taskFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
   @Test(expected = RuntimeException.class)
   public void test_PushTaskReports_exception_rethrowsException() throws 
Exception
   {
@@ -422,6 +445,80 @@ public class AzureTaskLogsTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void test_streamTaskPayload_blobExists_succeeds() throws Exception
+  {
+    final String taskPayload = "{}";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, 
blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, 
blobPath)).andReturn((long) taskPayload.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, 
blobPath)).andReturn(
+        new 
ByteArrayInputStream(taskPayload.getBytes(StandardCharsets.UTF_8)));
+
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskPayload(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(stream.get(), writer, "UTF-8");
+    Assert.assertEquals(writer.toString(), taskPayload);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskPayload_blobDoesNotExist_returnsAbsent() throws 
Exception
+  {
+    final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/task.json";
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, 
blobPath)).andReturn(false);
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskPayload(TASK_ID_NOT_FOUND);
+
+
+    Assert.assertFalse(stream.isPresent());
+
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void 
test_streamTaskPayload_exceptionWhenGettingStream_throwsException() throws 
Exception
+  {
+    final String taskPayload = "{}";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, 
blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, 
blobPath)).andReturn((long) taskPayload.length());
+    EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, 
blobPath)).andThrow(
+        new BlobStorageException("", null, null));
+
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskPayload(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(stream.get(), writer, "UTF-8");
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void 
test_streamTaskPayload_exceptionWhenCheckingBlobExistence_throwsException() 
throws Exception
+  {
+    final String blobPath = PREFIX + "/" + TASK_ID + "/task.json";
+    EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, 
blobPath)).andThrow(new BlobStorageException("", null, null));
+
+    replayAll();
+
+    azureTaskLogs.streamTaskPayload(TASK_ID);
+
+    verifyAll();
+  }
+
+
   @Test
   public void test_killAll_noException_deletesAllTaskLogs() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to