exceptionfactory commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1436675089
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -191,6 +204,19 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
objectMapper = new ObjectMapper();
}
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ super.onScheduled(context);
+ batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ queueName = context.getProperty(QUEUE).getValue();
+ headerFormat = context.getProperty(HEADER_FORMAT).getValue();
+ headerAttributePrefix =
context.getProperty(HEADER_KEY_PREFIX).getValue();
+ removeCurlyBraces=context.getProperty(REMOVE_CURLY_BRACES).asBoolean();
+ valueSeparatorForHeaders =
context.getProperty(HEADER_SEPARATOR).getValue();
+ autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
+ prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger();
Review Comment:
Is there a particular reason for moving these properties to class member
variables? In general, it is best to keep them as method-local variables unless
there is some other reason to move them around.
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -121,30 +159,48 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
private final static Set<Relationship> relationships;
+ private String selectedHeaderSource;
+ private String headerSourcePrecedence;
+ private Character headerSeparator;
Review Comment:
See note on relocating property values.
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -58,36 +59,42 @@
+ "that happens you will see a log in both app-log and bulletin stating to
that effect, and the FlowFile will be routed to the 'failure' relationship.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@ReadsAttributes({
- @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field
to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The
Content Encoding to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$contentType", description = "The Content
Type to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$headers", description = "The headers to
set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The
numeric indicator for the Message's Delivery Mode"),
- @ReadsAttribute(attribute = "amqp$priority", description = "The Message
priority"),
- @ReadsAttribute(attribute = "amqp$correlationId", description = "The
Message's Correlation ID"),
- @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of
the Message's Reply-To field"),
- @ReadsAttribute(attribute = "amqp$expiration", description = "The Message
Expiration"),
- @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID
of the Message"),
- @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp
of the Message, as the number of milliseconds since epoch"),
- @ReadsAttribute(attribute = "amqp$type", description = "The type of
message"),
- @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the
user"),
- @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the
AMQP Cluster"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE,
description = "The App ID field to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The
Content Encoding to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content
Type to set on the AMQP Message"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
description = "The headers to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric
indicator for the Message's Delivery Mode"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE,
description = "The Message priority"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The
Message's Correlation ID"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE,
description = "The value of the Message's Reply-To field"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message
Expiration"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID
of the Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of
the Message, as the number of milliseconds since epoch"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE,
description = "The type of message"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE,
description = "The ID of the user"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the
AMQP Cluster"),
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
- private static final String ATTRIBUTES_PREFIX = "amqp$";
+ public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new
AllowableValue("headersFromAttributes", "Attributes Matching Regex",
+ "Select attributes based on regex pattern to put in rabbitmq
headers. Key of the attribute will be used as header key");
+ public static final AllowableValue HEADERS_FROM_STRING = new
AllowableValue("headersFromString", "Attribute 'amp$headers' Value",
+ "Prepare headers from 'amp$headers' attribute string");
+ public static final AllowableValue HEADERS_FROM_BOTH = new
AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value",
+ "Take headers from both sources: 'amp$headers' attribute and
attributes matching Regex. In case of key duplication precedence property will
define which value to take.");
public static final PropertyDescriptor EXCHANGE = new
PropertyDescriptor.Builder()
- .name("Exchange Name")
+ .name("exchange.name")
+ .displayName("Exchange Name")
.description("The name of the AMQP Exchange the messages will be
sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). "
+ "It is an optional property. If kept empty the messages
will be sent to a default AMQP exchange.")
.required(true)
.defaultValue("")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
-
public static final PropertyDescriptor ROUTING_KEY = new
PropertyDescriptor.Builder()
- .name("Routing Key")
+ .name("routing.key")
Review Comment:
Property Names cannot be changed without introducing an automated migration
method, or following a deprecation process. In general, the display name and
property name should match for new properties, so these property name changes
should be reverted.
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -58,36 +59,42 @@
+ "that happens you will see a log in both app-log and bulletin stating to
that effect, and the FlowFile will be routed to the 'failure' relationship.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@ReadsAttributes({
- @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field
to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The
Content Encoding to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$contentType", description = "The Content
Type to set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$headers", description = "The headers to
set on the AMQP Message"),
- @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The
numeric indicator for the Message's Delivery Mode"),
- @ReadsAttribute(attribute = "amqp$priority", description = "The Message
priority"),
- @ReadsAttribute(attribute = "amqp$correlationId", description = "The
Message's Correlation ID"),
- @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of
the Message's Reply-To field"),
- @ReadsAttribute(attribute = "amqp$expiration", description = "The Message
Expiration"),
- @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID
of the Message"),
- @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp
of the Message, as the number of milliseconds since epoch"),
- @ReadsAttribute(attribute = "amqp$type", description = "The type of
message"),
- @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the
user"),
- @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the
AMQP Cluster"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE,
description = "The App ID field to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The
Content Encoding to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content
Type to set on the AMQP Message"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE,
description = "The headers to set on the AMQP Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric
indicator for the Message's Delivery Mode"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE,
description = "The Message priority"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The
Message's Correlation ID"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE,
description = "The value of the Message's Reply-To field"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message
Expiration"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID
of the Message"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of
the Message, as the number of milliseconds since epoch"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE,
description = "The type of message"),
+ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE,
description = "The ID of the user"),
+ @ReadsAttribute(attribute =
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the
AMQP Cluster"),
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
- private static final String ATTRIBUTES_PREFIX = "amqp$";
+ public static final AllowableValue HEADERS_FROM_ATTRIBUTES = new
AllowableValue("headersFromAttributes", "Attributes Matching Regex",
+ "Select attributes based on regex pattern to put in rabbitmq
headers. Key of the attribute will be used as header key");
+ public static final AllowableValue HEADERS_FROM_STRING = new
AllowableValue("headersFromString", "Attribute 'amp$headers' Value",
+ "Prepare headers from 'amp$headers' attribute string");
+ public static final AllowableValue HEADERS_FROM_BOTH = new
AllowableValue("headersFromBoth", "Regex Match And 'amp$headers' Value",
+ "Take headers from both sources: 'amp$headers' attribute and
attributes matching Regex. In case of key duplication precedence property will
define which value to take.");
Review Comment:
Instead of declaring instances of `AllowableValue`, it would be helpful to
define an enum of options that implements `DescribedValue`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]