github-code-scanning[bot] commented on code in PR #14834:
URL: https://github.com/apache/druid/pull/14834#discussion_r1301101339


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -22,48 +22,108 @@
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.remote.ChunkingStorageConnector;
 import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
+import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * In this implementation, all remote calls to aws s3 are retried {@link 
S3OutputConfig#getMaxRetry()} times.
  */
 public class S3StorageConnector extends 
ChunkingStorageConnector<GetObjectRequest>
 {
   private static final Logger log = new Logger(S3StorageConnector.class);
+  private static final int DOWNLOAD_PARALLELISM = 4;
 
   private final S3OutputConfig config;
   private final ServerSideEncryptingAmazonS3 s3Client;
 
+  @Nullable
+  private final TransferManager transferManager;
+
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
   private static final int MAX_NUMBER_OF_LISTINGS = 1000;
+  private static final long DOWNLOAD_SIZE_BYTES = 32 * 1024 * 1024;
+
+  private final long uploadChunkSize;
+
+  // cacheLocally is old behaviour
+  private final boolean cacheLocally;
 
   public S3StorageConnector(S3OutputConfig config, 
ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
+    this(
+        config,
+        serverSideEncryptingAmazonS3,
+        config.getChunkSize(),
+        config.getChunkSize() / DOWNLOAD_PARALLELISM,
+        !(config.isTestingTransferManager()
+          && serverSideEncryptingAmazonS3.getUnderlyingServerSideEncryption() 
instanceof NoopServerSideEncryption)
+    );
+  }
+
+  private S3StorageConnector(
+      S3OutputConfig config,
+      ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3,
+      long downloadChunkSize,

Review Comment:
   ## Useless parameter
   
   The parameter 'downloadChunkSize' is never used. Either wire it into the download logic (for example, as the range size handed to the parallel download path) or remove it from the private constructor's signature.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5726)



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -264,4 +341,101 @@
   {
     return JOINER.join(config.getPrefix(), path);
   }
+
+  // Add some buffering as well
+  InputStream implementParallelDownload(GetObjectRequest getObjectRequest, int 
parallelism, long rangeSize)
+  {
+    ForkJoinPool fjp = new ForkJoinPool(parallelism);
+    long start = getObjectRequest.getRange()[0];
+    long end = getObjectRequest.getRange()[1];
+    List<Pair<Long, Long>> divisions = new ArrayList<>();
+
+    for (long st = start; st <= end; st += rangeSize) {
+      divisions.add(Pair.of(st, Math.min(st + rangeSize - 1, end)));
+    }
+
+    try {
+      List<Pair<Long, File>> startToIs =
+          fjp.submit(() -> divisions.stream()
+                                    .parallel()
+                                    .map(division -> {
+                                      try {
+                                        File outFile = new File(
+                                            
config.getTempDir().getAbsolutePath(),
+                                            UUID.randomUUID().toString()
+                                        );
+                                        DateTime startTime = 
DateTimes.nowUtc();
+                                        IOUtils.copy(
+                                            s3Client.getObject(
+                                                new GetObjectRequest(
+                                                    
getObjectRequest.getBucketName(),
+                                                    getObjectRequest.getKey()
+                                                ).withRange(division.lhs, 
division.rhs)
+                                            ).getObjectContent(),
+                                            new FileOutputStream(outFile)

Review Comment:
   ## Potential output resource leak
   
   This FileOutputStream is not always closed on method exit: if IOUtils.copy (or the preceding S3 getObject call) throws, the stream is leaked. Wrap both the FileOutputStream and the S3 object-content InputStream in a try-with-resources block so they are closed on every exit path.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5727)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to