This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 217b0f6832 Eagerly fetching remote s3 files leading to out of disk (OOD) (#13981)
217b0f6832 is described below

commit 217b0f6832c819b6c2b264418038b86049769ad8
Author: Karan Kumar <[email protected]>
AuthorDate: Mon Apr 3 14:10:37 2023 +0530

    Eagerly fetching remote s3 files leading to out of disk (OOD) (#13981)
    
    * Eagerly fetching remote s3 files leading to OOD.
---
 .../shuffle/DurableStorageInputChannelFactory.java | 22 +++++++----------
 .../DurableStorageOutputChannelFactory.java        | 10 +++-----
 .../storage/s3/output/S3StorageConnector.java      |  8 +++++++
 .../channel/ReadableInputStreamFrameChannel.java   | 28 +++++++++++++++-------
 4 files changed, 39 insertions(+), 29 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
index 1f6c1e44fa..8e8315d902 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
@@ -26,7 +26,6 @@ import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
 import org.apache.druid.frame.util.DurableStorageUtils;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -97,18 +96,15 @@ public class DurableStorageInputChannelFactory implements InputChannelFactory
           workerNumber,
           remotePartitionPath
       );
-      RetryUtils.retry(() -> {
-        if (!storageConnector.pathExists(remotePartitionPath)) {
-          throw new ISE(
-              "Could not find remote outputs of stage [%d] partition [%d] for 
worker [%d] at the path [%s]",
-              stageId.getStageNumber(),
-              partitionNumber,
-              workerNumber,
-              remotePartitionPath
-          );
-        }
-        return Boolean.TRUE;
-      }, (throwable) -> true, 10);
+      if (!storageConnector.pathExists(remotePartitionPath)) {
+        throw new ISE(
+            "Could not find remote outputs of stage [%d] partition [%d] for 
worker [%d] at the path [%s]",
+            stageId.getStageNumber(),
+            partitionNumber,
+            workerNumber,
+            remotePartitionPath
+        );
+      }
       final InputStream inputStream = 
storageConnector.read(remotePartitionPath);
 
       return ReadableInputStreamFrameChannel.open(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
index 4505be5846..ca7041cfa0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
@@ -39,7 +39,6 @@ import org.apache.druid.frame.util.DurableStorageUtils;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.MappedByteBufferHandler;
-import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -142,12 +141,9 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
         ArenaMemoryAllocator.createOnHeap(frameSize),
         () -> {
           try {
-            RetryUtils.retry(() -> {
-              if (!storageConnector.pathExists(fileName)) {
-                throw new ISE("File does not exist : %s", fileName);
-              }
-              return Boolean.TRUE;
-            }, (throwable) -> true, 10);
+            if (!storageConnector.pathExists(fileName)) {
+              throw new ISE("File does not exist : %s", fileName);
+            }
           }
           catch (Exception exception) {
             throw new RuntimeException(exception);
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index c72f954401..a1e31e3a92 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import org.apache.commons.io.input.NullInputStream;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
@@ -150,6 +151,7 @@ public class S3StorageConnector implements StorageConnector
     // build a sequence input stream from chunks
     return new SequenceInputStream(new Enumeration<InputStream>()
     {
+      boolean initStream = false;
       @Override
       public boolean hasMoreElements()
       {
@@ -166,6 +168,12 @@ public class S3StorageConnector implements StorageConnector
       @Override
       public InputStream nextElement()
       {
+        // since Sequence input stream calls nextElement in the constructor, 
we start chunking as soon as we call read.
+        // to avoid that we pass a nullInputStream for the first iteration.
+        if (!initStream) {
+          initStream = true;
+          return new NullInputStream();
+        }
         File outFile = new File(config.getTempDir().getAbsolutePath(), 
UUID.randomUUID().toString());
         // in a single chunk, only download a maximum of 
DOWNLOAD_MAX_CHUNK_SIZE
         long endPoint = Math.min(currReadStart.get() + 
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
index 280589da1d..5f03a5e8ef 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Channel backed by an {@link InputStream}.
- *
+ * <p>
  * Frame channels are expected to be nonblocking, but InputStreams cannot be 
read in nonblocking fashion.
  * This implementation deals with that by using an {@link ExecutorService} to 
read from the stream in a
  * separate thread.
@@ -56,6 +56,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
   @GuardedBy("lock")
   private boolean inputStreamError = false;
 
+  private boolean isStarted = false;
+
   private volatile boolean keepReading = true;
 
   private final Object readMonitor = new Object();
@@ -96,20 +98,18 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
       boolean framesOnly
   )
   {
-    final ReadableInputStreamFrameChannel channel = new 
ReadableInputStreamFrameChannel(
+    return new ReadableInputStreamFrameChannel(
         inputStream,
         ReadableByteChunksFrameChannel.create(id, framesOnly),
         executorService
     );
-
-    channel.startReading();
-    return channel;
   }
 
   @Override
   public boolean isFinished()
   {
     synchronized (lock) {
+      startReading();
       return delegate.isFinished();
     }
   }
@@ -118,6 +118,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
   public boolean canRead()
   {
     synchronized (lock) {
+      startReading();
       return delegate.canRead();
     }
   }
@@ -126,6 +127,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
   public Frame read()
   {
     synchronized (lock) {
+      startReading();
       return delegate.read();
     }
   }
@@ -134,6 +136,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
   public ListenableFuture<?> readabilityFuture()
   {
     synchronized (lock) {
+      startReading();
       return delegate.readabilityFuture();
     }
   }
@@ -150,6 +153,12 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
 
   private void startReading()
   {
+
+    // the task to the executor service is submitted only once.
+    if (isStarted) {
+      return;
+    }
+    isStarted = true;
     executorService.submit(() -> {
       int nTry = 1;
       while (true) {
@@ -168,7 +177,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
             ++nTry;
           }
           catch (InterruptedException e) {
-            // close inputstream anyway if the thread interrups
+            // close input stream anyway if the thread interrupts
             IOUtils.closeQuietly(inputStream);
             throw new ISE(e, Thread.currentThread().getName() + "interrupted");
           }
@@ -187,6 +196,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
               if (bytesRead == -1) {
                 inputStreamFinished = true;
                 delegate.doneWriting();
+                // eagerly release input stream resources since everything is 
read.
+                IOUtils.closeQuietly(inputStream);
                 break;
               } else {
                 ListenableFuture<?> backpressureFuture = 
delegate.addChunk(Arrays.copyOfRange(buffer, 0, bytesRead));
@@ -233,8 +244,7 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
   private static long nextRetrySleepMillis(final int nTry)
   {
     final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * 
ThreadLocalRandom.current().nextGaussian(), 0), 2);
-    final long sleepMillis = (long) (Math.min(MAX_SLEEP_MILLIS, 
BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1))
-                                     * fuzzyMultiplier);
-    return sleepMillis;
+    return (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, 
nTry - 1))
+                   * fuzzyMultiplier);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to