kfaraz commented on code in PR #14714:
URL: https://github.com/apache/druid/pull/14714#discussion_r1282607572
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -67,57 +66,64 @@ public S3TaskLogs(
public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
- return streamTaskFile(offset, taskKey);
+ return streamTaskFileWithRetry(offset, taskKey);
}
@Override
public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
- return streamTaskFile(0, taskKey);
+ return streamTaskFileWithRetry(0, taskKey);
}
@Override
public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "status.json");
- return streamTaskFile(0, taskKey);
+ return streamTaskFileWithRetry(0, taskKey);
}
- private Optional<InputStream> streamTaskFile(final long offset, String
taskKey) throws IOException
+ /**
+ * Using the retry conditions defined in {@link S3Utils#S3RETRY}.
+ */
+ private Optional<InputStream> streamTaskFileWithRetry(final long offset,
String taskKey) throws IOException
+ {
+ try {
+ return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
+ }
+ catch (Exception e) {
+ throw new IOE(e, "Failed to stream logs from: %s", taskKey);
+ }
+ }
+
+ private Optional<InputStream> streamTaskFile(final long offset, String
taskKey)
{
try {
final ObjectMetadata objectMetadata =
service.getObjectMetadata(config.getS3Bucket(), taskKey);
- try {
- final long start;
- final long end = objectMetadata.getContentLength() - 1;
+ final long start;
+ final long end = objectMetadata.getContentLength() - 1;
- if (offset > 0 && offset < objectMetadata.getContentLength()) {
- start = offset;
- } else if (offset < 0 && (-1 * offset) <
objectMetadata.getContentLength()) {
- start = objectMetadata.getContentLength() + offset;
- } else {
- start = 0;
- }
+ long contentLength = objectMetadata.getContentLength();
+ if (offset >= contentLength || offset <= -contentLength) {
+ start = 0;
+ } else {
+ start = offset >= 0 ? offset : contentLength + offset;
+ }
Review Comment:
Best not to mix the ternary operator with if-else; a single if / else-if chain reads more clearly:
```suggestion
} else if (offset >= 0) {
start = offset;
} else {
start = contentLength + offset;
}
```
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java:
##########
@@ -67,57 +66,64 @@ public S3TaskLogs(
public Optional<InputStream> streamTaskLog(final String taskid, final long
offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
- return streamTaskFile(offset, taskKey);
+ return streamTaskFileWithRetry(offset, taskKey);
}
@Override
public Optional<InputStream> streamTaskReports(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
- return streamTaskFile(0, taskKey);
+ return streamTaskFileWithRetry(0, taskKey);
}
@Override
public Optional<InputStream> streamTaskStatus(String taskid) throws
IOException
{
final String taskKey = getTaskLogKey(taskid, "status.json");
- return streamTaskFile(0, taskKey);
+ return streamTaskFileWithRetry(0, taskKey);
}
- private Optional<InputStream> streamTaskFile(final long offset, String
taskKey) throws IOException
+ /**
+ * Using the retry conditions defined in {@link S3Utils#S3RETRY}.
+ */
+ private Optional<InputStream> streamTaskFileWithRetry(final long offset,
String taskKey) throws IOException
+ {
+ try {
+ return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
+ }
+ catch (Exception e) {
+ throw new IOE(e, "Failed to stream logs from: %s", taskKey);
Review Comment:
```suggestion
throw new IOE(e, "Failed to stream logs for task[%s] starting at
offset[%d]", taskKey, offset);
```
##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java:
##########
@@ -487,6 +488,42 @@ public void test_status_fetch() throws IOException
Assert.assertEquals(STATUS_CONTENTS, report);
}
+ @Test
+ public void test_retryStatusFetch_whenExceptionThrown() throws IOException
+ {
+ EasyMock.reset(s3Client);
+ AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
+ awsError.setErrorCode("503");
+ awsError.setStatusCode(503);
+ EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(),
EasyMock.anyString())).andThrow(awsError);
+ EasyMock.expectLastCall().once();
+ String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(STATUS_CONTENTS.length());
+ EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET,
logPath)).andReturn(objectMetadata);
+ S3Object s3Object = new S3Object();
+ s3Object.setObjectContent(new
ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+ GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET,
logPath);
+ getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
+ getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+ EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+ EasyMock.expectLastCall().once();
+ replayAll();
Review Comment:
Some one-line comments would be good here, or at least a logical separation
of the steps using blank lines.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]