This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new aa8f13ee88 NIFI-15335 Added unit tests for property migration in
Airtable, AMQP, Asana, ASN.1 and AWS bundles (#10653)
aa8f13ee88 is described below
commit aa8f13ee88b3ea3df9c0509e095de35f3487d8e4
Author: dan-s1 <[email protected]>
AuthorDate: Tue Dec 23 14:36:46 2025 -0500
NIFI-15335 Added unit tests for property migration in Airtable, AMQP,
Asana, ASN.1 and AWS bundles (#10653)
Signed-off-by: David Handermann <[email protected]>
---
.../airtable/TestQueryAirtableTable.java | 22 ++++
.../nifi/amqp/processors/ConsumeAMQPTest.java | 46 +++++--
.../nifi/amqp/processors/PublishAMQPTest.java | 20 ++++
.../asana/GetAsanaObjectConfigurationTest.java | 22 ++++
.../StandardAsanaClientProviderServiceTest.java | 26 ++++
.../java/org/apache/nifi/jasn1/JASN1Reader.java | 91 +++++++-------
.../org/apache/nifi/jasn1/JASN1ReaderTest.java | 36 +++++-
.../ObsoleteAbstractAwsProcessorProperties.java | 37 ++++++
.../aws/cloudwatch/TestPutCloudWatchMetric.java | 43 ++++++-
...WSCredentialsProviderControllerServiceTest.java | 53 ++++++++
.../aws/dynamodb/PutDynamoDBRecordTest.java | 42 +++++++
.../kinesis/stream/TestConsumeKinesisStream.java | 133 +++++++++++++++------
.../aws/kinesis/stream/TestPutKinesisStream.java | 46 ++++++-
.../apache/nifi/processors/aws/s3/TestListS3.java | 41 ++++++-
.../TestStandardS3EncryptionService.java | 27 ++++-
.../apache/nifi/processors/aws/sqs/TestPutSQS.java | 46 +++++--
16 files changed, 613 insertions(+), 118 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java
b/nifi-extension-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java
index 941f8fb926..bded710627 100644
---
a/nifi-extension-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java
+++
b/nifi-extension-bundles/nifi-airtable-bundle/nifi-airtable-processors/src/test/java/org/apache/nifi/processors/airtable/TestQueryAirtableTable.java
@@ -22,6 +22,7 @@ import mockwebserver3.MockWebServer;
import okhttp3.HttpUrl;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -168,4 +170,24 @@ public class TestQueryAirtableTable {
assertTrue(results.isEmpty());
}
}
+
+ @Test
+ void testMigration() {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("api-key", QueryAirtableTable.PAT.getName()),
+ Map.entry("api-url", QueryAirtableTable.API_URL.getName()),
+ Map.entry("pat", QueryAirtableTable.PAT.getName()),
+ Map.entry("base-id", QueryAirtableTable.BASE_ID.getName()),
+ Map.entry("table-id", QueryAirtableTable.TABLE_ID.getName()),
+ Map.entry("fields", QueryAirtableTable.FIELDS.getName()),
+ Map.entry("custom-filter",
QueryAirtableTable.CUSTOM_FILTER.getName()),
+ Map.entry("query-time-window-lag",
QueryAirtableTable.QUERY_TIME_WINDOW_LAG.getName()),
+ Map.entry("web-client-service-provider",
QueryAirtableTable.WEB_CLIENT_SERVICE_PROVIDER.getName()),
+ Map.entry("query-page-size",
QueryAirtableTable.QUERY_PAGE_SIZE.getName()),
+ Map.entry("max-records-per-flowfile",
QueryAirtableTable.MAX_RECORDS_PER_FLOWFILE.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+ }
}
diff --git
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 4ae2e43f32..66361d7cec 100644
---
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -26,6 +26,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
@@ -36,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
@@ -130,7 +132,7 @@ public class ConsumeAMQPTest {
runner.assertTransferCount(ConsumeAMQP.REL_SUCCESS, 1);
- final MockFlowFile helloFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile helloFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
helloFF.assertContentEquals("hello");
@@ -159,7 +161,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -190,7 +192,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
OutputHeaderFormat.JSON_STRING);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -226,14 +228,12 @@ public class ConsumeAMQPTest {
runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
OutputHeaderFormat.ATTRIBUTES);
runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX, headerPrefix);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
successFF.assertAttributeNotExists(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE);
- expectedHeadersMap.forEach((key, value) -> {
- successFF.assertAttributeEquals(headerPrefix + "." + key,
value.toString());
- } );
+ expectedHeadersMap.forEach((key, value) ->
successFF.assertAttributeEquals(headerPrefix + "." + key, value.toString()));
}
}
@Test
@@ -259,7 +259,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -296,7 +296,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES, "True");
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -328,7 +328,7 @@ public class ConsumeAMQPTest {
runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -361,7 +361,7 @@ public class ConsumeAMQPTest {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, "key1");
successFF.assertAttributeEquals(ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE,
"myExchange");
@@ -370,6 +370,30 @@ public class ConsumeAMQPTest {
}
}
+ @Test
+ void testMigration() {
+ final TestRunner runner = initTestRunner(new ConsumeAMQP());
+ final Map<String, String> expectedRenamed = Map.ofEntries(
+ Map.entry("User Name", AbstractAMQPProcessor.USER.getName()),
+ Map.entry("ssl-context-service",
AbstractAMQPProcessor.SSL_CONTEXT_SERVICE.getName()),
+ Map.entry("cert-authentication",
AbstractAMQPProcessor.CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getName()),
+ Map.entry("auto.acknowledge",
ConsumeAMQP.AUTO_ACKNOWLEDGE.getName()),
+ Map.entry("batch.size", ConsumeAMQP.BATCH_SIZE.getName()),
+ Map.entry("prefetch.count",
ConsumeAMQP.PREFETCH_COUNT.getName()),
+ Map.entry("header.format",
ConsumeAMQP.HEADER_FORMAT.getName()),
+ Map.entry("header.key.prefix",
ConsumeAMQP.HEADER_KEY_PREFIX.getName()),
+ Map.entry("header.separator",
ConsumeAMQP.HEADER_SEPARATOR.getName()),
+ Map.entry("remove.curly.braces",
ConsumeAMQP.REMOVE_CURLY_BRACES.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of("ssl-client-auth");
+ assertEquals(expectedRemoved,
propertyMigrationResult.getPropertiesRemoved());
+
+ }
+
private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
diff --git
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 172390fe07..f73850cc7e 100644
---
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -22,6 +22,7 @@ import com.rabbitmq.client.GetResponse;
import org.apache.nifi.amqp.processors.PublishAMQP.InputHeaderSource;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -233,6 +235,24 @@ public class PublishAMQPTest {
runner.assertNotValid();
}
+ @Test
+ void testMigration() {
+ final TestRunner runner = TestRunners.newTestRunner(PublishAMQP.class);
+ setConnectionProperties(runner);
+ final Map<String, String> expectedRenamed = Map.ofEntries(
+ Map.entry("User Name", AbstractAMQPProcessor.USER.getName()),
+ Map.entry("ssl-context-service",
AbstractAMQPProcessor.SSL_CONTEXT_SERVICE.getName()),
+ Map.entry("cert-authentication",
AbstractAMQPProcessor.CLIENT_CERTIFICATE_AUTHENTICATION_ENABLED.getName()),
+ Map.entry("header.separator",
PublishAMQP.HEADER_SEPARATOR.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of("ssl-client-auth");
+ assertEquals(expectedRemoved,
propertyMigrationResult.getPropertiesRemoved());
+ }
+
private void setConnectionProperties(TestRunner runner) {
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.USER, "user");
diff --git
a/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/GetAsanaObjectConfigurationTest.java
b/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/GetAsanaObjectConfigurationTest.java
index 3f2c6cfe1c..dfe90df4b6 100644
---
a/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/GetAsanaObjectConfigurationTest.java
+++
b/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/GetAsanaObjectConfigurationTest.java
@@ -30,10 +30,12 @@ import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processors.asana.mocks.MockAsanaClientProviderService;
import org.apache.nifi.processors.asana.mocks.MockDistributedMapCacheClient;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Map;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
@@ -53,10 +55,13 @@ import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_CLIENT_
import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_OBJECT_TYPE;
import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_OUTPUT_BATCH_SIZE;
import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_PROJECT;
+import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_SECTION;
+import static org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_TAG;
import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_ASANA_TEAM_NAME;
import static
org.apache.nifi.processors.asana.GetAsanaObject.PROP_DISTRIBUTED_CACHE_SERVICE;
import static org.apache.nifi.processors.asana.GetAsanaObject.REL_NEW;
import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;
@@ -431,6 +436,23 @@ public class GetAsanaObjectConfigurationTest {
verifyNoMoreInteractions(mockService.client);
}
+ @Test
+ void testMigration() {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("asana-controller-service",
PROP_ASANA_CLIENT_SERVICE.getName()),
+ Map.entry("distributed-cache-service",
PROP_DISTRIBUTED_CACHE_SERVICE.getName()),
+ Map.entry("asana-object-type",
PROP_ASANA_OBJECT_TYPE.getName()),
+ Map.entry("asana-project-name", PROP_ASANA_PROJECT.getName()),
+ Map.entry("asana-section-name", PROP_ASANA_SECTION.getName()),
+ Map.entry("asana-tag-name", PROP_ASANA_TAG.getName()),
+ Map.entry("asana-team-name", PROP_ASANA_TEAM_NAME.getName()),
+ Map.entry("asana-output-batch-size",
PROP_ASANA_OUTPUT_BATCH_SIZE.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+ }
+
private void withMockAsanaClientService() throws InitializationException {
final String serviceIdentifier =
AsanaClientProviderService.class.getName();
runner.addControllerService(serviceIdentifier, mockService);
diff --git
a/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-services/src/test/java/org/apache/nifi/controller/asana/StandardAsanaClientProviderServiceTest.java
b/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-services/src/test/java/org/apache/nifi/controller/asana/StandardAsanaClientProviderServiceTest.java
index b75df42d24..6bf7535588 100644
---
a/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-services/src/test/java/org/apache/nifi/controller/asana/StandardAsanaClientProviderServiceTest.java
+++
b/nifi-extension-bundles/nifi-asana-bundle/nifi-asana-services/src/test/java/org/apache/nifi/controller/asana/StandardAsanaClientProviderServiceTest.java
@@ -20,7 +20,9 @@ import com.asana.models.Project;
import mockwebserver3.MockResponse;
import mockwebserver3.MockWebServer;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
@@ -187,6 +189,30 @@ public class StandardAsanaClientProviderServiceTest {
assertThrows(RuntimeException.class, service::createClient);
}
+ @Test
+ void testMigration() {
+ final Map<String, String> propertyValues = Map.of(
+ PROP_ASANA_API_BASE_URL.getName(), "someUrl",
+ PROP_ASANA_PERSONAL_ACCESS_TOKEN.getName(), "someAccessToken",
+ PROP_ASANA_WORKSPACE_NAME.getName(), "someWorkspace"
+ );
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(propertyValues);
+ StandardAsanaClientProviderService standardAsanaClientProviderService
= new StandardAsanaClientProviderService();
+ standardAsanaClientProviderService.migrateProperties(configuration);
+
+ Map<String, String> expected = Map.ofEntries(
+ Map.entry("asana-api-url", PROP_ASANA_API_BASE_URL.getName()),
+ Map.entry("asana-personal-access-token",
PROP_ASANA_PERSONAL_ACCESS_TOKEN.getName()),
+ Map.entry("asana-workspace-name",
PROP_ASANA_WORKSPACE_NAME.getName())
+ );
+
+ final PropertyMigrationResult result =
configuration.toPropertyMigrationResult();
+ final Map<String, String> propertiesRenamed =
result.getPropertiesRenamed();
+
+ assertEquals(expected, propertiesRenamed);
+ }
+
private String getMockWebServerUrl() {
return
mockWebServer.url("asana").newBuilder().host(LOCALHOST).build().toString();
}
diff --git
a/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/main/java/org/apache/nifi/jasn1/JASN1Reader.java
b/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/main/java/org/apache/nifi/jasn1/JASN1Reader.java
index 2d126004f1..8f91f8fe68 100644
---
a/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/main/java/org/apache/nifi/jasn1/JASN1Reader.java
+++
b/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/main/java/org/apache/nifi/jasn1/JASN1Reader.java
@@ -70,51 +70,52 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Tags({"asn", "ans1", "jasn.1", "jasn1", "record", "reader", "parser"})
@CapabilityDescription("Reads ASN.1 content and creates NiFi records. " +
"NOTE: ASN.1 schema preparation requires the JDK at runtime for model
compilation.")
public class JASN1Reader extends AbstractConfigurableComponent implements
RecordReaderFactory {
- private static final PropertyDescriptor ROOT_MODEL_NAME = new
PropertyDescriptor.Builder()
- .name("Root Model Name")
- .description("The model name in the form of 'MODULE-NAME.ModelType'. "
+
- "Mutually exclusive with and should be preferred to 'Root Model
Class Name'. (See additional details for more information.)")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
-
- private static final PropertyDescriptor ROOT_CLASS_NAME = new
PropertyDescriptor.Builder()
- .name("Root Model Class Name")
- .description("A canonical class name that is generated by the ASN.1
compiler to encode the ASN.1 input data. Mutually exclusive with 'Root Model
Name'." +
- " Should be used when the former cannot be set properly. See
additional details for more information.")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ static final PropertyDescriptor ROOT_MODEL_NAME = new
PropertyDescriptor.Builder()
+ .name("Root Model Name")
+ .description("The model name in the form of
'MODULE-NAME.ModelType'. " +
+ "Mutually exclusive with and should be preferred to 'Root
Model Class Name'. (See additional details for more information.)")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor ROOT_CLASS_NAME = new
PropertyDescriptor.Builder()
+ .name("Root Model Class Name")
+ .description("A canonical class name that is generated by the
ASN.1 compiler to encode the ASN.1 input data. Mutually exclusive with 'Root
Model Name'." +
+ " Should be used when the former cannot be set properly.
See additional details for more information.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
/**
* Not included!
* To make this service as simple as possible, records are to be expected
to correspond to a concrete ASN type.
* Not removing though, should it be required in the future.
*/
- private static final PropertyDescriptor RECORD_FIELD = new
PropertyDescriptor.Builder()
- .name("Record Field")
- .description("Optional field name pointing an instance field
containing record elements.")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ static final PropertyDescriptor RECORD_FIELD = new
PropertyDescriptor.Builder()
+ .name("Record Field")
+ .description("Optional field name pointing an instance field
containing record elements.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
static final PropertyDescriptor ASN_FILES = new
PropertyDescriptor.Builder()
- .name("ASN.1 Files")
- .description("Comma-separated list of ASN.1 files.")
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("ASN.1 Files")
+ .description("Comma-separated list of ASN.1 files.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
/**
* Not included!
@@ -122,13 +123,13 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
* benefit would be questionable.
* Not removing though, should it be required in the future.
*/
- private static final PropertyDescriptor ITERATOR_PROVIDER_CLASS_NAME = new
PropertyDescriptor.Builder()
- .name("Iterator Provider Class Name")
- .description("A canonical class name implementing record iteration
logic.")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .build();
+ static final PropertyDescriptor ITERATOR_PROVIDER_CLASS_NAME = new
PropertyDescriptor.Builder()
+ .name("Iterator Provider Class Name")
+ .description("A canonical class name implementing record iteration
logic.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
private static final AllowableValue DEFAULT = new AllowableValue(
"DEFAULT",
@@ -142,7 +143,7 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
"Perform additional preprocessing, resulting in potentially
modified schema. (See additional details for more information.)"
);
- private static final PropertyDescriptor SCHEMA_PREPARATION_STRATEGY = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor SCHEMA_PREPARATION_STRATEGY = new
PropertyDescriptor.Builder()
.name("Schema Preparation Strategy")
.description("When set, NiFi will do additional preprocessing steps
that creates modified versions of the provided ASN files," +
" removing unsupported features in a way that makes them less
strict but otherwise should still be compatible with incoming data." +
@@ -153,7 +154,7 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
.defaultValue(DEFAULT.getValue())
.build();
- private static final PropertyDescriptor SCHEMA_PREPARATION_DIRECTORY = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor SCHEMA_PREPARATION_DIRECTORY = new
PropertyDescriptor.Builder()
.name("Schema Preparation Directory")
.description("When the processor is configured to do additional
preprocessing, new modified schema files will be created in this directory." +
" For more information about additional preprocessing please
see description of the 'Do Additional Preprocessing' property or Additional
Details - Additional Preprocessing.")
@@ -313,8 +314,8 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
}
List<File> javaFiles;
- try {
- javaFiles = Files.walk(asnOutDir)
+ try (Stream<Path> stream = Files.walk(asnOutDir)) {
+ javaFiles = stream
.filter(Files::isRegularFile)
.map(Object::toString)
.filter(filePath -> filePath.endsWith(".java"))
@@ -327,8 +328,7 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
StandardJavaFileManager fileManager =
javaCompiler.getStandardFileManager(null, null, null);
- List<String> optionList = new ArrayList<>();
- optionList.addAll(Arrays.asList("-classpath",
com.beanit.asn1bean.ber.types.BerType.class.getProtectionDomain().getCodeSource().getLocation().getFile()));
+ List<String> optionList = new ArrayList<>(Arrays.asList("-classpath",
com.beanit.asn1bean.ber.types.BerType.class.getProtectionDomain().getCodeSource().getLocation().getFile()));
Iterable<? extends JavaFileObject> units;
units = fileManager.getJavaFileObjectsFromFiles(javaFiles);
@@ -354,9 +354,8 @@ public class JASN1Reader extends
AbstractConfigurableComponent implements Record
void deleteAsnOutDir() {
if (asnOutDir != null) {
- try {
- Files.walk(asnOutDir)
- .sorted(Comparator.reverseOrder())
+ try (Stream<Path> stream = Files.walk(asnOutDir)) {
+ stream.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
diff --git
a/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/test/java/org/apache/nifi/jasn1/JASN1ReaderTest.java
b/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/test/java/org/apache/nifi/jasn1/JASN1ReaderTest.java
index a9eb9198dc..ff168c934a 100644
---
a/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/test/java/org/apache/nifi/jasn1/JASN1ReaderTest.java
+++
b/nifi-extension-bundles/nifi-asn1-bundle/nifi-asn1-services/src/test/java/org/apache/nifi/jasn1/JASN1ReaderTest.java
@@ -21,11 +21,14 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.StringJoiner;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockPropertyConfiguration;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -73,9 +76,10 @@ public class JASN1ReaderTest {
mockCloseable.close();
}
- assertTrue(testSubject.asnOutDir.toFile().exists());
- testSubject.deleteAsnOutDir();
- assertFalse(testSubject.asnOutDir.toFile().exists());
+ if (testSubject.asnOutDir != null &&
testSubject.asnOutDir.toFile().exists()) {
+ testSubject.deleteAsnOutDir();
+ assertFalse(testSubject.asnOutDir.toFile().exists());
+ }
}
@DisabledOnOs({ OS.WINDOWS })
@@ -165,6 +169,32 @@ public class JASN1ReaderTest {
testCompileError(asnFiles, expectedErrorMessages);
}
+ @Test
+ void testMigration() {
+ final Map<String, String> propertyValues = Map.of(
+ JASN1Reader.ROOT_MODEL_NAME.getName(), "someModel",
+ JASN1Reader.ROOT_CLASS_NAME.getName(), "someClass",
+ JASN1Reader.ASN_FILES.getName(), "someFile",
+ JASN1Reader.SCHEMA_PREPARATION_STRATEGY.getName(),
"someSchema",
+ JASN1Reader.SCHEMA_PREPARATION_DIRECTORY.getName(),
"someSchemDir"
+ );
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(propertyValues);
+ final JASN1Reader jasn1Reader = new JASN1Reader();
+ jasn1Reader.migrateProperties(configuration);
+
+ Map<String, String> expected = Map.ofEntries(
+ Map.entry("root-model-name",
JASN1Reader.ROOT_MODEL_NAME.getName()),
+ Map.entry("root-model-class-name",
JASN1Reader.ROOT_CLASS_NAME.getName()),
+ Map.entry("asn-files", JASN1Reader.ASN_FILES.getName())
+ );
+
+ final PropertyMigrationResult result =
configuration.toPropertyMigrationResult();
+ final Map<String, String> propertiesRenamed =
result.getPropertiesRenamed();
+
+ assertEquals(expected, propertiesRenamed);
+ }
+
private void testParseError(String asnFile, List<String>
expectedErrorMessages) {
ConfigurationContext context = mock(ConfigurationContext.class,
RETURNS_DEEP_STUBS);
when(context.getProperty(ASN_FILES).isSet()).thenReturn(true);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ObsoleteAbstractAwsProcessorProperties.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ObsoleteAbstractAwsProcessorProperties.java
new file mode 100644
index 0000000000..e9d8e7155e
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/ObsoleteAbstractAwsProcessorProperties.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+
+public enum ObsoleteAbstractAwsProcessorProperties {
+ OBSOLETE_ACCESS_KEY("Access Key"),
+ OBSOLETE_SECRET_KEY("Secret Key"),
+ OBSOLETE_CREDENTIALS_FILE("Credentials File"),
+ OBSOLETE_PROXY_HOST("Proxy Host"),
+ OBSOLETE_PROXY_PORT("Proxy Host Port"),
+ OBSOLETE_PROXY_USERNAME("proxy-user-name"),
+ OBSOLETE_PROXY_PASSWORD("proxy-user-password");
+
+ ObsoleteAbstractAwsProcessorProperties(String value) {
+ this.value = value;
+ }
+
+ private final String value;
+
+ public String getValue() {
+ return value;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
index 3fecb107e2..4232392ab1 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
@@ -17,8 +17,12 @@
package org.apache.nifi.processors.aws.cloudwatch;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@@ -35,8 +39,10 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Stream;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestPutCloudWatchMetric {
@@ -67,7 +73,7 @@ public class TestPutCloudWatchMetric {
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS,
1);
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
- MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
+ MetricDatum datum =
mockPutCloudWatchMetric.actualMetricData.getFirst();
assertEquals("TestMetric", datum.metricName());
assertEquals(1d, datum.value(), 0.0001d);
}
@@ -128,7 +134,7 @@ public class TestPutCloudWatchMetric {
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS,
1);
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
- MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
+ MetricDatum datum =
mockPutCloudWatchMetric.actualMetricData.getFirst();
assertEquals("TestMetric", datum.metricName());
assertEquals(1.23d, datum.value(), 0.0001d);
}
@@ -152,7 +158,7 @@ public class TestPutCloudWatchMetric {
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS,
1);
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
- MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
+ MetricDatum datum =
mockPutCloudWatchMetric.actualMetricData.getFirst();
assertEquals("TestMetric", datum.metricName());
assertEquals(1.0d, datum.statisticValues().minimum(), 0.0001d);
assertEquals(2.0d, datum.statisticValues().maximum(), 0.0001d);
@@ -177,7 +183,7 @@ public class TestPutCloudWatchMetric {
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS,
1);
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
- MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
+ MetricDatum datum =
mockPutCloudWatchMetric.actualMetricData.getFirst();
assertEquals("TestMetric", datum.metricName());
assertEquals(1d, datum.value(), 0.0001d);
@@ -284,4 +290,33 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS,
1);
}
+
+ @Test
+ void testMigration() {
+ final Map<String, String> expectedRenamed = Map.ofEntries(
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ProxyConfigurationService.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
AbstractAwsProcessor.PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("MetricName",
PutCloudWatchMetric.METRIC_NAME.getName()),
+ Map.entry("maximum", PutCloudWatchMetric.MAXIMUM.getName()),
+ Map.entry("minimum", PutCloudWatchMetric.MINIMUM.getName()),
+ Map.entry("sampleCount",
PutCloudWatchMetric.SAMPLECOUNT.getName()),
+ Map.entry("sum", PutCloudWatchMetric.SUM.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of(
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue());
+
+ assertEquals(expectedRemoved,
+ propertyMigrationResult.getPropertiesRemoved());
+ }
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
index 8aa9baced0..6717e399d3 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
@@ -17,11 +17,14 @@
package org.apache.nifi.processors.aws.credentials.provider.service;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.oauth2.AccessToken;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import
org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
import
org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockPropertyConfiguration;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
@@ -30,9 +33,23 @@ import
software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ACCESS_KEY_ID;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ASSUME_ROLE_EXTERNAL_ID;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ASSUME_ROLE_PROXY_CONFIGURATION_SERVICE;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ASSUME_ROLE_SSL_CONTEXT_SERVICE;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_ENDPOINT;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION;
import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.CREDENTIALS_FILE;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.MAX_SESSION_TIME;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.PROFILE_NAME;
import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.SECRET_KEY;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.USE_ANONYMOUS_CREDENTIALS;
+import static
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService.USE_DEFAULT_CREDENTIALS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -392,6 +409,42 @@ public class AWSCredentialsProviderControllerServiceTest {
"Derived AssumeRole should not be chained when OAuth2 (Web
Identity) is configured");
}
+ @Test
+ void testMigration() {
+ final Map<String, String> propertyValues = Map.of(
+
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME.getName(), "nifi-test"
+ );
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(propertyValues);
+ final AWSCredentialsProviderControllerService
awsCredentialsProviderControllerService = new
AWSCredentialsProviderControllerService();
+ final Map<String, String> expectedRenamed = Map.ofEntries(
+ Map.entry("default-credentials",
USE_DEFAULT_CREDENTIALS.getName()),
+ Map.entry("profile-name", PROFILE_NAME.getName()),
+ Map.entry("Access Key", ACCESS_KEY_ID.getName()),
+ Map.entry("Secret Key", SECRET_KEY.getName()),
+ Map.entry("anonymous-credentials",
USE_ANONYMOUS_CREDENTIALS.getName()),
+ Map.entry("assume-role-sts-region",
ASSUME_ROLE_STS_REGION.getName()),
+ Map.entry("assume-role-external-id",
ASSUME_ROLE_EXTERNAL_ID.getName()),
+ Map.entry("assume-role-ssl-context-service",
ASSUME_ROLE_SSL_CONTEXT_SERVICE.getName()),
+ Map.entry("assume-role-proxy-configuration-service",
ASSUME_ROLE_PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("assume-role-sts-endpoint",
ASSUME_ROLE_STS_ENDPOINT.getName()),
+ Map.entry("Session Time", MAX_SESSION_TIME.getName()),
+
Map.entry(ProxyServiceMigration.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
ProxyServiceMigration.PROXY_CONFIGURATION_SERVICE)
+ );
+
+
awsCredentialsProviderControllerService.migrateProperties(configuration);
+ final PropertyMigrationResult result =
configuration.toPropertyMigrationResult();
+ final Map<String, String> propertiesRenamed =
result.getPropertiesRenamed();
+
+ assertEquals(expectedRenamed, propertiesRenamed);
+
+ final Set<String> expectedRemoved = new HashSet<>(Arrays.asList(null,
"assume-role-proxy-host", "assume-role-proxy-port",
"assume-role-sts-signer-override",
+ "Assume Role STS Signer Override", "custom-signer-class-name",
"Custom Signer Class Name",
+ "custom-signer-module-location", "Custom Signer Module
Location"));
+
+ assertEquals(expectedRemoved, result.getPropertiesRemoved());
+ }
+
private static final class MockOAuth2AccessTokenProvider extends
AbstractControllerService implements OAuth2AccessTokenProvider {
@Override
public AccessToken getAccessDetails() {
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
index b931c6f164..fbe0675ae6 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
@@ -18,9 +18,13 @@ package org.apache.nifi.processors.aws.dynamodb;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import
org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
@@ -49,8 +53,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -276,6 +283,41 @@ public class PutDynamoDBRecordTest {
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
}
+ @Test
+ void testMigration() throws InitializationException {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ProxyConfigurationService.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
AbstractAwsProcessor.PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("Batch items for each request (between 1 and 50)",
AbstractDynamoDBProcessor.BATCH_SIZE.getName()),
+ Map.entry("Json Document attribute",
AbstractDynamoDBProcessor.JSON_DOCUMENT.getName()),
+ Map.entry("Character set of document",
AbstractDynamoDBProcessor.DOCUMENT_CHARSET.getName()),
+ Map.entry("record-reader",
PutDynamoDBRecord.RECORD_READER.getName()),
+ Map.entry("partition-key-strategy",
PutDynamoDBRecord.PARTITION_KEY_STRATEGY.getName()),
+ Map.entry("partition-key-field",
PutDynamoDBRecord.PARTITION_KEY_FIELD.getName()),
+ Map.entry("partition-key-attribute",
PutDynamoDBRecord.PARTITION_KEY_ATTRIBUTE.getName()),
+ Map.entry("sort-key-strategy",
PutDynamoDBRecord.SORT_KEY_STRATEGY.getName()),
+ Map.entry("sort-key-field",
PutDynamoDBRecord.SORT_KEY_FIELD.getName())
+ );
+
+ final TestRunner runner = getTestRunner();
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+
+ Set<String> expectedRemoved = Set.of(
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue()
+ );
+
+ assertEquals(expectedRemoved,
propertyMigrationResult.getPropertiesRemoved());
+ }
+
private TestRunner getTestRunner() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(testSubject);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestConsumeKinesisStream.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestConsumeKinesisStream.java
index f37ba66e06..7071e053dd 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestConsumeKinesisStream.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestConsumeKinesisStream.java
@@ -22,24 +22,33 @@ import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import
org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService;
import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.region.RegionUtil;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Set;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,10 +82,12 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
- assertEquals(assertionError.getMessage(), String.format("Processor has
3 validation failures:\n" +
- "'%s' is invalid because %s is required\n" +
- "'%s' is invalid because %s is required\n" +
- "'%s' is invalid because %s is required\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 3 validation failures:
+ '%s' is invalid because %s is required
+ '%s' is invalid because %s is required
+ '%s' is invalid because %s is required
+ """,
ConsumeKinesisStream.KINESIS_STREAM_NAME.getDisplayName(),
ConsumeKinesisStream.KINESIS_STREAM_NAME.getDisplayName(),
ConsumeKinesisStream.APPLICATION_NAME.getDisplayName(),
ConsumeKinesisStream.APPLICATION_NAME.getDisplayName(),
ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE.getDisplayName(),
ConsumeKinesisStream.AWS_CREDENTIALS_PROVIDER_SERVICE.getDisplayName()
@@ -100,25 +111,27 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
- assertEquals(assertionError.getMessage(), String.format("Processor has
14 validation failures:\n" +
- "'%s' validated against ' ' is invalid because %s must
contain at least one character that is not white space\n" +
- "'%s' validated against 'not-a-reader' is invalid
because Property references a Controller Service that does not exist\n" +
- "'%s' validated against 'not-a-writer' is invalid
because Property references a Controller Service that does not exist\n" +
- "'%s' validated against 'not-a-url' is invalid because
Not a valid URL\n" +
- "'%s' validated against 'not-an-enum-match' is invalid
because Given value not found in allowed set '%s, %s, %s'\n" +
- "'%s' validated against 'not-valid-format' is invalid
because Must be a valid java.time.DateTimeFormatter pattern, e.g. %s\n" +
- "'%s' validated against 'not-a-period' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and " +
- "TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days\n" +
- "'%s' validated against 'not-a-period' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and " +
- "TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days\n" +
- "'%s' validated against 'not-a-long' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and " +
- "TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days\n" +
- "'%s' validated against 'not-an-int' is invalid
because not a valid integer\n" +
- "'%s' validated against 'not-a-long' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and " +
- "TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days\n" +
- "'%s' validated against 'not-a-boolean' is invalid
because Given value not found in allowed set 'true, false'\n" +
- "'%s' validated against 'not-a-reader' is invalid
because Invalid Controller Service: not-a-reader is not a valid Controller
Service Identifier\n" +
- "'%s' validated against 'not-a-writer' is invalid
because Invalid Controller Service: not-a-writer is not a valid Controller
Service Identifier\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 14 validation failures:
+ '%s' validated against ' ' is invalid because %s must
contain at least one character that is not white space
+ '%s' validated against 'not-a-reader' is invalid
because Property references a Controller Service that does not exist
+ '%s' validated against 'not-a-writer' is invalid
because Property references a Controller Service that does not exist
+ '%s' validated against 'not-a-url' is invalid because
Not a valid URL
+ '%s' validated against 'not-an-enum-match' is invalid
because Given value not found in allowed set '%s, %s, %s'
+ '%s' validated against 'not-valid-format' is invalid
because Must be a valid java.time.DateTimeFormatter pattern, e.g. %s
+ '%s' validated against 'not-a-period' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and \
+ TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days
+ '%s' validated against 'not-a-period' is invalid
because Must be of format <duration> <TimeUnit> where <duration> is a
non-negative integer and \
+ TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days
+ '%s' validated against 'not-a-long' is invalid because
Must be of format <duration> <TimeUnit> where <duration> is a non-negative
integer and \
+ TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days
+ '%s' validated against 'not-an-int' is invalid because
not a valid integer
+ '%s' validated against 'not-a-long' is invalid because
Must be of format <duration> <TimeUnit> where <duration> is a non-negative
integer and \
+ TimeUnit is a supported Time Unit, such as: nanos,
millis, secs, mins, hrs, days
+ '%s' validated against 'not-a-boolean' is invalid
because Given value not found in allowed set 'true, false'
+ '%s' validated against 'not-a-reader' is invalid
because Invalid Controller Service: not-a-reader is not a valid Controller
Service Identifier
+ '%s' validated against 'not-a-writer' is invalid
because Invalid Controller Service: not-a-writer is not a valid Controller
Service Identifier
+ """,
ConsumeKinesisStream.APPLICATION_NAME.getName(),
ConsumeKinesisStream.APPLICATION_NAME.getName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName(),
@@ -144,8 +157,10 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
- assertEquals(assertionError.getMessage(), String.format("Processor has
1 validation failures:\n" +
- "'%s' is invalid because %s must be provided when %s
is %s\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 1 validation failures:
+ '%s' is invalid because %s must be provided when %s is
%s
+ """,
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getName(),
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getDisplayName(),
ConsumeKinesisStream.INITIAL_STREAM_POSITION.getDisplayName(),
InitialPositionInStream.AT_TIMESTAMP
));
@@ -159,8 +174,10 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::run);
- assertEquals(assertionError.getMessage(), String.format("Processor has
1 validation failures:\n" +
- "'%s' is invalid because %s must be parsable by %s\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 1 validation failures:
+ '%s' is invalid because %s must be parsable by %s
+ """,
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getName(),
ConsumeKinesisStream.STREAM_POSITION_TIMESTAMP.getDisplayName(),
ConsumeKinesisStream.TIMESTAMP_FORMAT.getDisplayName()
@@ -177,8 +194,10 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::assertValid);
- assertEquals(assertionError.getMessage(), String.format("Processor has
1 validation failures:\n" +
- "'%s' is invalid because %s must be set if %s is set
in order to write FlowFiles as Records.\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 1 validation failures:
+ '%s' is invalid because %s must be set if %s is set in
order to write FlowFiles as Records.
+ """,
ConsumeKinesisStream.RECORD_WRITER.getName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName()
@@ -195,8 +214,10 @@ public class TestConsumeKinesisStream {
runner.assertNotValid();
final AssertionError assertionError =
assertThrows(AssertionError.class, runner::assertValid);
- assertEquals(assertionError.getMessage(), String.format("Processor has
1 validation failures:\n" +
- "'%s' is invalid because %s must be set if %s is set
in order to write FlowFiles as Records.\n",
+ assertEquals(assertionError.getMessage(), String.format("""
+ Processor has 1 validation failures:
+ '%s' is invalid because %s must be set if %s is set in
order to write FlowFiles as Records.
+ """,
ConsumeKinesisStream.RECORD_READER.getName(),
ConsumeKinesisStream.RECORD_READER.getDisplayName(),
ConsumeKinesisStream.RECORD_WRITER.getDisplayName()
@@ -257,6 +278,45 @@ public class TestConsumeKinesisStream {
runner.assertValid();
}
+ @Test
+ void testMigration() {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ProxyConfigurationService.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
AbstractAwsProcessor.PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("kinesis-stream-name", "Amazon Kinesis Stream Name"),
+ Map.entry("amazon-kinesis-stream-application-name",
"Application Name"),
+ Map.entry("amazon-kinesis-stream-initial-position", "Initial
Stream Position"),
+ Map.entry("amazon-kinesis-stream-position-timestamp", "Stream
Position Timestamp"),
+ Map.entry("amazon-kinesis-stream-timestamp-format", "Timestamp
Format"),
+ Map.entry("amazon-kinesis-stream-failover-timeout", "Failover
Timeout"),
+ Map.entry("amazon-kinesis-stream-graceful-shutdown-timeout",
"Graceful Shutdown Timeout"),
+ Map.entry("amazon-kinesis-stream-checkpoint-interval",
"Checkpoint Interval"),
+ Map.entry("amazon-kinesis-stream-retry-count", "Retry Count"),
+ Map.entry("amazon-kinesis-stream-retry-wait", "Retry Wait"),
+ Map.entry("amazon-kinesis-stream-dynamodb-override", "DynamoDB
Override"),
+ Map.entry("amazon-kinesis-stream-cloudwatch-flag", "Report
Metrics to CloudWatch"),
+ Map.entry("amazon-kinesis-stream-record-reader", "Record
Reader"),
+ Map.entry("amazon-kinesis-stream-record-writer", "Record
Writer")
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of(
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue()
+ );
+
+ assertEquals(expectedRemoved,
+ propertyMigrationResult.getPropertiesRemoved());
+ }
+
/*
* Trigger a run of the ConsumeKinesisStream processor, but expect the KCL
Worker to fail (it needs connections to AWS resources)
* Assert that our code is being called by checking log output. The
ITConsumeKinesisStream integration tests prove actual AWS connectivity
@@ -318,16 +378,19 @@ public class TestConsumeKinesisStream {
}
private void assertConfigsBuilder(final ConfigsBuilder configsBuilder) {
-
assertEquals(configsBuilder.kinesisClient().serviceClientConfiguration().region().id(),
Region.EU_WEST_2.id());
-
assertTrue(configsBuilder.dynamoDBClient().serviceClientConfiguration().endpointOverride().isEmpty());
-
assertTrue(configsBuilder.kinesisClient().serviceClientConfiguration().endpointOverride().isEmpty());
+ try (KinesisAsyncClient kinesisAsyncClient =
configsBuilder.kinesisClient();
+ DynamoDbAsyncClient dynamoDbAsyncClient =
configsBuilder.dynamoDBClient()) {
+
assertEquals(kinesisAsyncClient.serviceClientConfiguration().region().id(),
Region.EU_WEST_2.id());
+
assertTrue(dynamoDbAsyncClient.serviceClientConfiguration().endpointOverride().isEmpty());
+
assertTrue(kinesisAsyncClient.serviceClientConfiguration().endpointOverride().isEmpty());
+ }
}
private void assertSchedulerConfigs(final Scheduler scheduler, final
String hostname) {
assertTrue(scheduler.leaseManagementConfig().workerIdentifier().startsWith(hostname));
assertEquals(scheduler.coordinatorConfig().applicationName(),
"test-application");
assertEquals(scheduler.leaseManagementConfig().streamName(),
"test-stream");
-
assertEquals(scheduler.retrievalConfig().streamTracker().streamConfigList().get(0).initialPositionInStreamExtended().getInitialPositionInStream(),
+
assertEquals(scheduler.retrievalConfig().streamTracker().streamConfigList().getFirst().initialPositionInStreamExtended().getInitialPositionInStream(),
InitialPositionInStream.TRIM_HORIZON );
assertEquals(scheduler.coordinatorConfig().parentShardPollIntervalMillis(), 1);
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
index 422e3cfc8b..2ca93f3f0f 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
@@ -16,22 +16,30 @@
*/
package org.apache.nifi.processors.aws.kinesis.stream;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import org.apache.nifi.processors.aws.kinesis.KinesisProcessorUtils;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class TestPutKinesisStream {
private TestRunner runner;
- protected final static String CREDENTIALS_FILE =
System.getProperty("user.home") + "/aws-credentials.properties";
@BeforeEach
public void setUp() throws Exception {
@@ -42,7 +50,7 @@ public class TestPutKinesisStream {
}
@AfterEach
- public void tearDown() throws Exception {
+ public void tearDown() {
runner = null;
}
@@ -68,15 +76,41 @@ public class TestPutKinesisStream {
runner.setProperty(PutKinesisStream.BATCH_SIZE, "1");
runner.assertValid();
byte[] bytes = new byte[(KinesisProcessorUtils.MAX_MESSAGE_SIZE + 1)];
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] = 'a';
- }
+ Arrays.fill(bytes, (byte) 'a');
runner.enqueue(bytes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
-
assertNotNull(flowFiles.get(0).getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+
assertNotNull(flowFiles.getFirst().getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+ }
+
+ @Test
+ void testMigration() {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ProxyConfigurationService.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
AbstractAwsProcessor.PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("amazon-kinesis-stream-partition-key",
PutKinesisStream.KINESIS_PARTITION_KEY.getName()),
+ Map.entry("message-batch-size",
PutKinesisStream.BATCH_SIZE.getName()),
+ Map.entry("max-message-buffer-size",
PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB.getName()),
+ Map.entry("kinesis-stream-name",
PutKinesisStream.KINESIS_STREAM_NAME.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of(
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue());
+
+ assertEquals(expectedRemoved,
+ propertyMigrationResult.getPropertiesRemoved());
}
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 0fcc69df42..17062d3a6e 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -20,8 +20,12 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.migration.ProxyServiceMigration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import org.apache.nifi.processors.aws.region.RegionUtil;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.reporting.InitializationException;
@@ -57,7 +61,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -860,9 +866,36 @@ public class TestListS3 {
Map.entry("write-s3-object-tags",
ListS3.WRITE_OBJECT_TAGS.getName()),
Map.entry("requester-pays",
ListS3.REQUESTER_PAYS.getName()),
Map.entry("write-s3-user-metadata",
ListS3.WRITE_USER_METADATA.getName()),
- Map.entry("record-writer",
ListS3.RECORD_WRITER.getName()));
-
-
- expectedRenamed.forEach((key, value) -> assertEquals(value,
propertyMigrationResult.getPropertiesRenamed().get(key)));
+ Map.entry("record-writer",
ListS3.RECORD_WRITER.getName()),
+ Map.entry("canned-acl",
AbstractS3Processor.CANNED_ACL.getName()),
+ Map.entry("encryption-service",
AbstractS3Processor.ENCRYPTION_SERVICE.getName()),
+ Map.entry("use-chunked-encoding",
AbstractS3Processor.USE_CHUNKED_ENCODING.getName()),
+ Map.entry("use-path-style-access",
AbstractS3Processor.USE_PATH_STYLE_ACCESS.getName()),
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ListedEntityTracker.OLD_TRACKING_STATE_CACHE_PROPERTY_NAME,
ListS3.TRACKING_STATE_CACHE.getName()),
+
Map.entry(ListedEntityTracker.OLD_TRACKING_TIME_WINDOW_PROPERTY_NAME,
ListS3.TRACKING_TIME_WINDOW.getName()),
+
Map.entry(ListedEntityTracker.OLD_INITIAL_LISTING_TARGET_PROPERTY_NAME,
ListS3.INITIAL_LISTING_TARGET.getName()),
+
Map.entry(ProxyServiceMigration.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
ProxyServiceMigration.PROXY_CONFIGURATION_SERVICE)
+ );
+
+ assertEquals(expectedRenamed,
propertyMigrationResult.getPropertiesRenamed());
+
+ Set<String> expectedRemoved = Set.of(
+ "Signer Override",
+ "custom-signer-class-name",
+ "Custom Signer Class Name",
+ "custom-signer-module-location",
+ "Custom Signer Module Location",
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue()
+ );
+
+ assertEquals(expectedRemoved,
propertyMigrationResult.getPropertiesRemoved());
}
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
index ea81c805fd..f15945242f 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
@@ -20,7 +20,9 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockPropertyConfiguration;
import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -30,6 +32,7 @@ import
software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import java.util.List;
+import java.util.Map;
import static
org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createCustomerKeySpec;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +41,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class TestStandardS3EncryptionService {
private StandardS3EncryptionService service;
- private ConfigurationContext context;
private String strategyName;
private String keyMaterial;
private String keyMaterialMd5;
@@ -47,7 +49,7 @@ public class TestStandardS3EncryptionService {
@BeforeEach
public void setup() throws InitializationException {
service = new StandardS3EncryptionService();
- context = Mockito.mock(ConfigurationContext.class);
+ ConfigurationContext context =
Mockito.mock(ConfigurationContext.class);
S3EncryptionKeySpec keySpec = createCustomerKeySpec(256);
@@ -112,4 +114,25 @@ public class TestStandardS3EncryptionService {
assertEquals(properties.get(2).getName(),
StandardS3EncryptionService.KEY_MATERIAL.getName());
assertEquals(properties.get(3).getName(),
StandardS3EncryptionService.COMMITMENT_POLICY.getName());
}
+
+ @Test
+ void testMigration() {
+ final Map<String, String> propertyValues = Map.of(
+ StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName(),
strategyName,
+ StandardS3EncryptionService.KEY_MATERIAL.getName(), keyMaterial
+ );
+
+ final MockPropertyConfiguration configuration = new
MockPropertyConfiguration(propertyValues);
+ final StandardS3EncryptionService standardS3EncryptionService = new
StandardS3EncryptionService();
+ standardS3EncryptionService.migrateProperties(configuration);
+
+ Map<String, String> expected = Map.of(
+ "encryption-strategy",
StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName()
+ );
+
+ final PropertyMigrationResult result =
configuration.toPropertyMigrationResult();
+ final Map<String, String> propertiesRenamed =
result.getPropertiesRenamed();
+
+ assertEquals(expected, propertiesRenamed);
+ }
}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
index 233605d8a8..1f9dff3c9f 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
@@ -17,7 +17,11 @@
package org.apache.nifi.processors.aws.sqs;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.AbstractAwsProcessor;
+import org.apache.nifi.processors.aws.ObsoleteAbstractAwsProcessorProperties;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.PropertyMigrationResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@@ -30,7 +34,9 @@ import
software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import static org.apache.nifi.processors.aws.region.RegionUtil.REGION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -71,8 +77,8 @@ public class TestPutSQS {
Mockito.verify(mockSQSClient,
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
request.queueUrl());
- assertEquals("hello",
request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
- assertEquals("TestMessageBody",
request.entries().get(0).messageBody());
+ assertEquals("hello",
request.entries().getFirst().messageAttributes().get("x-custom-prop").stringValue());
+ assertEquals("TestMessageBody",
request.entries().getFirst().messageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
@@ -93,7 +99,7 @@ public class TestPutSQS {
Mockito.verify(mockSQSClient,
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
request.queueUrl());
- assertEquals("TestMessageBody",
request.entries().get(0).messageBody());
+ assertEquals("TestMessageBody",
request.entries().getFirst().messageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
}
@@ -119,11 +125,37 @@ public class TestPutSQS {
Mockito.verify(mockSQSClient,
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000",
request.queueUrl());
- assertEquals("hello",
request.entries().get(0).messageAttributes().get("x-custom-prop").stringValue());
- assertEquals("TestMessageBody",
request.entries().get(0).messageBody());
- assertEquals("test1234", request.entries().get(0).messageGroupId());
- assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236",
request.entries().get(0).messageDeduplicationId());
+ assertEquals("hello",
request.entries().getFirst().messageAttributes().get("x-custom-prop").stringValue());
+ assertEquals("TestMessageBody",
request.entries().getFirst().messageBody());
+ assertEquals("test1234",
request.entries().getFirst().messageGroupId());
+ assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236",
request.entries().getFirst().messageDeduplicationId());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
+
+ @Test
+ void testMigration() {
+ final Map<String, String> expected = Map.ofEntries(
+ Map.entry("aws-region", REGION.getName()),
+
Map.entry(AbstractAwsProcessor.OBSOLETE_AWS_CREDENTIALS_PROVIDER_SERVICE_PROPERTY_NAME,
AbstractAwsProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE.getName()),
+
Map.entry(ProxyConfigurationService.OBSOLETE_PROXY_CONFIGURATION_SERVICE,
ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE.getName()),
+ Map.entry("message-group-id", PutSQS.MESSAGEGROUPID.getName()),
+ Map.entry("deduplication-message-id",
PutSQS.MESSAGEDEDUPLICATIONID.getName())
+ );
+
+ final PropertyMigrationResult propertyMigrationResult =
runner.migrateProperties();
+ assertEquals(expected, propertyMigrationResult.getPropertiesRenamed());
+
+ final Set<String> expectedRemoved = Set.of(
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_ACCESS_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_SECRET_KEY.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_CREDENTIALS_FILE.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_HOST.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PORT.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_USERNAME.getValue(),
+
ObsoleteAbstractAwsProcessorProperties.OBSOLETE_PROXY_PASSWORD.getValue());
+
+ assertEquals(expectedRemoved,
+ propertyMigrationResult.getPropertiesRemoved());
+ }
}