This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 217b0f6832 Eagerly fetching remote s3 files leading to out of disk (OOD) (#13981)
217b0f6832 is described below
commit 217b0f6832c819b6c2b264418038b86049769ad8
Author: Karan Kumar <[email protected]>
AuthorDate: Mon Apr 3 14:10:37 2023 +0530
Eagerly fetching remote s3 files leading to out of disk (OOD) (#13981)
* Eagerly fetching remote s3 files leading to OOD.
---
.../shuffle/DurableStorageInputChannelFactory.java | 22 +++++++----------
.../DurableStorageOutputChannelFactory.java | 10 +++-----
.../storage/s3/output/S3StorageConnector.java | 8 +++++++
.../channel/ReadableInputStreamFrameChannel.java | 28 +++++++++++++++-------
4 files changed, 39 insertions(+), 29 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
index 1f6c1e44fa..8e8315d902 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
@@ -26,7 +26,6 @@ import
org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@@ -97,18 +96,15 @@ public class DurableStorageInputChannelFactory implements
InputChannelFactory
workerNumber,
remotePartitionPath
);
- RetryUtils.retry(() -> {
- if (!storageConnector.pathExists(remotePartitionPath)) {
- throw new ISE(
- "Could not find remote outputs of stage [%d] partition [%d] for
worker [%d] at the path [%s]",
- stageId.getStageNumber(),
- partitionNumber,
- workerNumber,
- remotePartitionPath
- );
- }
- return Boolean.TRUE;
- }, (throwable) -> true, 10);
+ if (!storageConnector.pathExists(remotePartitionPath)) {
+ throw new ISE(
+ "Could not find remote outputs of stage [%d] partition [%d] for
worker [%d] at the path [%s]",
+ stageId.getStageNumber(),
+ partitionNumber,
+ workerNumber,
+ remotePartitionPath
+ );
+ }
final InputStream inputStream =
storageConnector.read(remotePartitionPath);
return ReadableInputStreamFrameChannel.open(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
index 4505be5846..ca7041cfa0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
@@ -39,7 +39,6 @@ import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MappedByteBufferHandler;
-import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@@ -142,12 +141,9 @@ public class DurableStorageOutputChannelFactory implements
OutputChannelFactory
ArenaMemoryAllocator.createOnHeap(frameSize),
() -> {
try {
- RetryUtils.retry(() -> {
- if (!storageConnector.pathExists(fileName)) {
- throw new ISE("File does not exist : %s", fileName);
- }
- return Boolean.TRUE;
- }, (throwable) -> true, 10);
+ if (!storageConnector.pathExists(fileName)) {
+ throw new ISE("File does not exist : %s", fileName);
+ }
}
catch (Exception exception) {
throw new RuntimeException(exception);
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index c72f954401..a1e31e3a92 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import org.apache.commons.io.input.NullInputStream;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.RetryingInputStream;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
@@ -150,6 +151,7 @@ public class S3StorageConnector implements StorageConnector
// build a sequence input stream from chunks
return new SequenceInputStream(new Enumeration<InputStream>()
{
+ boolean initStream = false;
@Override
public boolean hasMoreElements()
{
@@ -166,6 +168,12 @@ public class S3StorageConnector implements StorageConnector
@Override
public InputStream nextElement()
{
+ // since Sequence input stream calls nextElement in the constructor,
we start chunking as soon as we call read.
+ // to avoid that we pass a nullInputStream for the first iteration.
+ if (!initStream) {
+ initStream = true;
+ return new NullInputStream();
+ }
File outFile = new File(config.getTempDir().getAbsolutePath(),
UUID.randomUUID().toString());
// in a single chunk, only download a maximum of
DOWNLOAD_MAX_CHUNK_SIZE
long endPoint = Math.min(currReadStart.get() +
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
index 280589da1d..5f03a5e8ef 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
/**
* Channel backed by an {@link InputStream}.
- *
+ * <p>
* Frame channels are expected to be nonblocking, but InputStreams cannot be
read in nonblocking fashion.
* This implementation deals with that by using an {@link ExecutorService} to
read from the stream in a
* separate thread.
@@ -56,6 +56,8 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
@GuardedBy("lock")
private boolean inputStreamError = false;
+ private boolean isStarted = false;
+
private volatile boolean keepReading = true;
private final Object readMonitor = new Object();
@@ -96,20 +98,18 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
boolean framesOnly
)
{
- final ReadableInputStreamFrameChannel channel = new
ReadableInputStreamFrameChannel(
+ return new ReadableInputStreamFrameChannel(
inputStream,
ReadableByteChunksFrameChannel.create(id, framesOnly),
executorService
);
-
- channel.startReading();
- return channel;
}
@Override
public boolean isFinished()
{
synchronized (lock) {
+ startReading();
return delegate.isFinished();
}
}
@@ -118,6 +118,7 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
public boolean canRead()
{
synchronized (lock) {
+ startReading();
return delegate.canRead();
}
}
@@ -126,6 +127,7 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
public Frame read()
{
synchronized (lock) {
+ startReading();
return delegate.read();
}
}
@@ -134,6 +136,7 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
public ListenableFuture<?> readabilityFuture()
{
synchronized (lock) {
+ startReading();
return delegate.readabilityFuture();
}
}
@@ -150,6 +153,12 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
private void startReading()
{
+
+ // the task to the executor service is submitted only once.
+ if (isStarted) {
+ return;
+ }
+ isStarted = true;
executorService.submit(() -> {
int nTry = 1;
while (true) {
@@ -168,7 +177,7 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
++nTry;
}
catch (InterruptedException e) {
- // close inputstream anyway if the thread interrups
+ // close input stream anyway if the thread interrupts
IOUtils.closeQuietly(inputStream);
throw new ISE(e, Thread.currentThread().getName() + "interrupted");
}
@@ -187,6 +196,8 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
if (bytesRead == -1) {
inputStreamFinished = true;
delegate.doneWriting();
+ // eagerly release input stream resources since everything is
read.
+ IOUtils.closeQuietly(inputStream);
break;
} else {
ListenableFuture<?> backpressureFuture =
delegate.addChunk(Arrays.copyOfRange(buffer, 0, bytesRead));
@@ -233,8 +244,7 @@ public class ReadableInputStreamFrameChannel implements
ReadableFrameChannel
private static long nextRetrySleepMillis(final int nTry)
{
final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 *
ThreadLocalRandom.current().nextGaussian(), 0), 2);
- final long sleepMillis = (long) (Math.min(MAX_SLEEP_MILLIS,
BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1))
- * fuzzyMultiplier);
- return sleepMillis;
+ return (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2,
nTry - 1))
+ * fuzzyMultiplier);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]