FrankChen021 commented on a change in pull request #12307:
URL: https://github.com/apache/druid/pull/12307#discussion_r820016475
##########
File path:
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -19,99 +19,105 @@
package org.apache.druid.data.input.impl;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.io.CountingInputStream;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
-import java.net.SocketException;
/**
* This class is used by {@link Fetcher} when prefetch is disabled. It's
responsible for re-opening the underlying input
- * stream for the input object on the socket connection reset as well as the
given {@link #retryCondition}.
+ * stream for the input object on the given {@link #retryCondition}.
*
* @param <T> object type
*/
public class RetryingInputStream<T> extends InputStream
{
-
- public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION =
Predicates.alwaysFalse();
- public static final Predicate<Throwable> DEFAULT_RESET_CONDITION =
RetryingInputStream::isConnectionReset;
-
private static final Logger log = new Logger(RetryingInputStream.class);
private final T object;
private final ObjectOpenFunction<T> objectOpenFunction;
private final Predicate<Throwable> retryCondition;
- private final Predicate<Throwable> resetCondition;
- private final int maxRetry;
+ private final int maxTries;
private CountingInputStream delegate;
private long startOffset;
+ // Used in tests to disable waiting.
+ private boolean doWait;
+
/**
- *
- * @param object The object entity to open
+ * @param object The object entity to open
* @param objectOpenFunction How to open the object
- * @param retryCondition A predicate on a throwable to indicate if stream
should retry. This defaults to
- * {@link IOException}, not retryable, when null is
passed
- * @param resetCondition A predicate on a throwable to indicate if stream
should reset. This defaults to
- * a generic reset test, see {@link
#isConnectionReset(Throwable)} when null is passed
- * @param maxRetry The maximum times to retry. Defaults to {@link
RetryUtils#DEFAULT_MAX_TRIES} when null
+ * @param retryCondition A predicate on a throwable to indicate if
stream should retry.
+ * @param maxTries The maximum times to try. Defaults to {@link
RetryUtils#DEFAULT_MAX_TRIES} when null
+ *
* @throws IOException
*/
public RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
- @Nullable Predicate<Throwable> retryCondition,
- @Nullable Predicate<Throwable> resetCondition,
- @Nullable Integer maxRetry
+ Predicate<Throwable> retryCondition,
+ @Nullable Integer maxTries
) throws IOException
{
- this.object = object;
- this.objectOpenFunction = objectOpenFunction;
- this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION :
retryCondition;
- this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION :
resetCondition;
- this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
+ this.object = Preconditions.checkNotNull(object, "object");
+ this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction,
"objectOpenFunction");
+ this.retryCondition = Preconditions.checkNotNull(retryCondition,
"retryCondition");
+ this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
+ this.doWait = true;
+
+ if (this.maxTries <= 1) {
Review comment:
Why is a maxTries value of 1 not allowed? From the implementation below, we can
see that when maxTries equals 1, the underlying stream is still read.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]