FrankChen021 commented on a change in pull request #12307:
URL: https://github.com/apache/druid/pull/12307#discussion_r820016475



##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -19,99 +19,105 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.io.CountingInputStream;
 import org.apache.druid.data.input.impl.prefetch.Fetcher;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.SocketException;
 
 /**
  * This class is used by {@link Fetcher} when prefetch is disabled. It's 
responsible for re-opening the underlying input
- * stream for the input object on the socket connection reset as well as the 
given {@link #retryCondition}.
+ * stream for the input object on the given {@link #retryCondition}.
  *
  * @param <T> object type
  */
 public class RetryingInputStream<T> extends InputStream
 {
-
-  public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION = 
Predicates.alwaysFalse();
-  public static final Predicate<Throwable> DEFAULT_RESET_CONDITION = 
RetryingInputStream::isConnectionReset;
-
   private static final Logger log = new Logger(RetryingInputStream.class);
 
   private final T object;
   private final ObjectOpenFunction<T> objectOpenFunction;
   private final Predicate<Throwable> retryCondition;
-  private final Predicate<Throwable> resetCondition;
-  private final int maxRetry;
+  private final int maxTries;
 
   private CountingInputStream delegate;
   private long startOffset;
 
+  // Used in tests to disable waiting.
+  private boolean doWait;
+
   /**
-   *
-   * @param object The object entity to open
+   * @param object             The object entity to open
    * @param objectOpenFunction How to open the object
-   * @param retryCondition A predicate on a throwable to indicate if stream 
should retry. This defaults to
-   *                       {@link IOException}, not retryable, when null is 
passed
-   * @param resetCondition A predicate on a throwable to indicate if stream 
should reset. This defaults to
-   *                       a generic reset test, see {@link 
#isConnectionReset(Throwable)} when null is passed
-   * @param maxRetry      The maximum times to retry. Defaults to {@link 
RetryUtils#DEFAULT_MAX_TRIES} when null
+   * @param retryCondition     A predicate on a throwable to indicate if 
stream should retry.
+   * @param maxTries           The maximum times to try. Defaults to {@link 
RetryUtils#DEFAULT_MAX_TRIES} when null
+   *
    * @throws IOException
    */
   public RetryingInputStream(
       T object,
       ObjectOpenFunction<T> objectOpenFunction,
-      @Nullable Predicate<Throwable> retryCondition,
-      @Nullable Predicate<Throwable> resetCondition,
-      @Nullable Integer maxRetry
+      Predicate<Throwable> retryCondition,
+      @Nullable Integer maxTries
   ) throws IOException
   {
-    this.object = object;
-    this.objectOpenFunction = objectOpenFunction;
-    this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION : 
retryCondition;
-    this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION : 
resetCondition;
-    this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
+    this.object = Preconditions.checkNotNull(object, "object");
+    this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction, 
"objectOpenFunction");
+    this.retryCondition = Preconditions.checkNotNull(retryCondition, 
"retryCondition");
+    this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
     this.delegate = new CountingInputStream(objectOpenFunction.open(object));
+    this.doWait = true;
+
+    if (this.maxTries <= 1) {

Review comment:
       Why is a maxTries value of 1 not allowed? From the implementation below, we can 
see that when maxTries equals 1, the underlying stream is still read.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to