[ https://issues.apache.org/jira/browse/NIFI-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15028152#comment-15028152 ]

ASF GitHub Bot commented on NIFI-1107:
--------------------------------------

Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/132#discussion_r45944159
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---
    @@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws IOException {
                                 objectMetadata.setUserMetadata(userMetadata);
                             }
     
    -                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
    -                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    -                        final AccessControlList acl = createACL(context, ff);
    -                        if (acl != null) {
    -                            request.setAccessControlList(acl);
    -                        }
    +                        if (ff.getSize() <= multipartThreshold) {
    +                            //----------------------------------------
    +                            // single part upload
    +                            //----------------------------------------
    +                            final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
    +                            request.setStorageClass(
    +                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                            final AccessControlList acl = createACL(context, ff);
    +                            if (acl != null) {
    +                                request.setAccessControlList(acl);
    +                            }
     
    -                        final PutObjectResult result = s3.putObject(request);
    -                        if (result.getVersionId() != null) {
    -                            attributes.put("s3.version", result.getVersionId());
    -                        }
    +                            try {
    +                                final PutObjectResult result = s3.putObject(request);
    +                                if (result.getVersionId() != null) {
    +                                    attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
    +                                }
    +                                if (result.getETag() != null) {
    +                                    attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
    +                                }
    +                                if (result.getExpirationTime() != null) {
    +                                    attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
    +                                }
    +                                if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
    +                                    attributes.put(S3_STORAGECLASS_ATTR_KEY,
    +                                            result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
    +                                }
    +                                if (userMetadata.size() > 0) {
    +                                    StringBuilder userMetaBldr = new StringBuilder();
    +                                    for (String userKey : userMetadata.keySet()) {
    +                                        userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
    +                                    }
    +                                    attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
    +                                }
    +                            } catch (AmazonClientException e) {
    +                                getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
    +                                        new Object[]{ffFilename, bucket, key, e.getMessage()});
    +                                throw (e);
    +                            }
    +                        } else {
    +                            //----------------------------------------
    +                            // multipart upload
    +                            //----------------------------------------
     
    -                        attributes.put("s3.etag", result.getETag());
    +                            // load or create persistent state
    +                            //------------------------------------------------------------
    +                            MultipartState currentState;
    +                            try {
    +                                currentState = getState(cacheKey);
    +                                if (currentState != null) {
    +                                    if (currentState.getPartETags().size() > 0) {
    +                                        final PartETag lastETag = currentState.getPartETags().get(
    +                                                currentState.getPartETags().size() - 1);
    +                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
    +                                                        currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        currentState.getStorageClass().toString(),
    +                                                        currentState.getContentLength(),
    +                                                        currentState.getPartETags().size(),
    +                                                        Integer.toString(lastETag.getPartNumber()),
    +                                                        lastETag.getETag()});
    +                                    } else {
    +                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' no partsLoaded",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
    +                                                        currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        currentState.getStorageClass().toString(),
    +                                                        currentState.getContentLength()});
    +                                    }
    +                                } else {
    +                                    currentState = new MultipartState();
    +                                    currentState.setPartSize(multipartPartSize);
    +                                    currentState.setStorageClass(
    +                                            StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                                    currentState.setContentLength(ff.getSize());
    +                                    persistState(cacheKey, currentState);
    +                                    getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
    +                                            new Object[]{ffFilename, bucket, key});
    +                                }
    +                            } catch (IOException e) {
    +                                getLogger().error("IOException initiating cache state while processing flow files: " +
    +                                        e.getMessage());
    +                                throw (e);
    +                            }
    +
    +                            // initiate multipart upload or find position in file
    +                            //------------------------------------------------------------
    +                            if (currentState.getUploadId().isEmpty()) {
    +                                final InitiateMultipartUploadRequest initiateRequest =
    +                                        new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
    +                                initiateRequest.setStorageClass(currentState.getStorageClass());
    +                                final AccessControlList acl = createACL(context, ff);
    +                                if (acl != null) {
    +                                    initiateRequest.setAccessControlList(acl);
    +                                }
    +                                try {
    +                                    final InitiateMultipartUploadResult initiateResult =
    +                                            s3.initiateMultipartUpload(initiateRequest);
    +                                    currentState.setUploadId(initiateResult.getUploadId());
    +                                    currentState.getPartETags().clear();
    +                                    try {
    +                                        persistState(cacheKey, currentState);
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Exception saving cache state while processing flow file: " +
    +                                                e.getMessage());
    +                                        throw(new ProcessException("Exception saving cache state", e));
    +                                    }
    +                                    getLogger().info("Success initiating upload flowfile={} available={} position={} " +
    +                                            "length={} bucket={} key={} uploadId={}",
    +                                            new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
    +                                                    currentState.getContentLength(), bucket, key,
    +                                                    currentState.getUploadId()});
    +                                    if (initiateResult.getUploadId() != null) {
    +                                        attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
    +                                    }
    +                                } catch (AmazonClientException e) {
    +                                    getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
    +                                            new Object[]{ffFilename, bucket, key, e.getMessage()});
    +                                    throw(e);
    +                                }
    +                            } else {
    +                                if (currentState.getFilePosition() > 0) {
    +                                    try {
    +                                        final long skipped = in.skip(currentState.getFilePosition());
    +                                        if (skipped != currentState.getFilePosition()) {
    +                                            getLogger().info("Failure skipping to resume upload flowfile={} " +
    +                                                    "bucket={} key={} position={} skipped={}",
    +                                                    new Object[]{ffFilename, bucket, key,
    +                                                            currentState.getFilePosition(), skipped});
    +                                        }
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
    +                                                "key={} position={} reason={}",
    +                                                new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
    +                                                        e.getMessage()});
    +                                        throw(new ProcessException(e));
    +                                    }
    +                                }
    +                            }
    +
    +                            // upload parts
    +                            //------------------------------------------------------------
    +                            long thisPartSize;
    +                            for (int part = currentState.getPartETags().size() + 1;
    +                                 currentState.getFilePosition() < currentState.getContentLength(); part++) {
    +                                if (!PutS3Object.this.isScheduled()) {
    +                                    getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
    +                                            "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
    +                                    session.rollback();
    +                                    return;
    +                                }
    +                                thisPartSize = Math.min(currentState.getPartSize(),
    +                                        (currentState.getContentLength() - currentState.getFilePosition()));
    +                                UploadPartRequest uploadRequest = new UploadPartRequest()
    +                                        .withBucketName(bucket)
    +                                        .withKey(key)
    +                                        .withUploadId(currentState.getUploadId())
    +                                        .withInputStream(in)
    +                                        .withPartNumber(part)
    +                                        .withPartSize(thisPartSize);
    +                                try {
    +                                    UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
    +                                    currentState.addPartETag(uploadPartResult.getPartETag());
    +                                    currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
    +                                    try {
    +                                        persistState(cacheKey, currentState);
    +                                    } catch (Exception e) {
    --- End diff --
    
    I'd add a comment here on why swallowing exceptions is okay, as it is not immediately intuitive.
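    For illustration, the rationale might be captured with something like the sketch below. The stated reason is an assumption on my part, based on the cache state apparently existing only so an interrupted upload can resume, not a confirmed explanation from the patch:
    
        try {
            persistState(cacheKey, currentState);
        } catch (Exception e) {
            // Assumed rationale: the part itself was already uploaded successfully, and
            // the cached MultipartState exists only so an interrupted upload can resume
            // after a restart. A failed persist therefore costs, at worst, re-uploading
            // parts S3 already holds, so log and continue rather than fail the upload.
            getLogger().info("Exception saving cache state for flowfile={}: {}",
                    new Object[]{ffFilename, e.getMessage()});
        }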


> Create new PutS3ObjectMultipart processor
> -----------------------------------------
>
>                 Key: NIFI-1107
>                 URL: https://issues.apache.org/jira/browse/NIFI-1107
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>            Reporter: Joe Skora
>            Assignee: Joe Skora
>              Labels: s3
>             Fix For: 0.5.0
>
>
> A new `PutS3ObjectMultipart` processor using the AWS S3 multipart upload API to upload files larger than the [5GB limit|http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html] supported by `PutS3Object`.
> To support S3-compatible endpoints, this will also add an `Endpoint Override URL` property to `AbstractAWSProcessor` that sets the service [endpoint|http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/AmazonWebServiceClient.html#setEndpoint(java.lang.String)], overriding the endpoint URL normally selected based on the Amazon region.
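
For reference, the endpoint override described above is a single call on the AWS SDK for Java (v1) client; a minimal sketch, assuming the standard AmazonS3Client and an illustrative override URL:

    // Point the client at an S3-compatible service instead of the endpoint
    // derived from the configured AWS region. The URL below is illustrative.
    AmazonS3Client s3 = new AmazonS3Client(credentials);
    s3.setEndpoint("https://s3.example.internal:9020");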



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
