[
https://issues.apache.org/jira/browse/NIFI-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15028152#comment-15028152
]
ASF GitHub Bot commented on NIFI-1107:
--------------------------------------
Github user trkurc commented on a diff in the pull request:
https://github.com/apache/nifi/pull/132#discussion_r45944159
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
---
@@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws
IOException {
objectMetadata.setUserMetadata(userMetadata);
}
- final PutObjectRequest request = new
PutObjectRequest(bucket, key, in, objectMetadata);
-
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
- final AccessControlList acl = createACL(context,
ff);
- if (acl != null) {
- request.setAccessControlList(acl);
- }
+ if (ff.getSize() <= multipartThreshold) {
+ //----------------------------------------
+ // single part upload
+ //----------------------------------------
+ final PutObjectRequest request = new
PutObjectRequest(bucket, key, in, objectMetadata);
+ request.setStorageClass(
+
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ final AccessControlList acl =
createACL(context, ff);
+ if (acl != null) {
+ request.setAccessControlList(acl);
+ }
- final PutObjectResult result =
s3.putObject(request);
- if (result.getVersionId() != null) {
- attributes.put("s3.version",
result.getVersionId());
- }
+ try {
+ final PutObjectResult result =
s3.putObject(request);
+ if (result.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY,
result.getVersionId());
+ }
+ if (result.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY,
result.getETag());
+ }
+ if (result.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY,
result.getExpirationTime().toString());
+ }
+ if
(result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY))
{
+
attributes.put(S3_STORAGECLASS_ATTR_KEY,
+
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new
StringBuilder();
+ for (String userKey :
userMetadata.keySet()) {
+
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY,
userMetaBldr.toString());
+ }
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing
upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket,
key, e.getMessage()});
+ throw (e);
+ }
+ } else {
+ //----------------------------------------
+ // multipart upload
+ //----------------------------------------
- attributes.put("s3.etag", result.getETag());
+ // load or create persistent state
+
//------------------------------------------------------------
+ MultipartState currentState;
+ try {
+ currentState = getState(cacheKey);
+ if (currentState != null) {
+ if (currentState.getPartETags().size()
> 0) {
+ final PartETag lastETag =
currentState.getPartETags().get(
+
currentState.getPartETags().size() - 1);
+ getLogger().info("Resuming upload
for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}'
partsLoaded={} lastPart={}/{}",
+ new Object[]{ffFilename,
bucket, key, currentState.getUploadId(),
+
currentState.getFilePosition(), currentState.getPartSize(),
+
currentState.getStorageClass().toString(),
+
currentState.getContentLength(),
+
currentState.getPartETags().size(),
+
Integer.toString(lastETag.getPartNumber()),
+
lastETag.getETag()});
+ } else {
+ getLogger().info("Resuming upload
for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' no
partsLoaded",
+ new Object[]{ffFilename,
bucket, key, currentState.getUploadId(),
+
currentState.getFilePosition(), currentState.getPartSize(),
+
currentState.getStorageClass().toString(),
+
currentState.getContentLength()});
+ }
+ } else {
+ currentState = new MultipartState();
+
currentState.setPartSize(multipartPartSize);
+ currentState.setStorageClass(
+
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+
currentState.setContentLength(ff.getSize());
+ persistState(cacheKey, currentState);
+ getLogger().info("Starting new upload
for flowfile='{}' bucket='{}' key='{}'",
+ new Object[]{ffFilename,
bucket, key});
+ }
+ } catch (IOException e) {
+ getLogger().error("IOException initiating
cache state while processing flow files: " +
+ e.getMessage());
+ throw (e);
+ }
+
+ // initiate multipart upload or find position
in file
+
//------------------------------------------------------------
+ if (currentState.getUploadId().isEmpty()) {
+ final InitiateMultipartUploadRequest
initiateRequest =
+ new
InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+
initiateRequest.setStorageClass(currentState.getStorageClass());
+ final AccessControlList acl =
createACL(context, ff);
+ if (acl != null) {
+
initiateRequest.setAccessControlList(acl);
+ }
+ try {
+ final InitiateMultipartUploadResult
initiateResult =
+
s3.initiateMultipartUpload(initiateRequest);
+
currentState.setUploadId(initiateResult.getUploadId());
+ currentState.getPartETags().clear();
+ try {
+ persistState(cacheKey,
currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving
cache state while processing flow file: " +
+ e.getMessage());
+ throw(new
ProcessException("Exception saving cache state", e));
+ }
+ getLogger().info("Success initiating
upload flowfile={} available={} position={} " +
+ "length={} bucket={} key={}
uploadId={}",
+ new Object[]{ffFilename,
in.available(), currentState.getFilePosition(),
+
currentState.getContentLength(), bucket, key,
+
currentState.getUploadId()});
+ if (initiateResult.getUploadId() !=
null) {
+
attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
+ }
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure initiating
upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename,
bucket, key, e.getMessage()});
+ throw(e);
+ }
+ } else {
+ if (currentState.getFilePosition() > 0) {
+ try {
+ final long skipped =
in.skip(currentState.getFilePosition());
+ if (skipped !=
currentState.getFilePosition()) {
+ getLogger().info("Failure
skipping to resume upload flowfile={} " +
+ "bucket={} key={}
position={} skipped={}",
+ new
Object[]{ffFilename, bucket, key,
+
currentState.getFilePosition(), skipped});
+ }
+ } catch (Exception e) {
+ getLogger().info("Failure skipping
to resume upload flowfile={} bucket={} " +
+ "key={} position={}
reason={}",
+ new Object[]{ffFilename,
bucket, key, currentState.getFilePosition(),
+ e.getMessage()});
+ throw(new ProcessException(e));
+ }
+ }
+ }
+
+ // upload parts
+
//------------------------------------------------------------
+ long thisPartSize;
+ for (int part =
currentState.getPartETags().size() + 1;
+ currentState.getFilePosition() <
currentState.getContentLength(); part++) {
+ if (!PutS3Object.this.isScheduled()) {
+ getLogger().info("Processor
unscheduled, stopping upload flowfile={} part={} " +
+ "uploadId={}", new
Object[]{ffFilename, part, currentState.getUploadId()});
+ session.rollback();
+ return;
+ }
+ thisPartSize =
Math.min(currentState.getPartSize(),
+ (currentState.getContentLength() -
currentState.getFilePosition()));
+ UploadPartRequest uploadRequest = new
UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+
.withUploadId(currentState.getUploadId())
+ .withInputStream(in)
+ .withPartNumber(part)
+ .withPartSize(thisPartSize);
+ try {
+ UploadPartResult uploadPartResult =
s3.uploadPart(uploadRequest);
+
currentState.addPartETag(uploadPartResult.getPartETag());
+
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+ try {
+ persistState(cacheKey,
currentState);
+ } catch (Exception e) {
--- End diff --
I'd add a comment here on why swallowing exceptions is okay, as it is
not immediately intuitive.
> Create new PutS3ObjectMultipart processor
> -----------------------------------------
>
> Key: NIFI-1107
> URL: https://issues.apache.org/jira/browse/NIFI-1107
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Reporter: Joe Skora
> Assignee: Joe Skora
> Labels: s3
> Fix For: 0.5.0
>
>
> A new `PutS3ObjectMultipart` processor using the AWS S3 API to upload files
> larger than those supported by `PutS3Object`, which has a [5GB
> limit|http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html].
> To support S3 compatible endpoints this will also add an `Endpoint Override
> URL` property to `AbstractAWSProcessor` to set the service
> [endpoint|http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/AmazonWebServiceClient.html#setEndpoint(java.lang.String)]
> to override the endpoint URL normally selected based on the Amazon
> region.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)