This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c1a21ad078 NIFI-12642 Added support for FileResourceService in
PutS3Object
c1a21ad078 is described below
commit c1a21ad078d7b9c3b0422c48b9bb1f9faa3c0b6f
Author: Balázs Gerner <[email protected]>
AuthorDate: Mon Jan 22 13:11:18 2024 +0100
NIFI-12642 Added support for FileResourceService in PutS3Object
This closes #8295.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 15 +
.../apache/nifi/processors/aws/s3/PutS3Object.java | 575 +++++++++++----------
.../nifi/processors/aws/s3/AbstractS3IT.java | 83 +--
.../nifi/processors/aws/s3/ITPutS3Object.java | 65 +++
.../nifi/processors/aws/s3/TestPutS3Object.java | 51 +-
5 files changed, 462 insertions(+), 327 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index c0748af598..44ac6da9f3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -38,6 +38,15 @@
<artifactId>nifi-listed-entity</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-resource-transfer</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-file-resource-service-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-abstract-processors</artifactId>
@@ -141,6 +150,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-file-resource-service</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 3247f9a6a8..26c759417a 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -53,14 +53,15 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import java.io.File;
import java.io.FileInputStream;
@@ -78,6 +79,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -87,6 +89,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
+
@SupportsBatching
@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -261,6 +267,8 @@ public class PutS3Object extends AbstractS3Processor {
KEY,
S3_REGION,
AWS_CREDENTIALS_PROVIDER_SERVICE,
+ RESOURCE_TRANSFER_SOURCE,
+ FILE_RESOURCE_SERVICE,
STORAGE_CLASS,
ENCRYPTION_SERVICE,
SERVER_SIDE_ENCRYPTION,
@@ -501,6 +509,8 @@ public class PutS3Object extends AbstractS3Processor {
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
final String ffFilename =
ff.getAttributes().get(CoreAttributes.FILENAME.key());
+ final ResourceTransferSource resourceTransferSource =
context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+
attributes.put(S3_BUCKET_KEY, bucket);
attributes.put(S3_OBJECT_KEY, key);
@@ -519,329 +529,332 @@ public class PutS3Object extends AbstractS3Processor {
*/
try {
final FlowFile flowFileCopy = flowFile;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final ObjectMetadata objectMetadata = new ObjectMetadata();
- objectMetadata.setContentLength(ff.getSize());
-
- final String contentType =
context.getProperty(CONTENT_TYPE)
- .evaluateAttributeExpressions(ff).getValue();
- if (contentType != null) {
- objectMetadata.setContentType(contentType);
- attributes.put(S3_CONTENT_TYPE, contentType);
- }
+ Optional<FileResource> optFileResource =
getFileResource(resourceTransferSource, context, flowFile.getAttributes());
+ try (InputStream in = optFileResource
+ .map(FileResource::getInputStream)
+ .orElseGet(() -> session.read(flowFileCopy))) {
+ final ObjectMetadata objectMetadata = new ObjectMetadata();
+
objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));
+
+ final String contentType = context.getProperty(CONTENT_TYPE)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (contentType != null) {
+ objectMetadata.setContentType(contentType);
+ attributes.put(S3_CONTENT_TYPE, contentType);
+ }
+
+ final String cacheControl = context.getProperty(CACHE_CONTROL)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (cacheControl != null) {
+ objectMetadata.setCacheControl(cacheControl);
+ attributes.put(S3_CACHE_CONTROL, cacheControl);
+ }
- final String cacheControl =
context.getProperty(CACHE_CONTROL)
- .evaluateAttributeExpressions(ff).getValue();
- if (cacheControl != null) {
- objectMetadata.setCacheControl(cacheControl);
- attributes.put(S3_CACHE_CONTROL, cacheControl);
+ final String contentDisposition =
context.getProperty(CONTENT_DISPOSITION).getValue();
+ String fileName =
URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()),
StandardCharsets.UTF_8);
+ if (contentDisposition != null &&
contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
+
objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
+ attributes.put(S3_CONTENT_DISPOSITION,
CONTENT_DISPOSITION_INLINE);
+ } else if (contentDisposition != null &&
contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
+ String contentDispositionValue =
CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
+
objectMetadata.setContentDisposition(contentDispositionValue);
+ attributes.put(S3_CONTENT_DISPOSITION,
contentDispositionValue);
+ } else {
+ objectMetadata.setContentDisposition(fileName);
+ }
+
+ final String expirationRule =
context.getProperty(EXPIRATION_RULE_ID)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (expirationRule != null) {
+ objectMetadata.setExpirationTimeRuleId(expirationRule);
+ }
+
+ final Map<String, String> userMetadata = new HashMap<>();
+ for (final Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value = context.getProperty(
+
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ userMetadata.put(entry.getKey().getName(), value);
}
+ }
- final String contentDisposition =
context.getProperty(CONTENT_DISPOSITION).getValue();
- String fileName =
URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()),
StandardCharsets.UTF_8);
- if (contentDisposition != null &&
contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
-
objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
- attributes.put(S3_CONTENT_DISPOSITION,
CONTENT_DISPOSITION_INLINE);
- } else if (contentDisposition != null &&
contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
- String contentDispositionValue =
CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
-
objectMetadata.setContentDisposition(contentDispositionValue);
- attributes.put(S3_CONTENT_DISPOSITION,
contentDispositionValue);
- } else {
- objectMetadata.setContentDisposition(fileName);
+ final String serverSideEncryption =
context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+ AmazonS3EncryptionService encryptionService = null;
+
+ if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
+ objectMetadata.setSSEAlgorithm(serverSideEncryption);
+ attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
+ } else {
+ encryptionService =
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ }
+
+ if (!userMetadata.isEmpty()) {
+ objectMetadata.setUserMetadata(userMetadata);
+ }
+
+ if (ff.getSize() <= multipartThreshold) {
+ //----------------------------------------
+ // single part upload
+ //----------------------------------------
+ final PutObjectRequest request = new
PutObjectRequest(bucket, key, in, objectMetadata);
+ if (encryptionService != null) {
+ encryptionService.configurePutObjectRequest(request,
objectMetadata);
+ attributes.put(S3_ENCRYPTION_STRATEGY,
encryptionService.getStrategyName());
}
- final String expirationRule =
context.getProperty(EXPIRATION_RULE_ID)
- .evaluateAttributeExpressions(ff).getValue();
- if (expirationRule != null) {
- objectMetadata.setExpirationTimeRuleId(expirationRule);
+
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ final AccessControlList acl = createACL(context, ff);
+ if (acl != null) {
+ request.setAccessControlList(acl);
}
- final Map<String, String> userMetadata = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value = context.getProperty(
-
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
- userMetadata.put(entry.getKey().getName(), value);
- }
+ final CannedAccessControlList cannedAcl =
createCannedACL(context, ff);
+ if (cannedAcl != null) {
+ request.withCannedAcl(cannedAcl);
}
- final String serverSideEncryption =
context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
- AmazonS3EncryptionService encryptionService = null;
+ if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
+ request.setTagging(new
ObjectTagging(getObjectTags(context, flowFileCopy)));
+ }
- if
(!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
- objectMetadata.setSSEAlgorithm(serverSideEncryption);
- attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
- } else {
- encryptionService =
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ try {
+ final PutObjectResult result = s3.putObject(request);
+ if (result.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY,
result.getVersionId());
+ }
+ if (result.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+ }
+ if (result.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY,
result.getExpirationTime().toString());
+ }
+ if (result.getMetadata().getStorageClass() != null) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY,
result.getMetadata().getStorageClass());
+ } else {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY,
StorageClass.Standard.toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY,
userMetaBldr.toString());
+ }
+ attributes.put(S3_API_METHOD_ATTR_KEY,
S3_API_METHOD_PUTOBJECT);
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload
flowfile={} bucket={} key={} reason={}",
+ ffFilename, bucket, key, e.getMessage());
+ throw (e);
}
+ } else {
+ //----------------------------------------
+ // multipart upload
+ //----------------------------------------
- if (!userMetadata.isEmpty()) {
- objectMetadata.setUserMetadata(userMetadata);
+ // load or create persistent state
+
//------------------------------------------------------------
+ MultipartState currentState;
+ try {
+ currentState = getLocalStateIfInS3(s3, bucket,
cacheKey);
+ if (currentState != null) {
+ if (currentState.getPartETags().size() > 0) {
+ final PartETag lastETag =
currentState.getPartETags().get(
+ currentState.getPartETags().size() -
1);
+ getLogger().info("Resuming upload for
flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}'
partsLoaded={} lastPart={}/{}",
+ ffFilename, bucket, key,
currentState.getUploadId(),
+ currentState.getFilePosition(),
currentState.getPartSize(),
+
currentState.getStorageClass().toString(),
+ currentState.getContentLength(),
+ currentState.getPartETags().size(),
+
Integer.toString(lastETag.getPartNumber()),
+ lastETag.getETag());
+ } else {
+ getLogger().info("Resuming upload for
flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' no
partsLoaded",
+ ffFilename, bucket, key,
currentState.getUploadId(),
+ currentState.getFilePosition(),
currentState.getPartSize(),
+
currentState.getStorageClass().toString(),
+ currentState.getContentLength());
+ }
+ } else {
+ currentState = new MultipartState();
+ currentState.setPartSize(multipartPartSize);
+ currentState.setStorageClass(
+
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ currentState.setContentLength(ff.getSize());
+ persistLocalState(cacheKey, currentState);
+ getLogger().info("Starting new upload for
flowfile='{}' bucket='{}' key='{}'",
+ ffFilename, bucket, key);
+ }
+ } catch (IOException e) {
+ getLogger().error("IOException initiating cache state
while processing flow files: " +
+ e.getMessage());
+ throw (e);
}
- if (ff.getSize() <= multipartThreshold) {
- //----------------------------------------
- // single part upload
- //----------------------------------------
- final PutObjectRequest request = new
PutObjectRequest(bucket, key, in, objectMetadata);
+ // initiate multipart upload or find position in file
+
//------------------------------------------------------------
+ if (currentState.getUploadId().isEmpty()) {
+ final InitiateMultipartUploadRequest initiateRequest =
new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
if (encryptionService != null) {
-
encryptionService.configurePutObjectRequest(request, objectMetadata);
+
encryptionService.configureInitiateMultipartUploadRequest(initiateRequest,
objectMetadata);
attributes.put(S3_ENCRYPTION_STRATEGY,
encryptionService.getStrategyName());
}
+
initiateRequest.setStorageClass(currentState.getStorageClass());
-
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
if (acl != null) {
- request.setAccessControlList(acl);
+ initiateRequest.setAccessControlList(acl);
}
-
final CannedAccessControlList cannedAcl =
createCannedACL(context, ff);
if (cannedAcl != null) {
- request.withCannedAcl(cannedAcl);
+ initiateRequest.withCannedACL(cannedAcl);
}
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
- request.setTagging(new
ObjectTagging(getObjectTags(context, flowFileCopy)));
+ initiateRequest.setTagging(new
ObjectTagging(getObjectTags(context, flowFileCopy)));
}
try {
- final PutObjectResult result =
s3.putObject(request);
- if (result.getVersionId() != null) {
- attributes.put(S3_VERSION_ATTR_KEY,
result.getVersionId());
- }
- if (result.getETag() != null) {
- attributes.put(S3_ETAG_ATTR_KEY,
result.getETag());
- }
- if (result.getExpirationTime() != null) {
- attributes.put(S3_EXPIRATION_ATTR_KEY,
result.getExpirationTime().toString());
- }
- if (result.getMetadata().getStorageClass() !=
null) {
- attributes.put(S3_STORAGECLASS_ATTR_KEY,
result.getMetadata().getStorageClass());
- } else {
- attributes.put(S3_STORAGECLASS_ATTR_KEY,
StorageClass.Standard.toString());
+ final InitiateMultipartUploadResult initiateResult
=
+
s3.initiateMultipartUpload(initiateRequest);
+
currentState.setUploadId(initiateResult.getUploadId());
+ currentState.getPartETags().clear();
+ try {
+ persistLocalState(cacheKey, currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state
while processing flow file: " +
+ e.getMessage());
+ throw (new ProcessException("Exception saving
cache state", e));
}
- if (userMetadata.size() > 0) {
- StringBuilder userMetaBldr = new
StringBuilder();
- for (String userKey : userMetadata.keySet()) {
-
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
- }
- attributes.put(S3_USERMETA_ATTR_KEY,
userMetaBldr.toString());
+ getLogger().info("Success initiating upload
flowfile={} available={} position={} " +
+ "length={} bucket={} key={}
uploadId={}",
+ new Object[]{ffFilename, in.available(),
currentState.getFilePosition(),
+ currentState.getContentLength(),
bucket, key,
+ currentState.getUploadId()});
+ if (initiateResult.getUploadId() != null) {
+ attributes.put(S3_UPLOAD_ID_ATTR_KEY,
initiateResult.getUploadId());
}
- attributes.put(S3_API_METHOD_ATTR_KEY,
S3_API_METHOD_PUTOBJECT);
} catch (AmazonClientException e) {
- getLogger().info("Failure completing upload
flowfile={} bucket={} key={} reason={}",
- ffFilename, bucket, key, e.getMessage());
+ getLogger().info("Failure initiating upload
flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key,
e.getMessage()});
throw (e);
}
} else {
- //----------------------------------------
- // multipart upload
- //----------------------------------------
-
- // load or create persistent state
-
//------------------------------------------------------------
- MultipartState currentState;
- try {
- currentState = getLocalStateIfInS3(s3, bucket,
cacheKey);
- if (currentState != null) {
- if (currentState.getPartETags().size() > 0) {
- final PartETag lastETag =
currentState.getPartETags().get(
- currentState.getPartETags().size()
- 1);
- getLogger().info("Resuming upload for
flowfile='{}' bucket='{}' key='{}' " +
- "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
- "contentLength='{}'
partsLoaded={} lastPart={}/{}",
- ffFilename, bucket, key,
currentState.getUploadId(),
- currentState.getFilePosition(),
currentState.getPartSize(),
-
currentState.getStorageClass().toString(),
- currentState.getContentLength(),
- currentState.getPartETags().size(),
-
Integer.toString(lastETag.getPartNumber()),
- lastETag.getETag());
- } else {
- getLogger().info("Resuming upload for
flowfile='{}' bucket='{}' key='{}' " +
- "uploadID='{}'
filePosition='{}' partSize='{}' storageClass='{}' " +
- "contentLength='{}' no
partsLoaded",
- ffFilename, bucket, key,
currentState.getUploadId(),
- currentState.getFilePosition(),
currentState.getPartSize(),
-
currentState.getStorageClass().toString(),
- currentState.getContentLength());
- }
- } else {
- currentState = new MultipartState();
- currentState.setPartSize(multipartPartSize);
- currentState.setStorageClass(
-
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
- currentState.setContentLength(ff.getSize());
- persistLocalState(cacheKey, currentState);
- getLogger().info("Starting new upload for
flowfile='{}' bucket='{}' key='{}'",
- ffFilename, bucket, key);
- }
- } catch (IOException e) {
- getLogger().error("IOException initiating cache
state while processing flow files: " +
- e.getMessage());
- throw (e);
- }
-
- // initiate multipart upload or find position in file
-
//------------------------------------------------------------
- if (currentState.getUploadId().isEmpty()) {
- final InitiateMultipartUploadRequest
initiateRequest = new InitiateMultipartUploadRequest(bucket, key,
objectMetadata);
- if (encryptionService != null) {
-
encryptionService.configureInitiateMultipartUploadRequest(initiateRequest,
objectMetadata);
- attributes.put(S3_ENCRYPTION_STRATEGY,
encryptionService.getStrategyName());
- }
-
initiateRequest.setStorageClass(currentState.getStorageClass());
-
- final AccessControlList acl = createACL(context,
ff);
- if (acl != null) {
- initiateRequest.setAccessControlList(acl);
- }
- final CannedAccessControlList cannedAcl =
createCannedACL(context, ff);
- if (cannedAcl != null) {
- initiateRequest.withCannedACL(cannedAcl);
- }
-
- if
(context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
- initiateRequest.setTagging(new
ObjectTagging(getObjectTags(context, flowFileCopy)));
- }
-
+ if (currentState.getFilePosition() > 0) {
try {
- final InitiateMultipartUploadResult
initiateResult =
-
s3.initiateMultipartUpload(initiateRequest);
-
currentState.setUploadId(initiateResult.getUploadId());
- currentState.getPartETags().clear();
- try {
- persistLocalState(cacheKey, currentState);
- } catch (Exception e) {
- getLogger().info("Exception saving cache
state while processing flow file: " +
- e.getMessage());
- throw (new ProcessException("Exception
saving cache state", e));
- }
- getLogger().info("Success initiating upload
flowfile={} available={} position={} " +
- "length={} bucket={} key={}
uploadId={}",
- new Object[]{ffFilename,
in.available(), currentState.getFilePosition(),
-
currentState.getContentLength(), bucket, key,
- currentState.getUploadId()});
- if (initiateResult.getUploadId() != null) {
- attributes.put(S3_UPLOAD_ID_ATTR_KEY,
initiateResult.getUploadId());
- }
- } catch (AmazonClientException e) {
- getLogger().info("Failure initiating upload
flowfile={} bucket={} key={} reason={}",
- new Object[]{ffFilename, bucket, key,
e.getMessage()});
- throw (e);
- }
- } else {
- if (currentState.getFilePosition() > 0) {
- try {
- final long skipped =
in.skip(currentState.getFilePosition());
- if (skipped !=
currentState.getFilePosition()) {
- getLogger().info("Failure skipping to
resume upload flowfile={} " +
- "bucket={} key={}
position={} skipped={}",
- new Object[]{ffFilename,
bucket, key,
-
currentState.getFilePosition(), skipped});
- }
- } catch (Exception e) {
- getLogger().info("Failure skipping to
resume upload flowfile={} bucket={} " +
- "key={} position={}
reason={}",
- new Object[]{ffFilename, bucket,
key, currentState.getFilePosition(),
- e.getMessage()});
- throw (new ProcessException(e));
+ final long skipped =
in.skip(currentState.getFilePosition());
+ if (skipped != currentState.getFilePosition())
{
+ getLogger().info("Failure skipping to
resume upload flowfile={} " +
+ "bucket={} key={}
position={} skipped={}",
+ new Object[]{ffFilename, bucket,
key,
+
currentState.getFilePosition(), skipped});
}
+ } catch (Exception e) {
+ getLogger().info("Failure skipping to resume
upload flowfile={} bucket={} " +
+ "key={} position={} reason={}",
+ new Object[]{ffFilename, bucket, key,
currentState.getFilePosition(),
+ e.getMessage()});
+ throw (new ProcessException(e));
}
}
+ }
- // upload parts
-
//------------------------------------------------------------
- long thisPartSize;
- boolean isLastPart;
- for (int part = currentState.getPartETags().size() + 1;
- currentState.getFilePosition() <
currentState.getContentLength(); part++) {
- if (!PutS3Object.this.isScheduled()) {
- throw new
IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
- " part=" + part + " uploadId=" +
currentState.getUploadId());
- }
- thisPartSize = Math.min(currentState.getPartSize(),
- (currentState.getContentLength() -
currentState.getFilePosition()));
- isLastPart = currentState.getContentLength() ==
currentState.getFilePosition() + thisPartSize;
- UploadPartRequest uploadRequest = new
UploadPartRequest()
- .withBucketName(bucket)
- .withKey(key)
- .withUploadId(currentState.getUploadId())
- .withInputStream(in)
- .withPartNumber(part)
- .withPartSize(thisPartSize)
- .withLastPart(isLastPart);
- if (encryptionService != null) {
-
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+ // upload parts
+
//------------------------------------------------------------
+ long thisPartSize;
+ boolean isLastPart;
+ for (int part = currentState.getPartETags().size() + 1;
+ currentState.getFilePosition() <
currentState.getContentLength(); part++) {
+ if (!PutS3Object.this.isScheduled()) {
+ throw new
IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
+ " part=" + part + " uploadId=" +
currentState.getUploadId());
+ }
+ thisPartSize = Math.min(currentState.getPartSize(),
+ (currentState.getContentLength() -
currentState.getFilePosition()));
+ isLastPart = currentState.getContentLength() ==
currentState.getFilePosition() + thisPartSize;
+ UploadPartRequest uploadRequest = new
UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(currentState.getUploadId())
+ .withInputStream(in)
+ .withPartNumber(part)
+ .withPartSize(thisPartSize)
+ .withLastPart(isLastPart);
+ if (encryptionService != null) {
+
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+ }
+ try {
+ UploadPartResult uploadPartResult =
s3.uploadPart(uploadRequest);
+
currentState.addPartETag(uploadPartResult.getPartETag());
+
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+ try {
+ persistLocalState(cacheKey, currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state
processing flow file: " +
+ e.getMessage());
}
+ int available = 0;
try {
- UploadPartResult uploadPartResult =
s3.uploadPart(uploadRequest);
-
currentState.addPartETag(uploadPartResult.getPartETag());
-
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
- try {
- persistLocalState(cacheKey, currentState);
- } catch (Exception e) {
- getLogger().info("Exception saving cache
state processing flow file: " +
- e.getMessage());
- }
- int available = 0;
- try {
- available = in.available();
- } catch (IOException e) {
- // in case of the last part, the stream is
already closed
- }
- getLogger().info("Success uploading part
flowfile={} part={} available={} " +
- "etag={} uploadId={}", new
Object[]{ffFilename, part, available,
- uploadPartResult.getETag(),
currentState.getUploadId()});
- } catch (AmazonClientException e) {
- getLogger().info("Failure uploading part
flowfile={} part={} bucket={} key={} " +
- "reason={}", new Object[]{ffFilename,
part, bucket, key, e.getMessage()});
- throw (e);
+ available = in.available();
+ } catch (IOException e) {
+ // in case of the last part, the stream is
already closed
}
+ getLogger().info("Success uploading part
flowfile={} part={} available={} " +
+ "etag={} uploadId={}", new
Object[]{ffFilename, part, available,
+ uploadPartResult.getETag(),
currentState.getUploadId()});
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure uploading part
flowfile={} part={} bucket={} key={} " +
+ "reason={}", new Object[]{ffFilename,
part, bucket, key, e.getMessage()});
+ throw (e);
}
+ }
- // complete multipart upload
-
//------------------------------------------------------------
- CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
- bucket, key, currentState.getUploadId(),
currentState.getPartETags());
+ // complete multipart upload
+
//------------------------------------------------------------
+ CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
+ bucket, key, currentState.getUploadId(),
currentState.getPartETags());
- // No call to an encryption service is needed for a
CompleteMultipartUploadRequest.
- try {
- CompleteMultipartUploadResult completeResult =
-
s3.completeMultipartUpload(completeRequest);
- getLogger().info("Success completing upload
flowfile={} etag={} uploadId={}",
- new Object[]{ffFilename,
completeResult.getETag(), currentState.getUploadId()});
- if (completeResult.getVersionId() != null) {
- attributes.put(S3_VERSION_ATTR_KEY,
completeResult.getVersionId());
- }
- if (completeResult.getETag() != null) {
- attributes.put(S3_ETAG_ATTR_KEY,
completeResult.getETag());
- }
- if (completeResult.getExpirationTime() != null) {
- attributes.put(S3_EXPIRATION_ATTR_KEY,
-
completeResult.getExpirationTime().toString());
- }
- if (currentState.getStorageClass() != null) {
- attributes.put(S3_STORAGECLASS_ATTR_KEY,
currentState.getStorageClass().toString());
- }
- if (userMetadata.size() > 0) {
- StringBuilder userMetaBldr = new
StringBuilder();
- for (String userKey : userMetadata.keySet()) {
-
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
- }
- attributes.put(S3_USERMETA_ATTR_KEY,
userMetaBldr.toString());
+ // No call to an encryption service is needed for a
CompleteMultipartUploadRequest.
+ try {
+ CompleteMultipartUploadResult completeResult =
+ s3.completeMultipartUpload(completeRequest);
+ getLogger().info("Success completing upload
flowfile={} etag={} uploadId={}",
+ new Object[]{ffFilename,
completeResult.getETag(), currentState.getUploadId()});
+ if (completeResult.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY,
completeResult.getVersionId());
+ }
+ if (completeResult.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY,
completeResult.getETag());
+ }
+ if (completeResult.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY,
+
completeResult.getExpirationTime().toString());
+ }
+ if (currentState.getStorageClass() != null) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY,
currentState.getStorageClass().toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
}
- attributes.put(S3_API_METHOD_ATTR_KEY,
S3_API_METHOD_MULTIPARTUPLOAD);
- } catch (AmazonClientException e) {
- getLogger().info("Failure completing upload
flowfile={} bucket={} key={} reason={}",
- new Object[]{ffFilename, bucket, key,
e.getMessage()});
- throw (e);
+ attributes.put(S3_USERMETA_ATTR_KEY,
userMetaBldr.toString());
}
+ attributes.put(S3_API_METHOD_ATTR_KEY,
S3_API_METHOD_MULTIPARTUPLOAD);
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload
flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key,
e.getMessage()});
+ throw (e);
}
}
- });
+ } catch (IOException e) {
+ getLogger().error("Error during upload of flow files: " +
e.getMessage());
+ throw e;
+ }
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -852,25 +865,25 @@ public class PutS3Object extends AbstractS3Processor {
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
- getLogger().info("Successfully put {} to Amazon S3 in {}
milliseconds", new Object[] {ff, millis});
+ getLogger().info("Successfully put {} to Amazon S3 in {}
milliseconds", new Object[]{ff, millis});
try {
removeLocalState(cacheKey);
} catch (IOException e) {
getLogger().info("Error trying to delete key {} from cache:
{}",
new Object[]{cacheKey, e.getMessage()});
}
- } catch (final ProcessException | AmazonClientException pe) {
- extractExceptionDetails(pe, session, flowFile);
- if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
- getLogger().info(pe.getMessage());
+
+ } catch (final ProcessException | AmazonClientException | IOException
e) {
+ extractExceptionDetails(e, session, flowFile);
+ if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
+ getLogger().info(e.getMessage());
session.rollback();
} else {
- getLogger().error("Failed to put {} to Amazon S3 due to {}",
new Object[]{flowFile, pe});
+ getLogger().error("Failed to put {} to Amazon S3 due to {}",
flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
-
}
private final Lock s3BucketLock = new ReentrantLock();
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 80b3950716..f8eb6dd5b2 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -41,9 +41,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
@@ -70,6 +73,8 @@ import static org.junit.jupiter.api.Assertions.fail;
* @see ITListS3
*/
public abstract class AbstractS3IT {
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractS3IT.class);
+
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
protected final static String BUCKET_NAME = "test-bucket-" +
System.currentTimeMillis();
@@ -82,7 +87,6 @@ public abstract class AbstractS3IT {
private static final LocalStackContainer localstack = new
LocalStackContainer(localstackImage)
.withServices(LocalStackContainer.Service.S3,
LocalStackContainer.Service.KMS);
-
@BeforeAll
public static void oneTimeSetup() {
localstack.start();
@@ -101,6 +105,45 @@ public abstract class AbstractS3IT {
client.createBucket(request);
}
+ @BeforeEach
+ public void clearKeys() {
+ addedKeys.clear();
+ }
+
+ @AfterEach
+ public void emptyBucket() {
+ if (!client.doesBucketExistV2(BUCKET_NAME)) {
+ return;
+ }
+
+ ObjectListing objectListing = client.listObjects(BUCKET_NAME);
+ while (true) {
+ for (S3ObjectSummary objectSummary :
objectListing.getObjectSummaries()) {
+ client.deleteObject(BUCKET_NAME, objectSummary.getKey());
+ }
+
+ if (objectListing.isTruncated()) {
+ objectListing = client.listNextBatchOfObjects(objectListing);
+ } else {
+ break;
+ }
+ }
+ }
+
+ @AfterAll
+ public static void oneTimeTearDown() {
+ try {
+ if (client == null || !client.doesBucketExistV2(BUCKET_NAME)) {
+ return;
+ }
+
+ DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
+ client.deleteBucket(dbr);
+ } catch (final AmazonS3Exception e) {
+ logger.error("Unable to delete bucket {}", BUCKET_NAME, e);
+ }
+ }
+
protected AmazonS3 getClient() {
return client;
}
@@ -121,44 +164,6 @@ public abstract class AbstractS3IT {
AuthUtils.enableAccessKey(runner, localstack.getAccessKey(),
localstack.getSecretKey());
}
- @BeforeEach
- public void clearKeys() {
- addedKeys.clear();
- }
-
- @AfterAll
- public static void oneTimeTearDown() {
- // Empty the bucket before deleting it.
- try {
- if (client == null) {
- return;
- }
-
- ObjectListing objectListing = client.listObjects(BUCKET_NAME);
-
- while (true) {
- for (S3ObjectSummary objectSummary :
objectListing.getObjectSummaries()) {
- client.deleteObject(BUCKET_NAME, objectSummary.getKey());
- }
-
- if (objectListing.isTruncated()) {
- objectListing =
client.listNextBatchOfObjects(objectListing);
- } else {
- break;
- }
- }
-
- DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
- client.deleteBucket(dbr);
- } catch (final AmazonS3Exception e) {
- System.err.println("Unable to delete bucket " + BUCKET_NAME +
e.toString());
- }
-
- if (client.doesBucketExistV2(BUCKET_NAME)) {
- fail("Incomplete teardown, subsequent tests might fail");
- }
- }
-
protected void putTestFile(String key, File file) throws AmazonS3Exception
{
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key,
file);
client.putObject(putRequest);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index cda5e528db..43f936c9f0 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -24,13 +24,17 @@ import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.codec.binary.Base64;
+import org.apache.nifi.fileresource.service.StandardFileResourceService;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -53,6 +57,12 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +98,61 @@ public class ITPutS3Object extends AbstractS3IT {
kmsKeyId = getKMSKey();
}
+ @Test
+ public void testPutFromLocalFile() throws Exception {
+ TestRunner runner = initTestRunner();
+ String attributeName = "file.path";
+ Path resourcePath = getResourcePath(SAMPLE_FILE_RESOURCE_NAME);
+
+ String serviceId = FileResourceService.class.getSimpleName();
+ FileResourceService service = new StandardFileResourceService();
+ runner.addControllerService(serviceId, service);
+ runner.setProperty(service, StandardFileResourceService.FILE_PATH,
String.format("${%s}", attributeName));
+ runner.enableControllerService(service);
+
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE,
ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(attributeName, resourcePath.toString());
+ runner.enqueue(resourcePath, attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).getFirst();
+
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+ List<S3ObjectSummary> objectSummaries =
getClient().listObjects(BUCKET_NAME).getObjectSummaries();
+ assertThat(objectSummaries, hasSize(1));
+ assertEquals(objectSummaries.getFirst().getKey(),
resourcePath.getFileName().toString());
+ assertThat(objectSummaries.getFirst().getSize(), greaterThan(0L));
+ }
+
+ @Test
+ public void testPutFromNonExistentLocalFile() throws Exception {
+ TestRunner runner = initTestRunner();
+ String attributeName = "file.path";
+
+ String serviceId = FileResourceService.class.getSimpleName();
+ FileResourceService service = new StandardFileResourceService();
+ runner.addControllerService(serviceId, service);
+ runner.setProperty(service, StandardFileResourceService.FILE_PATH,
String.format("${%s}", attributeName));
+ runner.enableControllerService(service);
+
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE,
ResourceTransferSource.FILE_RESOURCE_SERVICE);
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ String filePath = "nonexistent.txt";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(attributeName, filePath);
+
+ runner.enqueue("", attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1);
+ assertThat(getClient().listObjects(BUCKET_NAME).getObjectSummaries(),
empty());
+ }
@Test
public void testSimplePut() throws IOException {
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index fff03d7510..ab7bf59fb4 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -37,10 +37,13 @@ import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -50,6 +53,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
+import java.io.InputStream;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
@@ -57,11 +61,16 @@ import java.util.List;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestPutS3Object {
@@ -87,6 +96,33 @@ public class TestPutS3Object {
runner.setEnvironmentVariableValue("java.io.tmpdir",
System.getProperty("java.io.tmpdir"));
}
+ @Test
+ public void testPutSinglePartFromLocalFileSource() throws Exception {
+ prepareTest();
+
+ String serviceId = "fileresource";
+ FileResourceService service = mock(FileResourceService.class);
+ InputStream localFileInputStream = mock(InputStream.class);
+ when(service.getIdentifier()).thenReturn(serviceId);
+ long contentLength = 10L;
+ when(service.getFileResource(anyMap())).thenReturn(new
FileResource(localFileInputStream, contentLength));
+
+ runner.addControllerService(serviceId, service);
+ runner.enableControllerService(service);
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE,
ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ runner.run();
+
+ ArgumentCaptor<PutObjectRequest> captureRequest =
ArgumentCaptor.forClass(PutObjectRequest.class);
+ verify(mockS3Client).putObject(captureRequest.capture());
+ PutObjectRequest putObjectRequest = captureRequest.getValue();
+ assertEquals(localFileInputStream, putObjectRequest.getInputStream());
+ assertEquals(putObjectRequest.getMetadata().getContentLength(),
contentLength);
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ }
+
@Test
public void testPutSinglePart() {
runner.setProperty("x-custom-prop", "hello");
@@ -95,7 +131,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest =
ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
@@ -105,6 +141,7 @@ public class TestPutS3Object {
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(),
"testfile.txt");
+ ff0.assertContentEquals("Test Content");
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY,
"test-version");
}
@@ -113,7 +150,7 @@ public class TestPutS3Object {
public void testPutSinglePartException() {
prepareTest();
-
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new
AmazonS3Exception("TestFail"));
+
when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new
AmazonS3Exception("TestFail"));
runner.run(1);
@@ -150,7 +187,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest =
ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
List<Tag> tagSet = request.getTagging().getTagSet();
@@ -169,7 +206,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest =
ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals(storageClass.toString(), request.getStorageClass());
@@ -185,7 +222,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest =
ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
ObjectMetadata objectMetadata = request.getMetadata();
@@ -241,10 +278,10 @@ public class TestPutS3Object {
putObjectResult.setVersionId("test-version");
putObjectResult.setETag("test-etag");
-
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
+
when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
MultipartUploadListing uploadListing = new MultipartUploadListing();
-
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
+
when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
}
@Test