adarshsanjeev commented on code in PR #14660:
URL: https://github.com/apache/druid/pull/14660#discussion_r1285330752


##########
processing/src/main/java/org/apache/druid/storage/remote/ChunkingStorageConnector.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.remote;
+
+import com.google.common.base.Predicates;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An abstract implementation of the storage connectors that download the file 
from the remote storage in chunks
+ * and presents the downloaded chunks as a single {@link InputStream} for the 
consumers of the connector.
+ * This implementation benefits over keeping the InputStream to the remote 
source open since we don't require the
+ * connection to be open for the entire duration.
+ * Checkout {@link ChunkingStorageConnectorParameters} to see the inputs 
required to support chunking
+ */
+public abstract class ChunkingStorageConnector<T> implements StorageConnector
+{
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE_BYTES = 100_000_000;
+
+  private final long chunkSizeBytes;
+
+  public ChunkingStorageConnector()
+  {
+    this(DOWNLOAD_MAX_CHUNK_SIZE_BYTES);
+  }
+
+  public ChunkingStorageConnector(
+      final long chunkSizeBytes
+  )
+  {
+    this.chunkSizeBytes = chunkSizeBytes;
+  }
+
+  @Override
+  public InputStream read(String path) throws IOException
+  {
+    return buildInputStream(buildInputParams(path));
+  }
+
+  @Override
+  public InputStream readRange(String path, long from, long size)
+  {
+    return buildInputStream(buildInputParams(path, from, size));
+  }
+
+  public abstract ChunkingStorageConnectorParameters<T> 
buildInputParams(String path) throws IOException;
+
+  public abstract ChunkingStorageConnectorParameters<T> 
buildInputParams(String path, long from, long size);
+
+  private InputStream buildInputStream(ChunkingStorageConnectorParameters<T> 
params)
+  {
+    // Position from where the read needs to be resumed
+    final AtomicLong currentReadStartPosition = new 
AtomicLong(params.getStart());
+
+    // Final position, exclusive
+    long readEnd = params.getEnd();
+
+    AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
+
+    return new SequenceInputStream(
+
+        new Enumeration<InputStream>()
+        {
+          boolean initStream = false;
+
+          @Override
+          public boolean hasMoreElements()
+          {
+            // Checking if the stream was already closed. If it was, then 
don't iterate over the remaining chunks
+            // SequenceInputStream's close method closes all the chunk streams 
in its close. Since we're opening them
+            // lazily, we don't need to close them.
+            if (isSequenceStreamClosed.get()) {
+              return false;
+            }
+            // Don't stop until the whole object is downloaded
+            return currentReadStartPosition.get() < readEnd;
+          }
+
+          @Override
+          public InputStream nextElement()
+          {
+            if (!initStream) {
+              initStream = true;
+              return new NullInputStream();
+            }
+
+            File outFile = new File(
+                params.getTempDirSupplier().get().getAbsolutePath(),
+                UUID.randomUUID().toString()
+            );
+
+            long currentReadEndPosition = Math.min(
+                currentReadStartPosition.get() + chunkSizeBytes,
+                readEnd
+            );
+
+            try {
+              if (!outFile.createNewFile()) {
+                throw new IOE(
+                    StringUtils.format(
+                        "Could not create temporary file [%s] for copying 
[%s]",
+                        outFile.getAbsolutePath(),
+                        params.getCloudStoragePath()
+                    )
+                );
+              }
+
+              FileUtils.copyLarge(
+                  () -> new RetryingInputStream<>(
+                      
params.getObjectSupplier().getObject(currentReadStartPosition.get(), 
currentReadEndPosition),
+                      params.getObjectOpenFunction(),
+                      params.getRetryCondition(),
+                      params.getMaxRetry()
+                  ),
+                  outFile,
+                  new byte[8 * 1024],

Review Comment:
   I know this code was only moved, but could you add a comment on why these 
numbers are chosen?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to