markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907002018
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for
demarcating multiple Kine
private static final Set<Relationship> RAW_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS);
private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
- private volatile DynamoDbAsyncClient dynamoDbClient;
- private volatile CloudWatchAsyncClient cloudWatchClient;
- private volatile KinesisAsyncClient kinesisClient;
- private volatile Scheduler kinesisScheduler;
-
+ private volatile SdkHttpClient kinesisHttpClient;
+ private volatile SdkHttpClient dynamoHttpClient;
+ private volatile KinesisClient kinesisClient;
+ private volatile DynamoDbClient dynamoDbClient;
+ private volatile SdkAsyncHttpClient asyncHttpClient;
+ private volatile KinesisShardManager shardManager;
+ private volatile KinesisConsumerClient consumerClient;
private volatile String streamName;
- private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
- private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
- private volatile @Nullable byte[] demarcatorValue;
+ private volatile int maxRecordsPerRequest;
+ private volatile String initialStreamPosition;
+ private volatile long maxBatchNanos;
+ private volatile long maxBatchBytes;
- private volatile Future<InitializationResult> initializationResultFuture;
- private final AtomicBoolean initialized = new AtomicBoolean();
-
- // An instance filed, so that it can be read in getRelationships.
- private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.from(
- PROCESSING_STRATEGY.getDefaultValue());
+ private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
- @Override
- public void migrateProperties(final PropertyConfiguration config) {
- ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
- }
-
@Override
public Set<Relationship> getRelationships() {
return switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+ case FLOW_FILE, LINE_DELIMITED, DEMARCATOR ->
RAW_FILE_RELATIONSHIPS;
case RECORD -> RECORD_FILE_RELATIONSHIPS;
};
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
if (descriptor.equals(PROCESSING_STRATEGY)) {
- processingStrategy = ProcessingStrategy.from(newValue);
+ processingStrategy = ProcessingStrategy.valueOf(newValue);
}
}
- @OnScheduled
- public void setup(final ProcessContext context) {
- readerRecordProcessor = switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> null;
- case RECORD -> createReaderRecordProcessor(context);
- };
- demarcatorValue = switch (processingStrategy) {
- case FLOW_FILE, RECORD -> null;
- case DEMARCATOR -> {
- final String demarcatorValue =
context.getProperty(MESSAGE_DEMARCATOR).getValue();
- yield demarcatorValue != null ?
demarcatorValue.getBytes(UTF_8) : new byte[0];
- }
- };
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+ config.removeProperty("Checkpoint Interval");
+ config.removeProperty("Metrics Publishing");
+ }
+
+ @Override
+ public void migrateRelationships(final RelationshipConfiguration config) {
+ config.renameRelationship("parse failure", "parse.failure");
Review Comment:
Yeah, that's an artifact of some refactoring that I did. Good catch. Will
remove.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]