jihoonson commented on a change in pull request #12307:
URL: https://github.com/apache/druid/pull/12307#discussion_r819955319



##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -131,76 +136,92 @@ private static void throwAsIOException(Throwable t) 
throws IOException
   @Override
   public int read() throws IOException
   {
-    for (int nTry = 0; nTry < maxRetry; nTry++) {
+    openIfNeeded();
+
+    for (int nTry = 0; nTry < maxTries; nTry++) {
       try {
         return delegate.read();
       }
       catch (Throwable t) {
         waitOrThrow(t, nTry);
       }
     }
-    return delegate.read();
+
+    // Can't happen, because the final waitOrThrow would have thrown.
+    throw new IllegalStateException();
   }
 
   @Override
   public int read(byte[] b) throws IOException
   {
-    for (int nTry = 0; nTry < maxRetry; nTry++) {
-      try {
-        return delegate.read(b);
-      }
-      catch (Throwable t) {
-        waitOrThrow(t, nTry);
-      }
-    }
-    return delegate.read(b);
+    return read(b, 0, b.length);

Review comment:
       I overrode this method just in case `delegate.read(byte[])` does 
something other than simply calling `read(b, 0, b.length)`, even though that is 
very rare (and likely an incorrect implementation). I think it would be safer to keep it.

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -19,99 +19,104 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.io.CountingInputStream;
 import org.apache.druid.data.input.impl.prefetch.Fetcher;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.SocketException;
 
 /**
  * This class is used by {@link Fetcher} when prefetch is disabled. It's 
responsible for re-opening the underlying input
- * stream for the input object on the socket connection reset as well as the 
given {@link #retryCondition}.
+ * stream for the input object on the given {@link #retryCondition}.
  *
  * @param <T> object type
  */
 public class RetryingInputStream<T> extends InputStream
 {
-
-  public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION = 
Predicates.alwaysFalse();
-  public static final Predicate<Throwable> DEFAULT_RESET_CONDITION = 
RetryingInputStream::isConnectionReset;
-
   private static final Logger log = new Logger(RetryingInputStream.class);
 
   private final T object;
   private final ObjectOpenFunction<T> objectOpenFunction;
   private final Predicate<Throwable> retryCondition;
-  private final Predicate<Throwable> resetCondition;
-  private final int maxRetry;
+  private final int maxTries;
 
   private CountingInputStream delegate;
   private long startOffset;
 
+  // Used in tests to disable waiting.
+  private boolean doWait;
+
   /**
-   *
-   * @param object The object entity to open
+   * @param object             The object entity to open
    * @param objectOpenFunction How to open the object
-   * @param retryCondition A predicate on a throwable to indicate if stream 
should retry. This defaults to
-   *                       {@link IOException}, not retryable, when null is 
passed
-   * @param resetCondition A predicate on a throwable to indicate if stream 
should reset. This defaults to
-   *                       a generic reset test, see {@link 
#isConnectionReset(Throwable)} when null is passed
-   * @param maxRetry      The maximum times to retry. Defaults to {@link 
RetryUtils#DEFAULT_MAX_TRIES} when null
+   * @param retryCondition     A predicate on a throwable to indicate if 
stream should retry.
+   * @param maxTries           The maximum times to try. Defaults to {@link 
RetryUtils#DEFAULT_MAX_TRIES} when null
+   *
    * @throws IOException
    */
   public RetryingInputStream(
       T object,
       ObjectOpenFunction<T> objectOpenFunction,
-      @Nullable Predicate<Throwable> retryCondition,
-      @Nullable Predicate<Throwable> resetCondition,
-      @Nullable Integer maxRetry
+      Predicate<Throwable> retryCondition,
+      @Nullable Integer maxTries
   ) throws IOException
   {
-    this.object = object;
-    this.objectOpenFunction = objectOpenFunction;
-    this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION : 
retryCondition;
-    this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION : 
resetCondition;
-    this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
+    this.object = Preconditions.checkNotNull(object, "object");
+    this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction, 
"objectOpenFunction");
+    this.retryCondition = Preconditions.checkNotNull(retryCondition, 
"retryCondition");
+    this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
     this.delegate = new CountingInputStream(objectOpenFunction.open(object));
+    this.doWait = true;
+
+    if (this.maxTries <= 1) {
+      throw new IAE("maxTries must be greater than 1");
+    }
   }
 
-  private static boolean isConnectionReset(Throwable t)
+  private void openIfNeeded() throws IOException
   {
-    return (t instanceof SocketException && (t.getMessage() != null && 
t.getMessage().contains("Connection reset"))) ||
-           (t.getCause() != null && isConnectionReset(t.getCause()));
+    if (delegate == null) {
+      delegate = new CountingInputStream(objectOpenFunction.open(object, 
startOffset));
+    }
   }
 
   private void waitOrThrow(Throwable t, int nTry) throws IOException
   {
-    final boolean isConnectionReset = resetCondition.apply(t);
-    if (isConnectionReset || retryCondition.apply(t)) {
-      if (isConnectionReset) {
-        // Re-open the input stream on connection reset
-        startOffset += delegate.getCount();
-        try {
-          delegate.close();
-        }
-        catch (IOException e) {
-          // ignore this exception
-          log.warn(e, "Error while closing the delegate input stream");
-        }
-      }
+    // Update startOffset first, since we're about to close and null out the 
delegate.
+    startOffset += delegate.getCount();
+
+    try {
+      delegate.close();
+    }
+    catch (IOException e) {
+      // ignore this exception
+      log.warn(e, "Error while closing the delegate input stream. 
Discarding.");
+    }
+    finally {
+      delegate = null;
+    }
+
+    final int nextTry = nTry + 1;
+
+    if (nextTry < maxTries && retryCondition.apply(t)) {
       try {
-        // Wait for the next try
-        RetryUtils.awaitNextRetry(t, null, nTry + 1, maxRetry, false);
+        // Pause for some time and then re-open the input stream.
+        final String message = String.format("Stream interrupted at position 
[%d]", startOffset);

Review comment:
       ```suggestion
           final String message = StringUtils.format("Stream interrupted at 
position [%d]", startOffset);
   ```
   
   CI failed because of the use of `String.format()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to