Github user trkurc commented on a diff in the pull request:
https://github.com/apache/nifi/pull/132#discussion_r45944159
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---
@@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws IOException {
                 objectMetadata.setUserMetadata(userMetadata);
             }
-            final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
-            request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
-            final AccessControlList acl = createACL(context, ff);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
+            if (ff.getSize() <= multipartThreshold) {
+                //----------------------------------------
+                // single part upload
+                //----------------------------------------
+                final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                request.setStorageClass(
+                        StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                final AccessControlList acl = createACL(context, ff);
+                if (acl != null) {
+                    request.setAccessControlList(acl);
+                }
-            final PutObjectResult result = s3.putObject(request);
-            if (result.getVersionId() != null) {
-                attributes.put("s3.version", result.getVersionId());
-            }
+                try {
+                    final PutObjectResult result = s3.putObject(request);
+                    if (result.getVersionId() != null) {
+                        attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+                    }
+                    if (result.getETag() != null) {
+                        attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+                    }
+                    if (result.getExpirationTime() != null) {
+                        attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+                    }
+                    if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
+                        attributes.put(S3_STORAGECLASS_ATTR_KEY,
+                                result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+                    }
+                    if (userMetadata.size() > 0) {
+                        StringBuilder userMetaBldr = new StringBuilder();
+                        for (String userKey : userMetadata.keySet()) {
+                            userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+                        }
+                        attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                    }
+                } catch (AmazonClientException e) {
+                    getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+                            new Object[]{ffFilename, bucket, key, e.getMessage()});
+                    throw (e);
+                }
+            } else {
+                //----------------------------------------
+                // multipart upload
+                //----------------------------------------
-            attributes.put("s3.etag", result.getETag());
+                // load or create persistent state
+                //------------------------------------------------------------
+                MultipartState currentState;
+                try {
+                    currentState = getState(cacheKey);
+                    if (currentState != null) {
+                        if (currentState.getPartETags().size() > 0) {
+                            final PartETag lastETag = currentState.getPartETags().get(
+                                    currentState.getPartETags().size() - 1);
+                            getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                    "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                    "contentLength='{}' partsLoaded={} lastPart={}/{}",
+                                    new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                            currentState.getFilePosition(), currentState.getPartSize(),
+                                            currentState.getStorageClass().toString(),
+                                            currentState.getContentLength(),
+                                            currentState.getPartETags().size(),
+                                            Integer.toString(lastETag.getPartNumber()),
+                                            lastETag.getETag()});
+                        } else {
+                            getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                    "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                    "contentLength='{}' no partsLoaded",
+                                    new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                            currentState.getFilePosition(), currentState.getPartSize(),
+                                            currentState.getStorageClass().toString(),
+                                            currentState.getContentLength()});
+                        }
+                    } else {
+                        currentState = new MultipartState();
+                        currentState.setPartSize(multipartPartSize);
+                        currentState.setStorageClass(
+                                StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                        currentState.setContentLength(ff.getSize());
+                        persistState(cacheKey, currentState);
+                        getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
+                                new Object[]{ffFilename, bucket, key});
+                    }
+                } catch (IOException e) {
+                    getLogger().error("IOException initiating cache state while processing flow files: " +
+                            e.getMessage());
+                    throw (e);
+                }
+
+                // initiate multipart upload or find position in file
+                //------------------------------------------------------------
+                if (currentState.getUploadId().isEmpty()) {
+                    final InitiateMultipartUploadRequest initiateRequest =
+                            new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+                    initiateRequest.setStorageClass(currentState.getStorageClass());
+                    final AccessControlList acl = createACL(context, ff);
+                    if (acl != null) {
+                        initiateRequest.setAccessControlList(acl);
+                    }
+                    try {
+                        final InitiateMultipartUploadResult initiateResult =
+                                s3.initiateMultipartUpload(initiateRequest);
+                        currentState.setUploadId(initiateResult.getUploadId());
+                        currentState.getPartETags().clear();
+                        try {
+                            persistState(cacheKey, currentState);
+                        } catch (Exception e) {
+                            getLogger().info("Exception saving cache state while processing flow file: " +
+                                    e.getMessage());
+                            throw (new ProcessException("Exception saving cache state", e));
+                        }
+                        getLogger().info("Success initiating upload flowfile={} available={} position={} " +
+                                "length={} bucket={} key={} uploadId={}",
+                                new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
+                                        currentState.getContentLength(), bucket, key,
+                                        currentState.getUploadId()});
+                        if (initiateResult.getUploadId() != null) {
+                            attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
+                        }
+                    } catch (AmazonClientException e) {
+                        getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
+                                new Object[]{ffFilename, bucket, key, e.getMessage()});
+                        throw (e);
+                    }
+                } else {
+                    if (currentState.getFilePosition() > 0) {
+                        try {
+                            final long skipped = in.skip(currentState.getFilePosition());
+                            if (skipped != currentState.getFilePosition()) {
+                                getLogger().info("Failure skipping to resume upload flowfile={} " +
+                                        "bucket={} key={} position={} skipped={}",
+                                        new Object[]{ffFilename, bucket, key,
+                                                currentState.getFilePosition(), skipped});
+                            }
+                        } catch (Exception e) {
+                            getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
+                                    "key={} position={} reason={}",
+                                    new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
+                                            e.getMessage()});
+                            throw (new ProcessException(e));
+                        }
+                    }
+                }
+
+                // upload parts
+                //------------------------------------------------------------
+                long thisPartSize;
+                for (int part = currentState.getPartETags().size() + 1;
+                        currentState.getFilePosition() < currentState.getContentLength(); part++) {
+                    if (!PutS3Object.this.isScheduled()) {
+                        getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
+                                "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
+                        session.rollback();
+                        return;
+                    }
+                    thisPartSize = Math.min(currentState.getPartSize(),
+                            (currentState.getContentLength() - currentState.getFilePosition()));
+                    UploadPartRequest uploadRequest = new UploadPartRequest()
+                            .withBucketName(bucket)
+                            .withKey(key)
+                            .withUploadId(currentState.getUploadId())
+                            .withInputStream(in)
+                            .withPartNumber(part)
+                            .withPartSize(thisPartSize);
+                    try {
+                        UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+                        currentState.addPartETag(uploadPartResult.getPartETag());
+                        currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+                        try {
+                            persistState(cacheKey, currentState);
+                        } catch (Exception e) {
--- End diff --
I'd add a comment here explaining why swallowing the exception is okay, as it is
not immediately intuitive.
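
For example, something along these lines (the wording is hypothetical, and the
rationale, that a persist failure only costs resumability rather than the upload
itself, is my reading of the code, so worth confirming):

    try {
        persistState(cacheKey, currentState);
    } catch (Exception e) {
        // Deliberately non-fatal: the in-memory MultipartState is still valid,
        // so this upload can continue to completion. Failing to persist only
        // means the upload cannot be resumed from this part boundary after a
        // processor restart; it would start over instead.
        getLogger().info("Exception saving cache state: " + e.getMessage());
    }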