This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa119dfda Add retry to opening retrying stream (#14126)
5aa119dfda is described below
commit 5aa119dfdaed4b0727e62659c18df5816685551a
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Apr 27 16:52:22 2023 +0530
Add retry to opening retrying stream (#14126)
* Add retry to opening retrying stream
* Add retry to S3Entity for network issues
* Fix tests and clean up code
---
.../java/org/apache/druid/storage/s3/S3Utils.java | 3 +
.../druid/data/input/impl/RetryingInputStream.java | 56 ++++++++++++++----
.../data/input/impl/RetryingInputStreamTest.java | 69 +++++++++++++++++-----
3 files changed, 102 insertions(+), 26 deletions(-)
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index 06a0832410..a8bc009746 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -79,6 +79,9 @@ public class S3Utils
// SdkClientException can be thrown for many reasons and the only way
to distinguish it is to look at
// the message. This is not ideal, since the message may change, so it
may need to be adjusted in the future.
return true;
+ } else if (e instanceof SdkClientException &&
e.getMessage().contains("Unable to execute HTTP request")) {
+ // This is likely due to a temporary DNS issue and can be retried.
+ return true;
} else if (e instanceof AmazonClientException) {
return
AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e);
} else {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java b/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
index d517d5fcb1..99d9bf8070 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
@@ -53,7 +53,7 @@ public class RetryingInputStream<T> extends InputStream
private long startOffset;
// Used in tests to disable waiting.
- private boolean doWait;
+ private final boolean doWait;
/**
* @param object The object entity to open
@@ -69,23 +69,62 @@ public class RetryingInputStream<T> extends InputStream
Predicate<Throwable> retryCondition,
@Nullable Integer maxTries
) throws IOException
+ {
+ this(object, objectOpenFunction, retryCondition, maxTries, true);
+ }
+
+ @VisibleForTesting
+ RetryingInputStream(
+ T object,
+ ObjectOpenFunction<T> objectOpenFunction,
+ Predicate<Throwable> retryCondition,
+ @Nullable Integer maxTries,
+ boolean doWait
+ ) throws IOException
{
this.object = Preconditions.checkNotNull(object, "object");
this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction,
"objectOpenFunction");
this.retryCondition = Preconditions.checkNotNull(retryCondition,
"retryCondition");
this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
- this.delegate = new CountingInputStream(objectOpenFunction.open(object));
- this.doWait = true;
+ this.doWait = doWait;
if (this.maxTries <= 1) {
throw new IAE("maxTries must be greater than 1");
}
+ openWithRetry(0);
}
private void openIfNeeded() throws IOException
{
if (delegate == null) {
- delegate = new CountingInputStream(objectOpenFunction.open(object,
startOffset));
+ openWithRetry(startOffset);
+ }
+ }
+
+ private void openWithRetry(final long offset) throws IOException
+ {
+ for (int nTry = 0; nTry < maxTries; nTry++) {
+ try {
+ delegate = new CountingInputStream(objectOpenFunction.open(object,
offset));
+ break;
+ }
+ catch (Throwable t) {
+ final int nextTry = nTry + 1;
+ if (nextTry < maxTries && retryCondition.apply(t)) {
+ final String message = StringUtils.format("Stream interrupted at
position [%d]", offset);
+ try {
+ if (doWait) {
+ RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
+ }
+ }
+ catch (InterruptedException e) {
+ t.addSuppressed(e);
+ throwAsIOException(t);
+ }
+ } else {
+ throwAsIOException(t);
+ }
+ }
}
}
@@ -115,8 +154,7 @@ public class RetryingInputStream<T> extends InputStream
if (doWait) {
RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
}
-
- delegate = new CountingInputStream(objectOpenFunction.open(object,
startOffset));
+ openWithRetry(startOffset);
}
catch (InterruptedException | IOException e) {
t.addSuppressed(e);
@@ -233,10 +271,4 @@ public class RetryingInputStream<T> extends InputStream
delegate.close();
}
}
-
- @VisibleForTesting
- void setNoWait()
- {
- this.doWait = false;
- }
}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
index 28726e6e1f..c6eac5212e 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
@@ -31,6 +31,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import javax.annotation.Nonnull;
import java.io.DataInputStream;
@@ -44,6 +46,13 @@ import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class RetryingInputStreamTest
{
private static final int MAX_RETRY = 5;
@@ -58,7 +67,7 @@ public class RetryingInputStreamTest
private int throwIOExceptions = 0;
- private final ObjectOpenFunction<File> objectOpenFunction = new
ObjectOpenFunction<File>()
+ private final ObjectOpenFunction<File> objectOpenFunction = spy(new
ObjectOpenFunction<File>()
{
@Override
public InputStream open(File object) throws IOException
@@ -73,7 +82,7 @@ public class RetryingInputStreamTest
Preconditions.checkState(fis.skip(start) == start);
return new TestInputStream(fis);
}
- };
+ });
@Before
public void setup() throws IOException
@@ -107,10 +116,10 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> false, // will not retry
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
@@ -127,10 +136,10 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> t instanceof CustomException,
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
retryHelper(retryingInputStream);
Assert.assertEquals(0, throwCustomExceptions);
@@ -144,10 +153,10 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> false, // will not retry
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
final IOException e = Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
@@ -167,10 +176,10 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> true, // always retry
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
retryHelper(retryingInputStream);
// Tried more than MAX_RETRY times because progress was being made.
(MAX_RETRIES applies to each call individually.)
@@ -185,10 +194,10 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> t instanceof IOException,
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
Assert.assertThrows(
IOException.class,
() -> retryHelper(retryingInputStream)
@@ -206,16 +215,48 @@ public class RetryingInputStreamTest
testFile,
objectOpenFunction,
t -> t instanceof IOException || t instanceof CustomException,
- MAX_RETRY
+ MAX_RETRY,
+ false
);
- retryingInputStream.setNoWait();
retryHelper(retryingInputStream);
Assert.assertEquals(0, throwCustomExceptions);
Assert.assertEquals(0, throwIOExceptions);
}
+ @Test
+ public void testRetryOnExceptionWhenOpeningStream() throws Exception
+ {
+ throwCustomExceptions = 2;
+
+ doAnswer(new Answer<InputStream>()
+ {
+ int retryCount = 0;
+ @Override
+ public InputStream answer(InvocationOnMock invocation) throws Throwable
+ {
+ if (retryCount < 2) {
+ retryCount += 1;
+ throwCustomExceptions -= 1;
+ throw new CustomException("I am a custom retryable exception", new
RuntimeException());
+ } else {
+ return (InputStream) invocation.callRealMethod();
+ }
+ }
+ }).when(objectOpenFunction).open(any(), anyLong());
+
+ new RetryingInputStream<>(
+ testFile,
+ objectOpenFunction,
+ t -> t instanceof CustomException,
+ MAX_RETRY,
+ false
+ );
+ verify(objectOpenFunction, times(3)).open(any(), anyLong());
+ Assert.assertEquals(0, throwCustomExceptions);
+ }
+
private void retryHelper(RetryingInputStream<File> retryingInputStream)
throws IOException
{
try (DataInputStream inputStream = new DataInputStream(new
GZIPInputStream(retryingInputStream))) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]