This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 71d244de37 NIFI-14284 ConsumeKinesisStream migrate property names to new convention (#9741)
71d244de37 is described below
commit 71d244de37bda169deaec9d1cdb3201e45074095
Author: Dariusz Seweryn <[email protected]>
AuthorDate: Tue Feb 25 18:44:31 2025 +0100
NIFI-14284 ConsumeKinesisStream migrate property names to new convention (#9741)
Signed-off-by: David Handermann <[email protected]>
---
.../aws/kinesis/stream/ConsumeKinesisStream.java | 61 ++++++++++++----------
1 file changed, 33 insertions(+), 28 deletions(-)
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
index 40439d53a3..d069314c89 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
@@ -169,31 +170,27 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
);
static final PropertyDescriptor KINESIS_STREAM_NAME = new
PropertyDescriptor.Builder()
- .name("kinesis-stream-name")
- .displayName("Amazon Kinesis Stream Name")
+ .name("Amazon Kinesis Stream Name")
.description("The name of Kinesis Stream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor APPLICATION_NAME = new
PropertyDescriptor.Builder()
- .displayName("Application Name")
- .name("amazon-kinesis-stream-application-name")
+ .name("Application Name")
.description("The Kinesis stream reader application name.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true).build();
public static final PropertyDescriptor INITIAL_STREAM_POSITION = new
PropertyDescriptor.Builder()
- .displayName("Initial Stream Position")
- .name("amazon-kinesis-stream-initial-position")
+ .name("Initial Stream Position")
.description("Initial position to read Kinesis streams.")
.allowableValues(LATEST, TRIM_HORIZON, AT_TIMESTAMP)
.defaultValue(LATEST.getValue())
.required(true).build();
public static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new
PropertyDescriptor.Builder()
- .displayName("Stream Position Timestamp")
- .name("amazon-kinesis-stream-position-timestamp")
+ .name("Stream Position Timestamp")
.description("Timestamp position in stream from which to start
reading Kinesis Records. " +
"Required if " + INITIAL_STREAM_POSITION.getDescription()
+ " is " + AT_TIMESTAMP.getDisplayName() + ". " +
"Uses the Timestamp Format to parse value into a Date.")
@@ -202,8 +199,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(false).build();
public static final PropertyDescriptor TIMESTAMP_FORMAT = new
PropertyDescriptor.Builder()
- .displayName("Timestamp Format")
- .name("amazon-kinesis-stream-timestamp-format")
+ .name("Timestamp Format")
.description("Format to use for parsing the " +
STREAM_POSITION_TIMESTAMP.getDisplayName() + " into a Date " +
"and converting the Kinesis Record's Approximate Arrival
Timestamp into a FlowFile attribute.")
.addValidator((subject, input, context) -> {
@@ -223,56 +219,49 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(true).build();
public static final PropertyDescriptor FAILOVER_TIMEOUT = new
PropertyDescriptor.Builder()
- .displayName("Failover Timeout")
- .name("amazon-kinesis-stream-failover-timeout")
+ .name("Failover Timeout")
.description("Kinesis Client Library failover timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.required(true).build();
public static final PropertyDescriptor GRACEFUL_SHUTDOWN_TIMEOUT = new
PropertyDescriptor.Builder()
- .displayName("Graceful Shutdown Timeout")
- .name("amazon-kinesis-stream-graceful-shutdown-timeout")
+ .name("Graceful Shutdown Timeout")
.description("Kinesis Client Library graceful shutdown timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("20 secs")
.required(true).build();
public static final PropertyDescriptor CHECKPOINT_INTERVAL = new
PropertyDescriptor.Builder()
- .displayName("Checkpoint Interval")
- .name("amazon-kinesis-stream-checkpoint-interval")
+ .name("Checkpoint Interval")
.description("Interval between Kinesis checkpoints")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("3 secs")
.required(true).build();
public static final PropertyDescriptor NUM_RETRIES = new
PropertyDescriptor.Builder()
- .displayName("Retry Count")
- .name("amazon-kinesis-stream-retry-count")
+ .name("Retry Count")
.description("Number of times to retry a Kinesis operation
(process record, checkpoint, shutdown)")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("10")
.required(true).build();
public static final PropertyDescriptor RETRY_WAIT = new
PropertyDescriptor.Builder()
- .displayName("Retry Wait")
- .name("amazon-kinesis-stream-retry-wait")
+ .name("Retry Wait")
.description("Interval between Kinesis operation retries (process
record, checkpoint, shutdown)")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 sec")
.required(true).build();
public static final PropertyDescriptor DYNAMODB_ENDPOINT_OVERRIDE = new
PropertyDescriptor.Builder()
- .displayName("DynamoDB Override")
- .name("amazon-kinesis-stream-dynamodb-override")
+ .name("DynamoDB Override")
.description("DynamoDB override to use non-AWS deployments")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false).build();
public static final PropertyDescriptor REPORT_CLOUDWATCH_METRICS = new
PropertyDescriptor.Builder()
- .displayName("Report Metrics to CloudWatch")
- .name("amazon-kinesis-stream-cloudwatch-flag")
+ .name("Report Metrics to CloudWatch")
.description("Whether to report Kinesis usage metrics to
CloudWatch.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
@@ -280,8 +269,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(true).build();
public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
- .name("amazon-kinesis-stream-record-reader")
- .displayName("Record Reader")
+ .name("Record Reader")
.description("The Record Reader to use for reading received
messages." +
" The Kinesis Stream name can be referred to by Expression
Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY +
"}' to access a schema." +
@@ -292,8 +280,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.build();
public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
- .name("amazon-kinesis-stream-record-writer")
- .displayName("Record Writer")
+ .name("Record Writer")
.description("The Record Writer to use for serializing Records to
an output FlowFile." +
" The Kinesis Stream name can be referred to by Expression
Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY +
"}' to access a schema." +
@@ -405,6 +392,24 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
}
}
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("kinesis-stream-name", "Amazon Kinesis Stream
Name");
+ config.renameProperty("amazon-kinesis-stream-application-name",
"Application Name");
+ config.renameProperty("amazon-kinesis-stream-initial-position",
"Initial Stream Position");
+ config.renameProperty("amazon-kinesis-stream-position-timestamp",
"Stream Position Timestamp");
+ config.renameProperty("amazon-kinesis-stream-timestamp-format",
"Timestamp Format");
+ config.renameProperty("amazon-kinesis-stream-failover-timeout",
"Failover Timeout");
+
config.renameProperty("amazon-kinesis-stream-graceful-shutdown-timeout",
"Graceful Shutdown Timeout");
+ config.renameProperty("amazon-kinesis-stream-checkpoint-interval",
"Checkpoint Interval");
+ config.renameProperty("amazon-kinesis-stream-retry-count", "Retry
Count");
+ config.renameProperty("amazon-kinesis-stream-retry-wait", "Retry
Wait");
+ config.renameProperty("amazon-kinesis-stream-dynamodb-override",
"DynamoDB Override");
+ config.renameProperty("amazon-kinesis-stream-cloudwatch-flag", "Report
Metrics to CloudWatch");
+ config.renameProperty("amazon-kinesis-stream-record-reader", "Record
Reader");
+ config.renameProperty("amazon-kinesis-stream-record-writer", "Record
Writer");
+ }
+
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new
PropertyDescriptor.Builder()