danielcweeks commented on a change in pull request #1767:
URL: https://github.com/apache/iceberg/pull/1767#discussion_r526296702
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -19,42 +19,83 @@
package org.apache.iceberg.aws.s3;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.io.UncheckedIOException;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicates;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3OutputStream.class);
+ static final String MULTIPART_SIZE = "s3fileio.multipart.size";
+ static final String MULTIPART_THRESHOLD_FACTOR =
"s3fileio.multipart.threshold";
+
+ static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
+ static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
+ static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;
+
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
- private final OutputStream stream;
- private final File stagingFile;
- private long pos = 0;
+ private CountingOutputStream stream;
+ private final List<File> stagingFiles = Lists.newArrayList();
+ private File currentStagingFile;
+ private String multipartUploadId;
+ private final Map<File, CompletableFuture<CompletedPart>> multiPartMap =
Maps.newHashMap();
+ private final int multiPartSize;
+ private final double multiPartThresholdFactor;
Review comment:
I updated the code to use a threshold size, but still expose the factor
as the user-facing property to adjust.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]