exceptionfactory commented on code in PR #8105:
URL: https://github.com/apache/nifi/pull/8105#discussion_r1669267566
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -85,9 +85,9 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
-
public static final PropertyDescriptor ROUTING_KEY = new
PropertyDescriptor.Builder()
.name("Routing Key")
+ .displayName("Routing Key")
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile,
ProcessSession session) {
}
- private void updateBuilderFromAttribute(final FlowFile flowFile, final
String attribute, final Consumer<String> updater) {
- final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX
+ attribute);
+ /**
+ * Reads an attribute from flowFile and pass it to the consumer function
+ *
+ * @param flowFile FlowFile for reading the attribute
+ * @param attributeKey Name of the attribute
+ * @param updater Consumer function which will use the attribute
value
+ */
+ private void readAmqpAttribute(final FlowFile flowFile, final String
attributeKey, final Consumer<String> updater) {
+ final String attributeValue = flowFile.getAttribute(attributeKey);
if (attributeValue == null) {
return;
}
try {
updater.accept(attributeValue);
} catch (final Exception e) {
- getLogger().warn("Failed to update AMQP Message Property {}",
attribute, e);
+ getLogger().warn("Failed to update AMQP Message Property [{}]",
attributeKey, e);
}
}
/**
* Extracts AMQP properties from the {@link FlowFile} attributes.
Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
- * properties if their names are prefixed with
- * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
+ * properties if their names are prefixed with "amq$" (e.g.,
amqp$contentType=text/xml).
*/
- private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile
flowFile, Character headerSeparator) {
+ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile
flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character
separator, Pattern pattern) {
final AMQP.BasicProperties.Builder builder = new
AMQP.BasicProperties.Builder();
- updateBuilderFromAttribute(flowFile, "contentType",
builder::contentType);
- updateBuilderFromAttribute(flowFile, "contentEncoding",
builder::contentEncoding);
- updateBuilderFromAttribute(flowFile, "deliveryMode", mode ->
builder.deliveryMode(Integer.parseInt(mode)));
- updateBuilderFromAttribute(flowFile, "priority", pri ->
builder.priority(Integer.parseInt(pri)));
- updateBuilderFromAttribute(flowFile, "correlationId",
builder::correlationId);
- updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
- updateBuilderFromAttribute(flowFile, "expiration",
builder::expiration);
- updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
- updateBuilderFromAttribute(flowFile, "timestamp", ts ->
builder.timestamp(new Date(Long.parseLong(ts))));
- updateBuilderFromAttribute(flowFile, "type", builder::type);
- updateBuilderFromAttribute(flowFile, "userId", builder::userId);
- updateBuilderFromAttribute(flowFile, "appId", builder::appId);
- updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
- updateBuilderFromAttribute(flowFile, "headers", headers ->
builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE,
builder::contentEncoding);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode ->
builder.deliveryMode(Integer.parseInt(mode)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri ->
builder.priority(Integer.parseInt(pri)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new
Date(Long.parseLong(ts))));
+ readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE,
builder::type);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
+
+ Map<String, Object> headers = prepareAMQPHeaders(flowFile,
selectedHeaderSource, headerSourcePrecedence, separator, pattern);
+ builder.headers(headers);
return builder.build();
}
+ /**
+ * Extract AMQP headers from incoming {@link FlowFile} based on selected
headers source value.
+ *
+ * @param flowFile used to extract headers
+ * @return {@link Map}
+ */
+ private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String
selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator,
Pattern pattern) {
Review Comment:
This should be changed to take the `InputHeaderSource` enum instead of the
string value, and all arguments should be declared as `final`.
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
-
+ public static final PropertyDescriptor HEADERS_SOURCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source")
+ .displayName("Headers Source")
Review Comment:
```suggestion
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -158,10 +188,21 @@ protected void processResource(final Connection
connection, final AMQPPublisher
throw new IllegalArgumentException("Failed to determine 'routing
key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as
expression against incoming FlowFile.");
}
+ String selectedHeaderSource =
context.getProperty(HEADERS_SOURCE).getValue();
Review Comment:
This should use `asDescribedValue()` and return the `InputHeaderSource` enum
instead of the string value.
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
-
+ public static final PropertyDescriptor HEADERS_SOURCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source")
+ .displayName("Headers Source")
+ .description(
+ "The source of the headers which will be put in the
published message. They can come either from the processor property as a string
or they can be " +
+ "picked from flow file attributes based on Regex
expression.")
+ .required(true)
+ .allowableValues(InputHeaderSource.getAllowedValues())
+ .defaultValue(InputHeaderSource.STRING)
+ .build();
+ public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new
PropertyDescriptor.Builder()
Review Comment:
```suggestion
public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new
PropertyDescriptor.Builder()
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
-
+ public static final PropertyDescriptor HEADERS_SOURCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source")
+ .displayName("Headers Source")
+ .description(
+ "The source of the headers which will be put in the
published message. They can come either from the processor property as a string
or they can be " +
+ "picked from flow file attributes based on Regex
expression.")
Review Comment:
```suggestion
.description("The source of the headers which will be applied to
the published message.")
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -125,26 +156,25 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(EXCHANGE);
properties.add(ROUTING_KEY);
+ properties.add(HEADERS_SOURCE);
+ properties.add(HEADERS_SOURCE_PRECEDENCE);
+ properties.add(HEADERS_ATTRIBUTES_REGEX);
properties.add(HEADER_SEPARATOR);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);
-
- Set<Relationship> rels = new HashSet<>();
- rels.add(REL_SUCCESS);
- rels.add(REL_FAILURE);
- relationships = Collections.unmodifiableSet(rels);
+ relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
-
/**
* Will construct AMQP message by extracting its body from the incoming
{@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent
along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile
is penalized and transferred to the 'failure' {@link Relationship}
* <br>
- *
- * NOTE: Attributes extracted from {@link FlowFile} are considered
- * candidates for AMQP properties if their names are prefixed with
- * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml)
+ * <p>
+ * NOTE: Attributes extracted from {@link FlowFile} are considered
candidates for AMQP properties if their names are prefixed with
+ * "amq$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it depends
on the value of
Review Comment:
```suggestion
* "amqp$" (e.g., amqp$contentType=text/xml). For "amqp$headers" it
depends on the value of
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
-
+ public static final PropertyDescriptor HEADERS_SOURCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source")
+ .displayName("Headers Source")
+ .description(
+ "The source of the headers which will be put in the
published message. They can come either from the processor property as a string
or they can be " +
+ "picked from flow file attributes based on Regex
expression.")
+ .required(true)
+ .allowableValues(InputHeaderSource.getAllowedValues())
Review Comment:
```suggestion
.allowableValues(InputHeaderSource.class)
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object>
validateAMQPHeaderProperty(String amqpPropValue, Cha
}
return headers;
}
+ public enum InputHeaderSource implements DescribedValue {
+
+ ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+ "Select attributes based on regex pattern to put in rabbitmq
headers. Key of the attribute will be used as header key"),
+ STRING("headersFromString", "Attribute 'amp$headers' Value",
+ "Prepare headers from 'amp$headers' attribute string"),
+ BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
Review Comment:
The value of `BOTH` is confusing and would not translate well if some new
value were introduced. It seems like it should be removed, along with the
Precedence property.
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object>
validateAMQPHeaderProperty(String amqpPropValue, Cha
}
return headers;
}
+ public enum InputHeaderSource implements DescribedValue {
+
+ ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+ "Select attributes based on regex pattern to put in rabbitmq
headers. Key of the attribute will be used as header key"),
+ STRING("headersFromString", "Attribute 'amp$headers' Value",
+ "Prepare headers from 'amp$headers' attribute string"),
+ BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
+ "Take headers from both sources: 'amp$headers' attribute and
attributes matching Regex. In case of key duplication precedence property will
define which value to take.");
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ InputHeaderSource(String value, String displayName, String
description) {
+
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ public static EnumSet<InputHeaderSource> getAllowedValues() {
+ return EnumSet.of(STRING, ATTRIBUTES, BOTH);
+ }
+
+ @Override
+ public String getValue() {
+ return value;
Review Comment:
```suggestion
return name();
```
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -268,4 +357,43 @@ private Map<String, Object>
validateAMQPHeaderProperty(String amqpPropValue, Cha
}
return headers;
}
+ public enum InputHeaderSource implements DescribedValue {
+
+ ATTRIBUTES("headersFromAttributes", "Attributes Matching Regex",
+ "Select attributes based on regex pattern to put in rabbitmq
headers. Key of the attribute will be used as header key"),
+ STRING("headersFromString", "Attribute 'amp$headers' Value",
+ "Prepare headers from 'amp$headers' attribute string"),
+ BOTH("headersFromBoth", "Regex Match And 'amp$headers' Value",
+ "Take headers from both sources: 'amp$headers' attribute and
attributes matching Regex. In case of key duplication precedence property will
define which value to take.");
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ InputHeaderSource(String value, String displayName, String
description) {
Review Comment:
The `value` is unnecessary and the `name()` from the enum can be used
instead.
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -96,13 +96,44 @@ public class PublishAMQP extends
AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
-
+ public static final PropertyDescriptor HEADERS_SOURCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source")
+ .displayName("Headers Source")
+ .description(
+ "The source of the headers which will be put in the
published message. They can come either from the processor property as a string
or they can be " +
+ "picked from flow file attributes based on Regex
expression.")
+ .required(true)
+ .allowableValues(InputHeaderSource.getAllowedValues())
+ .defaultValue(InputHeaderSource.STRING)
+ .build();
+ public static final PropertyDescriptor HEADERS_SOURCE_PRECEDENCE = new
PropertyDescriptor.Builder()
+ .name("Headers Source Precedence")
Review Comment:
It seems like it would be better to remove this property and just prefer one
option or the other.
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -206,46 +247,94 @@ private byte[] extractMessage(FlowFile flowFile,
ProcessSession session) {
}
- private void updateBuilderFromAttribute(final FlowFile flowFile, final
String attribute, final Consumer<String> updater) {
- final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX
+ attribute);
+ /**
+ * Reads an attribute from flowFile and pass it to the consumer function
+ *
+ * @param flowFile FlowFile for reading the attribute
+ * @param attributeKey Name of the attribute
+ * @param updater Consumer function which will use the attribute
value
+ */
+ private void readAmqpAttribute(final FlowFile flowFile, final String
attributeKey, final Consumer<String> updater) {
+ final String attributeValue = flowFile.getAttribute(attributeKey);
if (attributeValue == null) {
return;
}
try {
updater.accept(attributeValue);
} catch (final Exception e) {
- getLogger().warn("Failed to update AMQP Message Property {}",
attribute, e);
+ getLogger().warn("Failed to update AMQP Message Property [{}]",
attributeKey, e);
}
}
/**
* Extracts AMQP properties from the {@link FlowFile} attributes.
Attributes
* extracted from {@link FlowFile} are considered candidates for AMQP
- * properties if their names are prefixed with
- * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
+ * properties if their names are prefixed with "amq$" (e.g.,
amqp$contentType=text/xml).
*/
- private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile
flowFile, Character headerSeparator) {
+ private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile
flowFile, String selectedHeaderSource, String headerSourcePrecedence, Character
separator, Pattern pattern) {
final AMQP.BasicProperties.Builder builder = new
AMQP.BasicProperties.Builder();
- updateBuilderFromAttribute(flowFile, "contentType",
builder::contentType);
- updateBuilderFromAttribute(flowFile, "contentEncoding",
builder::contentEncoding);
- updateBuilderFromAttribute(flowFile, "deliveryMode", mode ->
builder.deliveryMode(Integer.parseInt(mode)));
- updateBuilderFromAttribute(flowFile, "priority", pri ->
builder.priority(Integer.parseInt(pri)));
- updateBuilderFromAttribute(flowFile, "correlationId",
builder::correlationId);
- updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
- updateBuilderFromAttribute(flowFile, "expiration",
builder::expiration);
- updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
- updateBuilderFromAttribute(flowFile, "timestamp", ts ->
builder.timestamp(new Date(Long.parseLong(ts))));
- updateBuilderFromAttribute(flowFile, "type", builder::type);
- updateBuilderFromAttribute(flowFile, "userId", builder::userId);
- updateBuilderFromAttribute(flowFile, "appId", builder::appId);
- updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
- updateBuilderFromAttribute(flowFile, "headers", headers ->
builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, builder::contentType);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE,
builder::contentEncoding);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, mode ->
builder.deliveryMode(Integer.parseInt(mode)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, pri ->
builder.priority(Integer.parseInt(pri)));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, builder::correlationId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, builder::replyTo);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, builder::expiration);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, builder::messageId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, ts -> builder.timestamp(new
Date(Long.parseLong(ts))));
+ readAmqpAttribute(flowFile, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE,
builder::type);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, builder::userId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, builder::appId);
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, builder::clusterId);
+
+ Map<String, Object> headers = prepareAMQPHeaders(flowFile,
selectedHeaderSource, headerSourcePrecedence, separator, pattern);
+ builder.headers(headers);
return builder.build();
}
+ /**
+ * Extract AMQP headers from incoming {@link FlowFile} based on selected
headers source value.
+ *
+ * @param flowFile used to extract headers
+ * @return {@link Map}
+ */
+ private Map<String, Object> prepareAMQPHeaders(FlowFile flowFile, String
selectedHeaderSource, String headerSourcePrecedence, Character headerSeparator,
Pattern pattern) {
+ final Map<String, Object> headers = new HashMap<>();
+ if
(InputHeaderSource.ATTRIBUTES.getValue().equals(selectedHeaderSource)) {
+ headers.putAll(attributesToHeaders(flowFile.getAttributes(),
pattern));
+ } else if
(InputHeaderSource.STRING.getValue().equals(selectedHeaderSource)) {
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value ->
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+ } else {
+ // When precedence matches, put values in the last so it can
override keys from other source
+ if
(InputHeaderSource.ATTRIBUTES.getValue().equals(headerSourcePrecedence)) {
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value ->
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+ headers.putAll(attributesToHeaders(flowFile.getAttributes(),
pattern));
+ } else {
+ headers.putAll(attributesToHeaders(flowFile.getAttributes(),
pattern));
+ readAmqpAttribute(flowFile,
AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, value ->
headers.putAll(validateAMQPHeaderProperty(value, headerSeparator)));
+ }
+ }
+ return headers;
+ }
+
+ /**
+ * Matches the pattern to keys of input attributes and output the amqp
headers map
+ * @param attributes flowFile attributes to scan for match
+ * @return Map with entries matching the pattern
+ */
+ private Map<String, String> attributesToHeaders(Map<String, String>
attributes, Pattern pattern) {
Review Comment:
```suggestion
private Map<String, String> getMatchedAttributes(final Map<String,
String> attributes, final Pattern pattern) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]