This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6caa8a038f upload large s3 object by multi parts to overcome the 5GB
limit of PutObject (#10263)
6caa8a038f is described below
commit 6caa8a038f328feed381511c382c8593e06138a0
Author: Xiaobing <[email protected]>
AuthorDate: Sat Feb 11 10:04:42 2023 -0800
upload large s3 object by multi parts to overcome the 5GB limit of
PutObject (#10263)
---
.../apache/pinot/plugin/filesystem/S3Config.java | 27 ++++++-
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 90 ++++++++++++++++++++--
.../pinot/plugin/filesystem/S3PinotFSTest.java | 27 +++++++
3 files changed, 136 insertions(+), 8 deletions(-)
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
index b9b6c1ca35..9d2fd6d3c2 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
@@ -22,14 +22,18 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.util.UUID;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.DataSizeUtils;
/**
* S3 related config
*/
public class S3Config {
-
private static final boolean DEFAULT_DISABLE_ACL = true;
+  // From https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html, the part number must be an integer
+  // between 1 and 10000, inclusive; and the minimum part size allowed is 5MiB, except for the last part.
+ private static final long MULTI_PART_UPLOAD_MIN_PART_SIZE = 5 * 1024 * 1024;
+ public static final int MULTI_PART_UPLOAD_MAX_PART_NUM = 10000;
public static final String ACCESS_KEY = "accessKey";
public static final String SECRET_KEY = "secretKey";
@@ -49,6 +53,9 @@ public class S3Config {
public static final String EXTERNAL_ID = "externalId";
public static final String SESSION_DURATION_SECONDS =
"sessionDurationSeconds";
public static final String ASYNC_SESSION_UPDATED_ENABLED =
"asyncSessionUpdateEnabled";
+ public static final String MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD =
"minObjectSizeForMultiPartUpload";
+ public static final String MULTI_PART_UPLOAD_PART_SIZE =
"multiPartUploadPartSize";
+ private static final String DEFAULT_MULTI_PART_UPLOAD_PART_SIZE = "128MB";
public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
@@ -69,6 +76,8 @@ public class S3Config {
private String _externalId;
private int _sessionDurationSeconds;
private boolean _asyncSessionUpdateEnabled;
+ private final long _minObjectSizeForMultiPartUpload;
+ private final long _multiPartUploadPartSize;
public S3Config(PinotConfiguration pinotConfig) {
_disableAcl = pinotConfig.getProperty(DISABLE_ACL_CONFIG_KEY,
DEFAULT_DISABLE_ACL);
@@ -91,7 +100,13 @@ public class S3Config {
Integer.parseInt(pinotConfig.getProperty(SESSION_DURATION_SECONDS,
DEFAULT_SESSION_DURATION_SECONDS));
_asyncSessionUpdateEnabled = Boolean.parseBoolean(
pinotConfig.getProperty(ASYNC_SESSION_UPDATED_ENABLED,
DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
-
+ // A non-positive value disables multipart upload.
+ _minObjectSizeForMultiPartUpload =
+
DataSizeUtils.toBytes(pinotConfig.getProperty(MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD,
"-1"));
+ _multiPartUploadPartSize = DataSizeUtils.toBytes(
+ pinotConfig.getProperty(MULTI_PART_UPLOAD_PART_SIZE,
DEFAULT_MULTI_PART_UPLOAD_PART_SIZE));
+ Preconditions.checkArgument(_multiPartUploadPartSize >
MULTI_PART_UPLOAD_MIN_PART_SIZE,
+ "The part size for multipart upload must be larger than 5MB");
if (_iamRoleBasedAccess) {
Preconditions.checkNotNull(_roleArn, "Must provide 'roleArn' if
iamRoleBasedAccess is enabled");
}
@@ -152,4 +167,12 @@ public class S3Config {
public boolean isAsyncSessionUpdateEnabled() {
return _asyncSessionUpdateEnabled;
}
+
+ public long getMinObjectSizeForMultiPartUpload() {
+ return _minObjectSizeForMultiPartUpload;
+ }
+
+ public long getMultiPartUploadPartSize() {
+ return _multiPartUploadPartSize;
+ }
}
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index ca2a938ef0..db976415f3 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.plugin.filesystem;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@@ -31,6 +33,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -52,8 +55,14 @@ import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -69,6 +78,8 @@ import
software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.sts.StsClient;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -87,6 +98,8 @@ public class S3PinotFS extends BasePinotFS {
private ServerSideEncryption _serverSideEncryption = null;
private String _ssekmsKeyId;
private String _ssekmsEncryptionContext;
+ private long _minObjectSizeToUploadInParts;
+ private long _multiPartUploadPartSize;
@Override
public void init(PinotConfiguration config) {
@@ -135,6 +148,7 @@ public class S3PinotFS extends BasePinotFS {
}
}
_s3Client = s3ClientBuilder.build();
+ setMultiPartUploadConfigs(s3Config);
} catch (S3Exception e) {
throw new RuntimeException("Could not initialize S3PinotFS", e);
}
@@ -147,6 +161,7 @@ public class S3PinotFS extends BasePinotFS {
*/
public void init(S3Client s3Client) {
_s3Client = s3Client;
+ setMultiPartUploadConfigs(-1, -1);
}
/**
@@ -157,7 +172,9 @@ public class S3PinotFS extends BasePinotFS {
*/
public void init(S3Client s3Client, String serverSideEncryption,
PinotConfiguration serverSideEncryptionConfig) {
_s3Client = s3Client;
- setServerSideEncryption(serverSideEncryption, new
S3Config(serverSideEncryptionConfig));
+ S3Config s3Config = new S3Config(serverSideEncryptionConfig);
+ setServerSideEncryption(serverSideEncryption, s3Config);
+ setMultiPartUploadConfigs(s3Config);
}
private void setServerSideEncryption(@Nullable String serverSideEncryption,
S3Config s3Config) {
@@ -551,11 +568,72 @@ public class S3PinotFS extends BasePinotFS {
@Override
public void copyFromLocalFile(File srcFile, URI dstUri)
throws Exception {
- LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
- URI base = getBase(dstUri);
- String prefix = sanitizePath(base.relativize(dstUri).getPath());
- PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri,
prefix);
- _s3Client.putObject(putObjectRequest, srcFile.toPath());
+ if (_minObjectSizeToUploadInParts > 0 && srcFile.length() >
_minObjectSizeToUploadInParts) {
+ LOGGER.info("Copy {} from local to {} in parts",
srcFile.getAbsolutePath(), dstUri);
+ uploadFileInParts(srcFile, dstUri);
+ } else {
+ LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(),
dstUri);
+ String prefix =
sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+ PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri,
prefix);
+ _s3Client.putObject(putObjectRequest, srcFile.toPath());
+ }
+ }
+
+ private void uploadFileInParts(File srcFile, URI dstUri)
+ throws Exception {
+ String bucket = dstUri.getHost();
+ String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+ CreateMultipartUploadResponse multipartUpload =
+
_s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
+ String uploadId = multipartUpload.uploadId();
+ // Upload parts sequentially to overcome the 5GB limit of a single
PutObject call.
+ // TODO: parts can be uploaded in parallel for higher throughput, given a
thread pool.
+ try (FileInputStream inputStream = FileUtils.openInputStream(srcFile)) {
+ long totalUploaded = 0;
+ long fileSize = srcFile.length();
+ // The part number must start from 1 and be no more than the max part num allowed, 10000 by default.
+ // The default configs can upload a single file of 1TB, so the if-branch
should rarely happen.
+ int partNum = 1;
+ long partSizeToUse = _multiPartUploadPartSize;
+ if (partSizeToUse * S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM < fileSize) {
+ partSizeToUse =
+ (fileSize + S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM - 1) /
S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM;
+ LOGGER.info("Increased part size from {} to {} for large file size {}
due to max allowed uploads {}",
+ _multiPartUploadPartSize, partSizeToUse, fileSize,
S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM);
+ }
+ List<CompletedPart> parts = new ArrayList<>();
+ while (totalUploaded < srcFile.length()) {
+ long nextPartSize = Math.min(partSizeToUse, fileSize - totalUploaded);
+ UploadPartResponse uploadPartResponse = _s3Client.uploadPart(
+
UploadPartRequest.builder().bucket(bucket).key(prefix).uploadId(uploadId).partNumber(partNum).build(),
+ RequestBody.fromInputStream(inputStream, nextPartSize));
+
parts.add(CompletedPart.builder().partNumber(partNum).eTag(uploadPartResponse.eTag()).build());
+ totalUploaded += nextPartSize;
+ LOGGER.debug("Uploaded part {} of size {}, with total uploaded {} and
file size {}", partNum, nextPartSize,
+ totalUploaded, fileSize);
+ // Advance the part number to upload the next part.
+ partNum++;
+ }
+ // complete the multipart upload
+ _s3Client.completeMultipartUpload(
+
CompleteMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucket).key(prefix)
+
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()).build());
+ } catch (Exception e) {
+ LOGGER.error("Failed to upload file {} to {} in parts. Abort upload
request: {}", srcFile, dstUri, uploadId, e);
+ _s3Client.abortMultipartUpload(
+
AbortMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucket).key(prefix).build());
+ throw e;
+ }
+ }
+
+ private void setMultiPartUploadConfigs(S3Config s3Config) {
+ setMultiPartUploadConfigs(s3Config.getMinObjectSizeForMultiPartUpload(),
s3Config.getMultiPartUploadPartSize());
+ }
+
+ @VisibleForTesting
+ void setMultiPartUploadConfigs(long minObjectSizeToUploadInParts, long
multiPartUploadPartSize) {
+ _minObjectSizeToUploadInParts = minObjectSizeToUploadInParts;
+ _multiPartUploadPartSize = multiPartUploadPartSize;
}
@Override
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index b877c29e91..7c6cc6dbf7 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -349,6 +349,33 @@ public class S3PinotFSTest {
fileToDownload.deleteOnExit();
}
+ @Test
+ public void testMultiPartUpload()
+ throws Exception {
+ String fileName = "copyFile.txt";
+
+ File fileToCopy = new
File(getClass().getClassLoader().getResource(fileName).getFile());
+
+ // input file size is 20
+ _s3PinotFS.setMultiPartUploadConfigs(1, 3);
+ try {
+ _s3PinotFS.copyFromLocalFile(fileToCopy,
URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
+ } finally {
+ // Disable multipart upload again for the other unit test cases.
+ _s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
+ }
+
+ HeadObjectResponse headObjectResponse =
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
+
+ Assert.assertEquals(headObjectResponse.contentLength(), (Long)
fileToCopy.length());
+
+ File fileToDownload = new
File("copyFile_download_multipart.txt").getAbsoluteFile();
+ _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME,
BUCKET, fileName)), fileToDownload);
+ Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
+
+ fileToDownload.deleteOnExit();
+ }
+
@Test
public void testOpenFile()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]