jackye1995 commented on a change in pull request #1767:
URL: https://github.com/apache/iceberg/pull/1767#discussion_r523154834
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
##########
@@ -29,10 +31,18 @@
private final S3Client client;
private final S3URI uri;
private HeadObjectResponse metadata;
+ private Map<String, String> properties;
BaseS3File(S3Client client, S3URI uri) {
this.client = client;
this.uri = uri;
+ this.properties = Collections.emptyMap();
Review comment:
In #1754 I chose to store the necessary arguments directly instead of
storing the entire property map. I'm not sure which way is better; what do
you think?
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -19,42 +19,83 @@
package org.apache.iceberg.aws.s3;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.io.UncheckedIOException;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicates;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3OutputStream.class);
+ static final String MULTIPART_SIZE = "s3fileio.multipart.size";
Review comment:
Also, in #1754 and #1633 I created a file `AwsCatalogProperties` to
centralize all the properties; we can move these properties there. I am not
sure if `.` is an allowed character in Spark and Flink catalog config
property names; let me verify that and come back later.
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -87,18 +168,105 @@ public void close() throws IOException {
super.close();
closed = true;
+ currentStagingFile = null;
try {
stream.close();
- s3.putObject(
-
PutObjectRequest.builder().bucket(location.bucket()).key(location.key()).build(),
- RequestBody.fromFile(stagingFile));
+ completeUploads();
} finally {
- if (!stagingFile.delete()) {
- LOG.warn("Could not delete temporary file: {}", stagingFile);
+ stagingFiles.forEach(f -> {
Review comment:
Can we also delete the temp files progressively instead of deleting
everything after close() is called?
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -87,18 +168,105 @@ public void close() throws IOException {
super.close();
closed = true;
+ currentStagingFile = null;
try {
stream.close();
- s3.putObject(
-
PutObjectRequest.builder().bucket(location.bucket()).key(location.key()).build(),
- RequestBody.fromFile(stagingFile));
+ completeUploads();
} finally {
- if (!stagingFile.delete()) {
- LOG.warn("Could not delete temporary file: {}", stagingFile);
+ stagingFiles.forEach(f -> {
+ if (f.exists() && !f.delete()) {
+ LOG.warn("Could not delete temporary file: {}", f);
+ }
+ });
+ }
+ }
+
+ private void initializeMultiPartUpload() {
+ multipartUploadId =
s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
+ .bucket(location.bucket()).key(location.key()).build()).uploadId();
+ }
+
+ private void uploadParts() {
+ // exit if multipart has not been initiated
+ if (multipartUploadId == null) {
+ return;
+ }
+
+ stagingFiles.stream()
+ // do not upload the file currently being written
+ .filter(f -> currentStagingFile == null ||
!currentStagingFile.equals(f))
+ // do not upload any files that have already been processed
+ .filter(Predicates.not(multiPartMap::containsKey))
+ .forEach(f -> {
+ UploadPartRequest uploadRequest = UploadPartRequest.builder()
+ .bucket(location.bucket())
+ .key(location.key())
+ .uploadId(multipartUploadId)
+ .partNumber(stagingFiles.indexOf(f) + 1)
+ .contentLength(f.length())
+ .build();
+
+ CompletableFuture<CompletedPart> future =
CompletableFuture.supplyAsync(
Review comment:
If we want to make it async, can we directly leverage `S3AsyncClient`?
##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
##########
@@ -19,42 +19,83 @@
package org.apache.iceberg.aws.s3;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.io.UncheckedIOException;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicates;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3OutputStream.class);
+ static final String MULTIPART_SIZE = "s3fileio.multipart.size";
+ static final String MULTIPART_THRESHOLD_FACTOR =
"s3fileio.multipart.threshold";
+
+ static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
+ static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
+ static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;
+
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
- private final OutputStream stream;
- private final File stagingFile;
- private long pos = 0;
+ private CountingOutputStream stream;
+ private final List<File> stagingFiles = Lists.newArrayList();
+ private File currentStagingFile;
+ private String multipartUploadId;
+ private final Map<File, CompletableFuture<CompletedPart>> multiPartMap =
Maps.newHashMap();
+ private final int multiPartSize;
+ private final double multiPartThresholdFactor;
Review comment:
It looks like `multiPartThresholdFactor` is only used once, in `write`, as
`multiPartSize * multiPartThresholdFactor`, so why not just use a configuration
property `multiPartThresholdSize` that defines the actual threshold at which
to start a multipart upload?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]