imply-cheddar commented on code in PR #13741:
URL: https://github.com/apache/druid/pull/13741#discussion_r1095389740


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -48,11 +61,20 @@ public class S3StorageConnector implements StorageConnector
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
 
   public S3StorageConnector(S3OutputConfig config, 
ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    if (config.getTempDir() != null) {

Review Comment:
   If `getTempDir` is null, this code is still gonna fail, but a lot later on.  
You can test and validate here instead.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long 
size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest 
getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, 
String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), 
objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), 
UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of 
DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + 
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long 
offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  config.getMaxRetry()
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", 
objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);

Review Comment:
   What's an UncheckedIOException other than just a RuntimeException?



##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java:
##########
@@ -104,9 +105,15 @@ public void pathExists() throws IOException
   public void pathRead() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
+    objectMetadata.setContentLength(contentLength);
     S3Object s3Object = new S3Object();
     s3Object.setObjectContent(new 
ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
-    EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + 
"/" + TEST_FILE))).andReturn(s3Object);
+    
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
+    EasyMock.expect(S3_CLIENT.getObject(
+        new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, 
contentLength - 1))
+    ).andReturn(s3Object);
     EasyMock.replay(S3_CLIENT);

Review Comment:
   You don't have any testing for the retry behavior.  If retries exist with 
the configs, please test all of the different ways that retries can happen and 
whether we expect multiplicativity or not.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long 
size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest 
getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, 
String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), 
objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), 
UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of 
DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + 
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(

Review Comment:
   Do you really need to do this inside of a retrying input stream?  We've had 
a rash of issues where code was trying to retry a thing, and then a layer above 
it was trying to retry a thing and then a layer above that leading to 
multiplicative growth of retries and really long delays in processing.  Given 
that the whole fault-tolerance stuff exists and will retry failed tasks anyway, 
perhaps this isn't the right layer to add yet another set of retries?  Taht or 
like, limit it to only 2 retries at a max if we really do want to have a retry 
at this layer.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -78,35 +100,117 @@ public InputStream readRange(String path, long from, long 
size) throws IOExcepti
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), 
objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream buildInputStream(GetObjectRequest 
getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, 
String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), 
objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), 
UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of 
DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + 
DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long 
offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  config.getMaxRetry()
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", 
objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+        try {
+          AtomicBoolean isClosed = new AtomicBoolean(false);
+          return new FileInputStream(outFile)
+          {
+            @Override
+            public void close() throws IOException
+            {
+              // close should be idempotent
+              if (isClosed.get()) {
+                return;
+              }

Review Comment:
   I'm very much scared of places where the contract is that close can be 
called multiple times.  It's indicative of a lack of ability and understanding 
of when the lifecycle of an object is truly over and often highlights other 
problems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to