kfaraz commented on code in PR #14714:
URL: https://github.com/apache/druid/pull/14714#discussion_r1282607572


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -67,57 +66,64 @@ public S3TaskLogs(
   public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid, "log");
-    return streamTaskFile(offset, taskKey);
+    return streamTaskFileWithRetry(offset, taskKey);
   }
 
   @Override
   public Optional<InputStream> streamTaskReports(String taskid) throws 
IOException
   {
     final String taskKey = getTaskLogKey(taskid, "report.json");
-    return streamTaskFile(0, taskKey);
+    return streamTaskFileWithRetry(0, taskKey);
   }
 
   @Override
   public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
   {
     final String taskKey = getTaskLogKey(taskid, "status.json");
-    return streamTaskFile(0, taskKey);
+    return streamTaskFileWithRetry(0, taskKey);
   }
 
-  private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey) throws IOException
+  /**
+   * Using the retry conditions defined in {@link S3Utils#S3RETRY}.
+   */
+  private Optional<InputStream> streamTaskFileWithRetry(final long offset, 
String taskKey) throws IOException
+  {
+    try {
+      return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
+    }
+    catch (Exception e) {
+      throw new IOE(e, "Failed to stream logs from: %s", taskKey);
+    }
+  }
+
+  private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey)
   {
     try {
       final ObjectMetadata objectMetadata = 
service.getObjectMetadata(config.getS3Bucket(), taskKey);
 
-      try {
-        final long start;
-        final long end = objectMetadata.getContentLength() - 1;
+      final long start;
+      final long end = objectMetadata.getContentLength() - 1;
 
-        if (offset > 0 && offset < objectMetadata.getContentLength()) {
-          start = offset;
-        } else if (offset < 0 && (-1 * offset) < 
objectMetadata.getContentLength()) {
-          start = objectMetadata.getContentLength() + offset;
-        } else {
-          start = 0;
-        }
+      long contentLength = objectMetadata.getContentLength();
+      if (offset >= contentLength || offset <= -contentLength) {
+        start = 0;
+      } else {
+        start = offset >= 0 ? offset : contentLength + offset;
+      }

Review Comment:
   best not to mix ternary operator and if-else
   ```suggestion
         } else if (offset >= 0) {
           start = offset;
         } else {
           start = contentLength + offset;
         }
   ```



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -67,57 +66,64 @@ public S3TaskLogs(
   public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid, "log");
-    return streamTaskFile(offset, taskKey);
+    return streamTaskFileWithRetry(offset, taskKey);
   }
 
   @Override
   public Optional<InputStream> streamTaskReports(String taskid) throws 
IOException
   {
     final String taskKey = getTaskLogKey(taskid, "report.json");
-    return streamTaskFile(0, taskKey);
+    return streamTaskFileWithRetry(0, taskKey);
   }
 
   @Override
   public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
   {
     final String taskKey = getTaskLogKey(taskid, "status.json");
-    return streamTaskFile(0, taskKey);
+    return streamTaskFileWithRetry(0, taskKey);
   }
 
-  private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey) throws IOException
+  /**
+   * Using the retry conditions defined in {@link S3Utils#S3RETRY}.
+   */
+  private Optional<InputStream> streamTaskFileWithRetry(final long offset, 
String taskKey) throws IOException
+  {
+    try {
+      return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
+    }
+    catch (Exception e) {
+      throw new IOE(e, "Failed to stream logs from: %s", taskKey);

Review Comment:
   ```suggestion
         throw new IOE(e, "Failed to stream logs for task[%s] starting at 
offset[%d]", taskKey, offset);
   ```



##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java:
##########
@@ -487,6 +488,42 @@ public void test_status_fetch() throws IOException
     Assert.assertEquals(STATUS_CONTENTS, report);
   }
 
+  @Test
+  public void test_retryStatusFetch_whenExceptionThrown() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
+    awsError.setErrorCode("503");
+    awsError.setStatusCode(503);
+    EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(), 
EasyMock.anyString())).andThrow(awsError);
+    EasyMock.expectLastCall().once();
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(STATUS_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.expectLastCall().once();
+    replayAll();

Review Comment:
   Some 1-line comments might be good here, or at least a logical separation 
using newlines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to