gianm commented on a change in pull request #12307:
URL: https://github.com/apache/druid/pull/12307#discussion_r819988928
##########
File path:
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -131,76 +136,92 @@ private static void throwAsIOException(Throwable t)
throws IOException
@Override
public int read() throws IOException
{
- for (int nTry = 0; nTry < maxRetry; nTry++) {
+ openIfNeeded();
+
+ for (int nTry = 0; nTry < maxTries; nTry++) {
try {
return delegate.read();
}
catch (Throwable t) {
waitOrThrow(t, nTry);
}
}
- return delegate.read();
+
+ // Can't happen, because the final waitOrThrow would have thrown.
+ throw new IllegalStateException();
}
@Override
public int read(byte[] b) throws IOException
{
- for (int nTry = 0; nTry < maxRetry; nTry++) {
- try {
- return delegate.read(b);
- }
- catch (Throwable t) {
- waitOrThrow(t, nTry);
- }
- }
- return delegate.read(b);
+ return read(b, 0, b.length);
Review comment:
Sure, I'll restore it.
##########
File path:
core/src/main/java/org/apache/druid/data/input/impl/RetryingInputStream.java
##########
@@ -19,99 +19,104 @@
package org.apache.druid.data.input.impl;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.io.CountingInputStream;
import org.apache.druid.data.input.impl.prefetch.Fetcher;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
-import java.net.SocketException;
/**
* This class is used by {@link Fetcher} when prefetch is disabled. It's
responsible for re-opening the underlying input
- * stream for the input object on the socket connection reset as well as the
given {@link #retryCondition}.
+ * stream for the input object on the given {@link #retryCondition}.
*
* @param <T> object type
*/
public class RetryingInputStream<T> extends InputStream
{
-
- public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION =
Predicates.alwaysFalse();
- public static final Predicate<Throwable> DEFAULT_RESET_CONDITION =
RetryingInputStream::isConnectionReset;
-
private static final Logger log = new Logger(RetryingInputStream.class);
private final T object;
private final ObjectOpenFunction<T> objectOpenFunction;
private final Predicate<Throwable> retryCondition;
- private final Predicate<Throwable> resetCondition;
- private final int maxRetry;
+ private final int maxTries;
private CountingInputStream delegate;
private long startOffset;
+ // Used in tests to disable waiting.
+ private boolean doWait;
+
/**
- *
- * @param object The object entity to open
+ * @param object The object entity to open
* @param objectOpenFunction How to open the object
- * @param retryCondition A predicate on a throwable to indicate if stream
should retry. This defaults to
- * {@link IOException}, not retryable, when null is
passed
- * @param resetCondition A predicate on a throwable to indicate if stream
should reset. This defaults to
- * a generic reset test, see {@link
#isConnectionReset(Throwable)} when null is passed
- * @param maxRetry The maximum times to retry. Defaults to {@link
RetryUtils#DEFAULT_MAX_TRIES} when null
+ * @param retryCondition A predicate on a throwable to indicate if
stream should retry.
+ * @param maxTries The maximum times to try. Defaults to {@link
RetryUtils#DEFAULT_MAX_TRIES} when null
+ *
* @throws IOException
*/
public RetryingInputStream(
T object,
ObjectOpenFunction<T> objectOpenFunction,
- @Nullable Predicate<Throwable> retryCondition,
- @Nullable Predicate<Throwable> resetCondition,
- @Nullable Integer maxRetry
+ Predicate<Throwable> retryCondition,
+ @Nullable Integer maxTries
) throws IOException
{
- this.object = object;
- this.objectOpenFunction = objectOpenFunction;
- this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION :
retryCondition;
- this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION :
resetCondition;
- this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
+ this.object = Preconditions.checkNotNull(object, "object");
+ this.objectOpenFunction = Preconditions.checkNotNull(objectOpenFunction,
"objectOpenFunction");
+ this.retryCondition = Preconditions.checkNotNull(retryCondition,
"retryCondition");
+ this.maxTries = maxTries == null ? RetryUtils.DEFAULT_MAX_TRIES : maxTries;
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
+ this.doWait = true;
+
+ if (this.maxTries <= 1) {
+ throw new IAE("maxTries must be greater than 1");
+ }
}
- private static boolean isConnectionReset(Throwable t)
+ private void openIfNeeded() throws IOException
{
- return (t instanceof SocketException && (t.getMessage() != null &&
t.getMessage().contains("Connection reset"))) ||
- (t.getCause() != null && isConnectionReset(t.getCause()));
+ if (delegate == null) {
+ delegate = new CountingInputStream(objectOpenFunction.open(object,
startOffset));
+ }
}
private void waitOrThrow(Throwable t, int nTry) throws IOException
{
- final boolean isConnectionReset = resetCondition.apply(t);
- if (isConnectionReset || retryCondition.apply(t)) {
- if (isConnectionReset) {
- // Re-open the input stream on connection reset
- startOffset += delegate.getCount();
- try {
- delegate.close();
- }
- catch (IOException e) {
- // ignore this exception
- log.warn(e, "Error while closing the delegate input stream");
- }
- }
+ // Update startOffset first, since we're about to close and null out the
delegate.
+ startOffset += delegate.getCount();
+
+ try {
+ delegate.close();
+ }
+ catch (IOException e) {
+ // ignore this exception
+ log.warn(e, "Error while closing the delegate input stream.
Discarding.");
+ }
+ finally {
+ delegate = null;
+ }
+
+ final int nextTry = nTry + 1;
+
+ if (nextTry < maxTries && retryCondition.apply(t)) {
try {
- // Wait for the next try
- RetryUtils.awaitNextRetry(t, null, nTry + 1, maxRetry, false);
+ // Pause for some time and then re-open the input stream.
+ final String message = String.format("Stream interrupted at position
[%d]", startOffset);
Review comment:
Oops!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]