This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa119dfda Add retry to opening retrying stream (#14126)
5aa119dfda is described below

commit 5aa119dfdaed4b0727e62659c18df5816685551a
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Apr 27 16:52:22 2023 +0530

    Add retry to opening retrying stream (#14126)
    
    * Add retry to opening retrying stream
    * Add retry to S3Entity for network issues
    
    * Fix tests and clean up code
---
 .../java/org/apache/druid/storage/s3/S3Utils.java  |  3 +
 .../druid/data/input/impl/RetryingInputStream.java | 56 ++++++++++++++----
 .../data/input/impl/RetryingInputStreamTest.java   | 69 +++++++++++++++++-----
 3 files changed, 102 insertions(+), 26 deletions(-)

diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index 06a0832410..a8bc009746 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -79,6 +79,9 @@ public class S3Utils
         // SdkClientException can be thrown for many reasons and the only way 
to distinguish it is to look at
         // the message. This is not ideal, since the message may change, so it 
may need to be adjusted in the future.
         return true;
+      } else if (e instanceof SdkClientException && 
e.getMessage().contains("Unable to execute HTTP request")) {
+        // This is likely due to a temporary DNS issue and can be retried.
+        return true;
       } else if (e instanceof AmazonClientException) {
         return 
AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e);
       } else {
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
index d517d5fcb1..99d9bf8070 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
@@ -53,7 +53,7 @@ public class RetryingInputStream<T> extends InputStream
   private long startOffset;
 
   // Used in tests to disable waiting.
-  private boolean doWait;
+  private final boolean doWait;
 
   /**
    * @param object             The object entity to open
@@ -69,23 +69,62 @@ public class RetryingInputStream<T> extends InputStream
       Predicate<Throwable> retryCondition,
       @Nullable Integer maxTries
   ) throws IOException
+  {
+    this(object, objectOpenFunction, retryCondition, maxTries, true);
+  }
+
+  @VisibleForTesting
+  RetryingInputStream(
+      T object,
+      ObjectOpenFunction<T> objectOpenFunction,
+      Predicate<Throwable> retryCondition,
+      @Nullable Integer maxTries,
+      boolean doWait
+  ) throws IOException
   {
     this.object = Preconditions.checkNotNull(object, "object");
     this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction, 
"objectOpenFunction");
     this.retryCondition = Preconditions.checkNotNull(retryCondition, 
"retryCondition");
     this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
-    this.delegate = new CountingInputStream(objectOpenFunction.open(object));
-    this.doWait = true;
+    this.doWait = doWait;
 
     if (this.maxTries <= 1) {
       throw new IAE("maxTries must be greater than 1");
     }
+    openWithRetry(0);
   }
 
   private void openIfNeeded() throws IOException
   {
     if (delegate == null) {
-      delegate = new CountingInputStream(objectOpenFunction.open(object, 
startOffset));
+      openWithRetry(startOffset);
+    }
+  }
+
+  private void openWithRetry(final long offset) throws IOException
+  {
+    for (int nTry = 0; nTry < maxTries; nTry++) {
+      try {
+        delegate = new CountingInputStream(objectOpenFunction.open(object, 
offset));
+        break;
+      }
+      catch (Throwable t) {
+        final int nextTry = nTry + 1;
+        if (nextTry < maxTries && retryCondition.apply(t)) {
+          final String message = StringUtils.format("Stream interrupted at 
position [%d]", offset);
+          try {
+            if (doWait) {
+              RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
+            }
+          }
+          catch (InterruptedException e) {
+            t.addSuppressed(e);
+            throwAsIOException(t);
+          }
+        } else {
+          throwAsIOException(t);
+        }
+      }
     }
   }
 
@@ -115,8 +154,7 @@ public class RetryingInputStream<T> extends InputStream
         if (doWait) {
           RetryUtils.awaitNextRetry(t, message, nextTry, maxTries, false);
         }
-
-        delegate = new CountingInputStream(objectOpenFunction.open(object, 
startOffset));
+        openWithRetry(startOffset);
       }
       catch (InterruptedException | IOException e) {
         t.addSuppressed(e);
@@ -233,10 +271,4 @@ public class RetryingInputStream<T> extends InputStream
       delegate.close();
     }
   }
-
-  @VisibleForTesting
-  void setNoWait()
-  {
-    this.doWait = false;
-  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
index 28726e6e1f..c6eac5212e 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/RetryingInputStreamTest.java
@@ -31,6 +31,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import javax.annotation.Nonnull;
 import java.io.DataInputStream;
@@ -44,6 +46,13 @@ import java.io.InputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class RetryingInputStreamTest
 {
   private static final int MAX_RETRY = 5;
@@ -58,7 +67,7 @@ public class RetryingInputStreamTest
   private int throwIOExceptions = 0;
 
 
-  private final ObjectOpenFunction<File> objectOpenFunction = new 
ObjectOpenFunction<File>()
+  private final ObjectOpenFunction<File> objectOpenFunction = spy(new 
ObjectOpenFunction<File>()
   {
     @Override
     public InputStream open(File object) throws IOException
@@ -73,7 +82,7 @@ public class RetryingInputStreamTest
       Preconditions.checkState(fis.skip(start) == start);
       return new TestInputStream(fis);
     }
-  };
+  });
 
   @Before
   public void setup() throws IOException
@@ -107,10 +116,10 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> false, // will not retry
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     Assert.assertThrows(
         IOException.class,
         () -> retryHelper(retryingInputStream)
@@ -127,10 +136,10 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> t instanceof CustomException,
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     retryHelper(retryingInputStream);
 
     Assert.assertEquals(0, throwCustomExceptions);
@@ -144,10 +153,10 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> false, // will not retry
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     final IOException e = Assert.assertThrows(
         IOException.class,
         () -> retryHelper(retryingInputStream)
@@ -167,10 +176,10 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> true, // always retry
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     retryHelper(retryingInputStream);
 
     // Tried more than MAX_RETRY times because progress was being made. 
(MAX_RETRIES applies to each call individually.)
@@ -185,10 +194,10 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> t instanceof IOException,
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     Assert.assertThrows(
         IOException.class,
         () -> retryHelper(retryingInputStream)
@@ -206,16 +215,48 @@ public class RetryingInputStreamTest
         testFile,
         objectOpenFunction,
         t -> t instanceof IOException || t instanceof CustomException,
-        MAX_RETRY
+        MAX_RETRY,
+        false
     );
 
-    retryingInputStream.setNoWait();
     retryHelper(retryingInputStream);
 
     Assert.assertEquals(0, throwCustomExceptions);
     Assert.assertEquals(0, throwIOExceptions);
   }
 
+  @Test
+  public void testRetryOnExceptionWhenOpeningStream() throws Exception
+  {
+    throwCustomExceptions = 2;
+
+    doAnswer(new Answer<InputStream>()
+    {
+      int retryCount = 0;
+      @Override
+      public InputStream answer(InvocationOnMock invocation) throws Throwable
+      {
+        if (retryCount < 2) {
+          retryCount += 1;
+          throwCustomExceptions -= 1;
+          throw new CustomException("I am a custom retryable exception", new 
RuntimeException());
+        } else {
+          return (InputStream) invocation.callRealMethod();
+        }
+      }
+    }).when(objectOpenFunction).open(any(), anyLong());
+
+    new RetryingInputStream<>(
+        testFile,
+        objectOpenFunction,
+        t -> t instanceof CustomException,
+        MAX_RETRY,
+        false
+    );
+    verify(objectOpenFunction, times(3)).open(any(), anyLong());
+    Assert.assertEquals(0, throwCustomExceptions);
+  }
+
   private void retryHelper(RetryingInputStream<File> retryingInputStream) 
throws IOException
   {
     try (DataInputStream inputStream = new DataInputStream(new 
GZIPInputStream(retryingInputStream))) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to