rdblue commented on a change in pull request #1767:
URL: https://github.com/apache/iceberg/pull/1767#discussion_r523281702
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -19,42 +19,105 @@
package org.apache.iceberg.aws.s3;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.io.UncheckedIOException;
import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicates;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
+import
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3OutputStream.class);
+ static final String UPLOAD_POOL_SIZE = "s3fileio.multipart.num-threads";
+ static final String MULTIPART_SIZE = "s3fileio.multipart.part.size";
+ static final String MULTIPART_THRESHOLD_FACTOR =
"s3fileio.multipart.threshold";
+
+ static final int DEFAULT_UPLOAD_WORKER_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
+ static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
+ static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
+ static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;
+
+ private static ExecutorService executorService;
+
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
- private final OutputStream stream;
- private final File stagingFile;
- private long pos = 0;
+ private CountingOutputStream stream;
+ private final List<File> stagingFiles = Lists.newArrayList();
+ private File currentStagingFile;
+ private String multipartUploadId;
+ private final Map<File, CompletableFuture<CompletedPart>> multiPartMap =
Maps.newHashMap();
+ private final int multiPartSize;
+ private final double multiPartThresholdFactor;
+ private long pos = 0;
private boolean closed = false;
- S3OutputStream(S3Client s3, S3URI location) throws IOException {
+ S3OutputStream(Map<String, String> properties, S3Client s3, S3URI location)
throws IOException {
+ synchronized (this) {
+ if (executorService == null) {
+ executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(
+ Integer.parseInt(properties.getOrDefault(UPLOAD_POOL_SIZE,
Review comment:
Iceberg has `PropertyUtil.propertyAsInt` for this situation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]