This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 40d9750bb3 NIFI-12671 Added S3FileResourceService
40d9750bb3 is described below
commit 40d9750bb3dab405bd0ba7df14737837d9c3021c
Author: Balázs Gerner <[email protected]>
AuthorDate: Tue Feb 6 14:56:23 2024 +0100
NIFI-12671 Added S3FileResourceService
This closes #8368.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../AbstractAWSCredentialsProviderProcessor.java | 24 +--
.../processors/aws/s3/AbstractS3Processor.java | 50 +-----
.../nifi/processors/aws/util/RegionUtilV1.java | 94 ++++++++++++
.../processors/aws/v2/AbstractAwsProcessor.java | 4 +-
.../aws/v2/{RegionUtil.java => RegionUtilV2.java} | 7 +-
.../nifi/processors/aws/s3/DeleteS3Object.java | 2 +
.../nifi/processors/aws/s3/FetchS3Object.java | 2 +
.../org/apache/nifi/processors/aws/s3/ListS3.java | 14 +-
.../apache/nifi/processors/aws/s3/PutS3Object.java | 1 +
.../apache/nifi/processors/aws/s3/TagS3Object.java | 2 +
.../s3/encryption/StandardS3EncryptionService.java | 6 +-
.../aws/s3/service/S3FileResourceService.java | 154 +++++++++++++++++++
.../processors/aws/wag/InvokeAWSGatewayApi.java | 2 +
.../org.apache.nifi.controller.ControllerService | 1 +
.../nifi/processors/aws/s3/AbstractS3IT.java | 3 +-
.../nifi/processors/aws/s3/ITFetchS3Object.java | 3 +-
.../nifi/processors/aws/s3/ITPutS3Object.java | 5 +-
.../nifi/processors/aws/s3/TestDeleteS3Object.java | 11 +-
.../nifi/processors/aws/s3/TestFetchS3Object.java | 19 +--
.../apache/nifi/processors/aws/s3/TestListS3.java | 31 ++--
.../nifi/processors/aws/s3/TestPutS3Object.java | 5 +-
.../nifi/processors/aws/s3/TestTagS3Object.java | 21 +--
.../aws/s3/service/S3FileResourceServiceTest.java | 170 +++++++++++++++++++++
.../nifi/processors/aws/sqs/AbstractSQSIT.java | 6 +-
.../aws/wag/TestInvokeAWSGatewayApiCommon.java | 3 +-
.../aws/wag/TestInvokeAmazonGatewayApiMock.java | 3 +-
26 files changed, 513 insertions(+), 130 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
index f9a81ad208..5215186e36 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -29,7 +29,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@@ -58,6 +57,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
+
/**
* Base class for AWS processors that uses AWSCredentialsProvider interface
for creating AWS clients.
*
@@ -92,14 +93,6 @@ public abstract class
AbstractAWSCredentialsProviderProcessor<ClientType extends
// Property Descriptors
- public static final PropertyDescriptor REGION = new
PropertyDescriptor.Builder()
- .name("Region")
- .description("The AWS Region to connect to.")
- .required(true)
- .allowableValues(getAvailableRegions())
-
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
- .build();
-
public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait in order to establish a
connection to AWS or receive data from AWS before timing out.")
@@ -177,19 +170,6 @@ public abstract class
AbstractAWSCredentialsProviderProcessor<ClientType extends
this.clientCache.cleanUp();
}
- public static AllowableValue createAllowableValue(final Regions region) {
- return new AllowableValue(region.getName(), region.getDescription(),
"AWS Region Code : " + region.getName());
- }
-
- public static AllowableValue[] getAvailableRegions() {
- final List<AllowableValue> values = new ArrayList<>();
- for (final Regions region : Regions.values()) {
- values.add(createAllowableValue(region));
- }
- return values.toArray(new AllowableValue[0]);
- }
-
-
@Override
public void migrateProperties(final PropertyConfiguration config) {
migrateAuthenticationProperties(config);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 9d8f643c44..d449218d4f 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -22,7 +22,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.Signer;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
@@ -35,10 +34,8 @@ import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
@@ -49,7 +46,6 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
@@ -62,11 +58,13 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
-import static java.lang.String.format;
import static
org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
import static
org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
import static
org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
import static
org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
+import static
org.apache.nifi.processors.aws.util.RegionUtilV1.ATTRIBUTE_DEFINED_REGION;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region;
public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {
@@ -182,16 +180,6 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
.dynamicallyModifiesClasspath(true)
.build();
- public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
- static final AllowableValue ATTRIBUTE_DEFINED_REGION = new
AllowableValue("attribute-defined-region",
- "Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
- "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as
region.");
-
- public static final PropertyDescriptor S3_REGION = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(REGION)
- .allowableValues(getAvailableS3Regions())
- .build();
-
public static final PropertyDescriptor ENCRYPTION_SERVICE = new
PropertyDescriptor.Builder()
.name("encryption-service")
.displayName("Encryption Service")
@@ -292,7 +280,7 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
* @return The created S3 client
*/
protected AmazonS3Client getS3Client(final ProcessContext context, final
Map<String, String> attributes) {
- final Region region = resolveRegion(context, attributes);
+ final Region region = resolveS3Region(context, attributes);
return getClient(context, region);
}
@@ -303,7 +291,7 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
* @return The newly created S3 client
*/
protected AmazonS3Client createClient(final ProcessContext context, final
Map<String, String> attributes) {
- final Region region = resolveRegion(context, attributes);
+ final Region region = resolveS3Region(context, attributes);
return createClient(context, region);
}
@@ -451,36 +439,8 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
return cannedAcl;
}
- private Region parseRegionValue(String regionValue) {
- if (regionValue == null) {
- throw new ProcessException(format("[%s] was selected as region
source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION,
S3_REGION_ATTRIBUTE));
- }
-
- try {
- return Region.getRegion(Regions.fromName(regionValue));
- } catch (Exception e) {
- throw new ProcessException(format("The [%s] attribute contains an
invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
- }
- }
-
- private Region resolveRegion(final ProcessContext context, final
Map<String, String> attributes) {
- String regionValue = context.getProperty(S3_REGION).getValue();
-
- if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
- regionValue = attributes.get(S3_REGION_ATTRIBUTE);
- }
-
- return parseRegionValue(regionValue);
- }
-
private boolean isAttributeDefinedRegion(final ProcessContext context) {
String regionValue = context.getProperty(S3_REGION).getValue();
return ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue);
}
-
- private static AllowableValue[] getAvailableS3Regions() {
- final AllowableValue[] availableRegions = getAvailableRegions();
- return ArrayUtils.addAll(availableRegions, ATTRIBUTE_DEFINED_REGION);
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java
new file mode 100644
index 0000000000..5bb534a577
--- /dev/null
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/util/RegionUtilV1.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.util;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Utility class for AWS region methods. This class uses AWS SDK v1.
+ *
+ */
+public final class RegionUtilV1 {
+
+ private RegionUtilV1() {
+ }
+
+ public static final String S3_REGION_ATTRIBUTE = "s3.region" ;
+ public static final AllowableValue ATTRIBUTE_DEFINED_REGION = new
AllowableValue("attribute-defined-region",
+ "Use '" + S3_REGION_ATTRIBUTE + "' Attribute",
+ "Uses '" + S3_REGION_ATTRIBUTE + "' FlowFile attribute as
region.");
+
+ public static final PropertyDescriptor REGION = new
PropertyDescriptor.Builder()
+ .name("Region")
+ .description("The AWS Region to connect to.")
+ .required(true)
+ .allowableValues(getAvailableRegions())
+
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+ .build();
+
+ public static final PropertyDescriptor S3_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(REGION)
+ .allowableValues(getAvailableS3Regions())
+ .build();
+
+ public static Region resolveS3Region(final PropertyContext context, final
Map<String, String> attributes) {
+ String regionValue = context.getProperty(S3_REGION).getValue();
+
+ if (ATTRIBUTE_DEFINED_REGION.getValue().equals(regionValue)) {
+ regionValue = attributes.get(S3_REGION_ATTRIBUTE);
+ }
+
+ return parseS3RegionValue(regionValue);
+ }
+
+ public static AllowableValue[] getAvailableS3Regions() {
+ final AllowableValue[] availableRegions = getAvailableRegions();
+ return ArrayUtils.add(availableRegions, ATTRIBUTE_DEFINED_REGION);
+ }
+
+ public static AllowableValue createAllowableValue(final Regions region) {
+ return new AllowableValue(region.getName(), region.getDescription(),
"AWS Region Code : " + region.getName());
+ }
+
+ public static AllowableValue[] getAvailableRegions() {
+ return Arrays.stream(Regions.values())
+ .map(RegionUtilV1::createAllowableValue)
+ .toArray(AllowableValue[]::new);
+ }
+
+ private static Region parseS3RegionValue(String regionValue) {
+ if (regionValue == null) {
+ throw new ProcessException(String.format("[%s] was selected as
region source but [%s] attribute does not exist", ATTRIBUTE_DEFINED_REGION,
S3_REGION_ATTRIBUTE));
+ }
+
+ try {
+ return Region.getRegion(Regions.fromName(regionValue));
+ } catch (Exception e) {
+ throw new ProcessException(String.format("The [%s] attribute
contains an invalid region value [%s]", S3_REGION_ATTRIBUTE, regionValue), e);
+ }
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
index a6a2a29bb1..7a0b6d9bda 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/AbstractAwsProcessor.java
@@ -109,8 +109,8 @@ public abstract class AbstractAwsProcessor<T extends
SdkClient> extends Abstract
public static final PropertyDescriptor REGION = new
PropertyDescriptor.Builder()
.name("Region")
.required(true)
- .allowableValues(RegionUtil.getAvailableRegions())
-
.defaultValue(RegionUtil.createAllowableValue(Region.US_WEST_2).getValue())
+ .allowableValues(RegionUtilV2.getAvailableRegions())
+
.defaultValue(RegionUtilV2.createAllowableValue(Region.US_WEST_2).getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java
similarity index 93%
rename from
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
rename to
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java
index 45814f5c93..40be091198 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtil.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/v2/RegionUtilV2.java
@@ -25,10 +25,13 @@ import java.util.Comparator;
import java.util.List;
/**
- * Utility class for AWS region methods.
+ * Utility class for AWS region methods. This class uses AWS SDK v2.
*
*/
-public abstract class RegionUtil {
+public final class RegionUtilV2 {
+
+ private RegionUtilV2() {
+ }
/**
* Creates an AllowableValue from a Region.
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 4c4ca71589..7e4694ed79 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -38,6 +38,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
@SupportsBatching
@WritesAttributes({
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index c0cae164b8..21f6371c61 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -62,6 +62,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index b1b085ca38..b703b14c26 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -99,6 +99,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
+
@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@@ -518,7 +520,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
- writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(REGION).getValue());
}
try {
@@ -540,7 +542,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
ObjectMetadata objectMetadata = getObjectMetadata(context,
client, versionSummary);
// Write the entity to the listing
- writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(REGION).getValue());
listCount++;
}
@@ -605,7 +607,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
- writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(REGION).getValue());
}
try {
@@ -626,7 +628,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
// Write the entity to the listing
final GetObjectTaggingResult taggingResult =
getTaggingResult(context, client, versionSummary);
final ObjectMetadata objectMetadata =
getObjectMetadata(context, client, versionSummary);
- writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(REGION).getValue());
// Track the latest lastModified timestamp and keys having
that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character
encoding in lexicographical order. Not ordered by timestamps.
@@ -736,7 +738,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
- writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(REGION).getValue());
}
try {
@@ -750,7 +752,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
final AmazonS3Client s3Client = getClient(context);
final GetObjectTaggingResult taggingResult =
getTaggingResult(context, s3Client, s3VersionSummary);
final ObjectMetadata objectMetadata =
getObjectMetadata(context, s3Client, s3VersionSummary);
- writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
+ writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata, context.getProperty(REGION).getValue());
listCount++;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 26c759417a..9237f77774 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -89,6 +89,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index cb06bf735d..8665c6378b 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
@SupportsBatching
@WritesAttributes({
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
index c47911abd5..254435299c 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
@@ -36,8 +36,8 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
@@ -108,8 +108,8 @@ public class StandardS3EncryptionService extends
AbstractControllerService imple
.displayName("KMS Region")
.description("The Region of the AWS Key Management Service. Only
used in case of Client-side KMS.")
.required(false)
- .allowableValues(AbstractS3Processor.getAvailableRegions())
-
.defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
+ .allowableValues(RegionUtilV1.getAvailableRegions())
+
.defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
private String keyValue = "";
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java
new file mode 100644
index 0000000000..c797d854d1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceService.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.service;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.S3Object;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.processor.exception.ProcessException;
+import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
+
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.resolveS3Region;
+import static org.apache.nifi.util.StringUtils.isBlank;
+
+@Tags({"Amazon", "S3", "AWS", "file", "resource"})
+@SeeAlso({FetchS3Object.class})
+@CapabilityDescription("Provides an Amazon Web Services (AWS) S3 file resource
for other components.")
+@UseCase(
+ description = "Fetch a specific file from S3. " +
+ "The service provides higher performance compared to fetch
processors when the data should be moved between different storages without any
transformation.",
+ configuration = """
+ "Bucket" = "${s3.bucket}"
+ "Object Key" = "${filename}"
+
+ The "Region" property must be set to denote the S3 region that
the Bucket resides in.
+
+ The "AWS Credentials Provider Service" property should specify
an instance of the AWSCredentialsProviderService in order to provide
credentials for accessing the bucket.
+ """
+)
+public class S3FileResourceService extends AbstractControllerService
implements FileResourceService {
+
+ public static final PropertyDescriptor BUCKET_WITH_DEFAULT_VALUE = new
PropertyDescriptor.Builder()
+
.fromPropertyDescriptor(AbstractS3Processor.BUCKET_WITH_DEFAULT_VALUE)
+ .build();
+
+ public static final PropertyDescriptor KEY = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AbstractS3Processor.KEY)
+ .build();
+
+ public static final PropertyDescriptor S3_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(RegionUtilV1.S3_REGION)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(
+ BUCKET_WITH_DEFAULT_VALUE,
+ KEY,
+ S3_REGION,
+ AWS_CREDENTIALS_PROVIDER_SERVICE);
+
+ private final Cache<Region, AmazonS3> clientCache =
Caffeine.newBuilder().build();
+
+ private volatile PropertyContext context;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.context = context;
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ this.context = null;
+ clientCache.asMap().values().forEach(AmazonS3::shutdown);
+ clientCache.invalidateAll();
+ clientCache.cleanUp();
+ }
+
+ @Override
+ public FileResource getFileResource(Map<String, String> attributes) {
+ final AWSCredentialsProviderService awsCredentialsProviderService =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
+ .asControllerService(AWSCredentialsProviderService.class);
+ final AmazonS3 client = getS3Client(attributes,
awsCredentialsProviderService.getCredentialsProvider());
+
+ try {
+ return fetchObject(client, attributes);
+ } catch (final ProcessException | SdkClientException e) {
+ throw new ProcessException("Failed to fetch s3 object", e);
+ }
+ }
+
+ /**
+ * Fetches s3 object from the provided bucket and returns it as
FileResource
+ *
+ * @param client amazon s3 client
+ * @param attributes configuration attributes
+ * @return fetched s3 object as FileResource
+ * @throws ProcessException if the object 'bucketName/key' does not exist
+ */
+ private FileResource fetchObject(final AmazonS3 client, final Map<String,
String> attributes) throws ProcessException,
+ SdkClientException {
+ final String bucketName =
context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue();
+ final String key =
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+ if (isBlank(bucketName) || isBlank(key)) {
+ throw new ProcessException("Bucket name or key value is missing");
+ }
+
+ if (!client.doesObjectExist(bucketName, key)) {
+ throw new ProcessException(String.format("Object '%s/%s' does not
exist in s3", bucketName, key));
+ }
+
+ final S3Object object = client.getObject(bucketName, key);
+ return new FileResource(object.getObjectContent(),
object.getObjectMetadata().getContentLength());
+ }
+
+ protected AmazonS3 getS3Client(Map<String, String> attributes,
AWSCredentialsProvider credentialsProvider) {
+ final Region region = resolveS3Region(context, attributes);
+ return clientCache.get(region, ignored -> AmazonS3Client.builder()
+ .withRegion(region.getName())
+ .withCredentials(credentialsProvider)
+ .build());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
index 7fab520043..20311eb977 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
@@ -56,6 +56,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
+
@SupportsBatching
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"})
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 390d4c908b..ca338c3972 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,3 +15,4 @@
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService
+org.apache.nifi.processors.aws.s3.service.S3FileResourceService
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index f8eb6dd5b2..3dff94a4fa 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -37,6 +37,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -251,7 +252,7 @@ public abstract class AbstractS3IT {
Assertions.fail("Could not set security properties");
}
- runner.setProperty(AbstractS3Processor.S3_REGION, getRegion());
+ runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE,
getEndpointOverride());
runner.setProperty(AbstractS3Processor.BUCKET_WITHOUT_DEFAULT_VALUE,
BUCKET_NAME);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
index 265de4f45f..7d57ff007b 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -91,7 +92,7 @@ public class ITFetchS3Object extends AbstractS3IT {
final TestRunner runner = TestRunners.newTestRunner(new
FetchS3Object());
setSecureProperties(runner);
- runner.setProperty(FetchS3Object.S3_REGION, getRegion());
+ runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 43f936c9f0..cf2b660977 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -34,6 +34,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@@ -447,7 +448,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = initTestRunner();
setSecureProperties(runner);
- runner.setProperty(PutS3Object.S3_REGION, getRegion());
+ runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
BUCKET_NAME);
runner.setProperty(PutS3Object.KEY, "${filename}");
@@ -764,7 +765,7 @@ public class ITPutS3Object extends AbstractS3IT {
final TestRunner runner = initTestRunner();
setSecureProperties(runner);
- runner.setProperty(PutS3Object.S3_REGION, getRegion());
+ runner.setProperty(RegionUtilV1.S3_REGION, getRegion());
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD,
TEST_PARTSIZE_STRING);
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE,
TEST_PARTSIZE_STRING);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index c434c012fb..58fb0c2d1b 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@@ -56,7 +57,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectSimple() {
- runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@@ -75,7 +76,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectSimpleRegionFromFlowFileAttribute() {
- runner.setProperty(DeleteS3Object.S3_REGION,
"attribute-defined-region");
+ runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@@ -89,7 +90,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteObjectS3Exception() {
- runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
@@ -105,7 +106,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionSimple() {
- runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(DeleteS3Object.VERSION_ID, "test-version");
final Map<String, String> attrs = new HashMap<>();
@@ -126,7 +127,7 @@ public class TestDeleteS3Object {
@Test
public void testDeleteVersionFromExpressions() {
- runner.setProperty(DeleteS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"${s3.bucket}");
runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index db1406a0d5..9d30ac0770 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -32,6 +32,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -80,7 +81,7 @@ public class TestFetchS3Object {
@Test
public void testGetObject() throws IOException {
- runner.setProperty(FetchS3Object.S3_REGION,
"attribute-defined-region");
+ runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -147,7 +148,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectWithRequesterPays() throws IOException {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
final Map<String, String> attrs = new HashMap<>();
@@ -205,7 +206,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectVersion() throws IOException {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
@@ -245,7 +246,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectExceptionGoesToFailure() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -259,7 +260,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesBucketName() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -288,7 +289,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesAuthentication() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -312,7 +313,7 @@ public class TestFetchS3Object {
@Test
public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket-bad-name");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -330,7 +331,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectReturnsNull() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
@@ -344,7 +345,7 @@ public class TestFetchS3Object {
@Test
public void testFlowFileAccessExceptionGoesToFailure() {
- runner.setProperty(FetchS3Object.S3_REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index e5655ebdf0..52cf531f44 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -40,6 +40,7 @@ import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager;
@@ -102,7 +103,7 @@ public class TestListS3 {
@Test
public void testList() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
Date lastModified = new Date();
@@ -154,7 +155,7 @@ public class TestListS3 {
@Test
public void testListWithRecords() throws InitializationException {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
final MockRecordWriter recordWriter = new MockRecordWriter(null,
false);
@@ -205,7 +206,7 @@ public class TestListS3 {
@Test
public void testListWithRequesterPays() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
@@ -251,7 +252,7 @@ public class TestListS3 {
@Test
public void testListWithRequesterPays_invalid() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays
cannot be used with versions
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
@@ -261,7 +262,7 @@ public class TestListS3 {
@Test
public void testListVersion2() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.LIST_TYPE, "2");
@@ -307,7 +308,7 @@ public class TestListS3 {
@Test
public void testListVersion2WithRequesterPays() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
runner.setProperty(ListS3.LIST_TYPE, "2");
@@ -354,7 +355,7 @@ public class TestListS3 {
@Test
public void testListVersions() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true");
@@ -398,7 +399,7 @@ public class TestListS3 {
@Test
public void testListObjectsNothingNew() throws IOException {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
Calendar calendar = Calendar.getInstance();
@@ -434,7 +435,7 @@ public class TestListS3 {
@Test
public void testListIgnoreByMinAge() throws IOException {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.MIN_AGE, "30 sec");
@@ -485,7 +486,7 @@ public class TestListS3 {
@Test
public void testListIgnoreByMaxAge() throws IOException {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.MAX_AGE, "30 sec");
Date lastModifiedNow = new Date();
@@ -533,7 +534,7 @@ public class TestListS3 {
@Test
public void testWriteObjectTags() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
@@ -560,7 +561,7 @@ public class TestListS3 {
@Test
public void testWriteUserMetadata() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
@@ -588,7 +589,7 @@ public class TestListS3 {
@Test
public void testNoTrackingList() {
- runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(RegionUtilV1.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
@@ -676,7 +677,7 @@ public class TestListS3 {
assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
- runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+ runner.setProperty(RegionUtilV1.REGION,
Regions.EU_CENTRAL_1.getName());
assertTrue(listS3.isResetTracking());
@@ -765,7 +766,7 @@ public class TestListS3 {
assertNotNull(listS3.getListedEntityTracker());
- runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+ runner.setProperty(RegionUtilV1.REGION,
Regions.EU_CENTRAL_1.getName());
assertTrue(listS3.isResetTracking());
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index ab7bf59fb4..de42e4cc20 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -245,7 +246,7 @@ public class TestPutS3Object {
}
private void prepareTest(String filename) {
- runner.setProperty(PutS3Object.S3_REGION, "ap-northeast-1");
+ runner.setProperty(RegionUtilV1.S3_REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.assertValid();
@@ -258,7 +259,7 @@ public class TestPutS3Object {
}
private void prepareTestWithRegionInAttributes(String filename, String
region) {
- runner.setProperty(PutS3Object.S3_REGION, "attribute-defined-region");
+ runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.assertValid();
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
index 263214f584..0968fbceba 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
@@ -27,6 +27,7 @@ import
com.amazonaws.services.s3.model.SetObjectTaggingRequest;
import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -70,7 +71,7 @@ public class TestTagS3Object {
public void testTagObjectSimple() {
final String tagKey = "k";
final String tagVal = "v";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@@ -98,7 +99,7 @@ public class TestTagS3Object {
@Test
public void testTagObjectSimpleRegionFromFlowFileAttribute() {
- runner.setProperty(TagS3Object.S3_REGION, "attribute-defined-region");
+ runner.setProperty(RegionUtilV1.S3_REGION, "attribute-defined-region");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "k");
runner.setProperty(TagS3Object.TAG_VALUE, "v");
@@ -118,7 +119,7 @@ public class TestTagS3Object {
public void testTagObjectVersion() {
final String tagKey = "k";
final String tagVal = "v";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.VERSION_ID, "test-version");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
@@ -148,7 +149,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@@ -183,7 +184,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@@ -212,7 +213,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@@ -247,7 +248,7 @@ public class TestTagS3Object {
final String tagKey = "nk";
final String tagVal = "nv";
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, tagKey);
runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
@@ -263,7 +264,7 @@ public class TestTagS3Object {
@Test
public void testBucketEvaluatedAsBlank() {
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_KEY, "key");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@@ -278,7 +279,7 @@ public class TestTagS3Object {
@Test
public void testTagKeyEvaluatedAsBlank() {
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}");
runner.setProperty(TagS3Object.TAG_VALUE, "val");
@@ -293,7 +294,7 @@ public class TestTagS3Object {
@Test
public void testTagValEvaluatedAsBlank() {
- runner.setProperty(TagS3Object.S3_REGION, "us-west-2");
+ runner.setProperty(RegionUtilV1.S3_REGION, "us-west-2");
runner.setProperty(TagS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
"test-bucket");
runner.setProperty(TagS3Object.TAG_KEY, "tagKey");
runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}");
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java
new file mode 100644
index 0000000000..674bd5504a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/service/S3FileResourceServiceTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3.service;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.processor.exception.ProcessException;
+import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class S3FileResourceServiceTest {
+ private static final String CONTROLLER_SERVICE = "AWSCredentialsService";
+ private static final String BUCKET_NAME = "test-bucket";
+ private static final String KEY = "key";
+ private static final long CONTENT_LENGTH = 10L;
+
+ @Mock
+ private AmazonS3 client;
+
+ @Mock
+ private S3Object s3Object;
+
+ @Mock
+ private ObjectMetadata metadata;
+
+ @Mock
+ private S3ObjectInputStream inputStream;
+
+ @InjectMocks
+ private TestS3FileResourceService service;
+ private TestRunner runner;
+
+ @BeforeEach
+ public void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService("S3FileResourceService", service);
+ }
+
+ @Test
+ void testGetFileResourceHappyPath() throws InitializationException {
+ setupS3Client();
+ setupService();
+
+ FileResource fileResource = service.getFileResource(Map.of());
+ assertFileResource(fileResource);
+ }
+
+ @Test
+ void testNonExistingObject() throws InitializationException {
+ when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(false);
+ setupService();
+
+ assertThrows(ProcessException.class, () ->
service.getFileResource(Map.of()), "Failed to fetch s3 object");
+ verify(client).doesObjectExist(BUCKET_NAME, KEY);
+ verifyNoMoreInteractions(client);
+ }
+
+ @Test
+ void testValidBlobUsingELButMissingAttribute() throws
InitializationException {
+ setupService("${s3.bucket}", "${key}");
+
+ assertThrows(ProcessException.class,
+ () -> service.getFileResource(Map.of()), "Bucket name or key
value is missing");
+ verifyNoInteractions(client);
+ }
+
+ @Test
+ void testValidBlobUsingEL() throws InitializationException {
+ String bucketProperty = "s3.bucket";
+ String keyProperty = "key";
+ setupService("${" + bucketProperty + "}", "${" + keyProperty + "}");
+ setupS3Client();
+
+ FileResource fileResource = service.getFileResource(Map.of(
+ bucketProperty, BUCKET_NAME,
+ keyProperty, KEY));
+ assertFileResource(fileResource);
+ }
+
+ private void assertFileResource(FileResource fileResource) {
+ assertNotNull(fileResource);
+ assertEquals(fileResource.getInputStream(), inputStream);
+ assertEquals(fileResource.getSize(), CONTENT_LENGTH);
+ verify(client).doesObjectExist(BUCKET_NAME, KEY);
+ verify(client).getObject(BUCKET_NAME, KEY);
+ verify(s3Object).getObjectMetadata();
+ verify(metadata).getContentLength();
+ verify(s3Object).getObjectContent();
+ }
+
+ private void setupService() throws InitializationException {
+ setupService(BUCKET_NAME, KEY);
+ }
+
+ private void setupService(String bucket, String key) throws
InitializationException {
+ final AWSCredentialsProviderService credentialsService = new
AWSCredentialsProviderControllerService();
+
+ runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
+ runner.enableControllerService(credentialsService);
+
+ runner.setProperty(service, AWS_CREDENTIALS_PROVIDER_SERVICE,
CONTROLLER_SERVICE);
+ runner.setProperty(service, S3FileResourceService.KEY, key);
+ runner.setProperty(service,
S3FileResourceService.BUCKET_WITH_DEFAULT_VALUE, bucket);
+
+ runner.enableControllerService(service);
+ }
+
+ private void setupS3Client() {
+ when(client.doesObjectExist(BUCKET_NAME, KEY)).thenReturn(true);
+ when(client.getObject(BUCKET_NAME, KEY)).thenReturn(s3Object);
+ when(s3Object.getObjectContent()).thenReturn(inputStream);
+ when(s3Object.getObjectMetadata()).thenReturn(metadata);
+ when(metadata.getContentLength()).thenReturn(CONTENT_LENGTH);
+ }
+
+ private static class TestS3FileResourceService extends
S3FileResourceService {
+
+ private final AmazonS3 client;
+
+ private TestS3FileResourceService(AmazonS3 client) {
+ this.client = client;
+ }
+
+ @Override
+ protected AmazonS3 getS3Client(Map<String, String> attributes,
AWSCredentialsProvider credentialsProvider) {
+ return client;
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java
index c0c7981b79..378b38a07f 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/AbstractSQSIT.java
@@ -18,8 +18,8 @@
package org.apache.nifi.processors.aws.sqs;
import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
@@ -84,8 +84,8 @@ public abstract class AbstractSQSIT {
TestRunner runner = TestRunners.newTestRunner(processorClass);
AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());
- runner.setProperty(AbstractS3Processor.S3_REGION, localstack.getRegion());
- runner.setProperty(AbstractS3Processor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString());
+ runner.setProperty(AbstractAwsProcessor.REGION, localstack.getRegion());
+ runner.setProperty(AbstractAwsProcessor.ENDPOINT_OVERRIDE, localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString());
runner.setProperty("Queue URL", queueUrl);
return runner;
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java
index 2506524d67..4d4adf5bde 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAWSGatewayApiCommon.java
@@ -22,6 +22,7 @@ import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.proxy.StandardProxyConfigurationService;
@@ -66,7 +67,7 @@ public abstract class TestInvokeAWSGatewayApiCommon {
}
public void setupEndpointAndRegion() {
- runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.REGION, "us-east-1");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, mockWebServer.url("/").toString());
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
index 8a62496efc..d10a7346c8 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
@@ -30,6 +30,7 @@ import org.apache.http.message.BasicStatusLine;
import org.apache.http.protocol.HttpContext;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.aws.util.RegionUtilV1;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -64,7 +65,7 @@ public class TestInvokeAmazonGatewayApiMock {
AuthUtils.enableAccessKey(runner, "awsAccessKey", "awsSecretKey");
- runner.setProperty(InvokeAWSGatewayApi.REGION, "us-east-1");
+ runner.setProperty(RegionUtilV1.REGION, "us-east-1");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/TEST");
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, "https://foobar.execute-api.us-east-1.amazonaws.com");