[
https://issues.apache.org/jira/browse/NIFI-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213524#comment-15213524
]
ASF GitHub Bot commented on NIFI-1686:
--------------------------------------
Github user steveyh25 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/305#discussion_r57531275
--- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
@@ -195,29 +193,85 @@ public void process(final InputStream in) throws IOException {
      * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
      * properties if their names are prefixed with
-     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
+     * The header property is an exception and requires a {@link Map}, so should
+     * be passed in the following format: amqp$headers=key$value$key$value etc.
      */
     private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
         Map<String, String> attributes = flowFile.getAttributes();
         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
         for (Entry<String, String> attributeEntry : attributes.entrySet()) {
             if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
-                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
+                String amqpPropName = attributeEntry.getKey();
                 String amqpPropValue = attributeEntry.getValue();
-                try {
-                    if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
-                        Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
-                        m.invoke(builder, amqpPropValue);
-                    } else {
-                        getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
+
+                AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName);
+
+                if (propertyNames != null) {
+                    try {
+                        switch (propertyNames){
+                            case CONTENT_TYPE:
+                                builder.contentType(amqpPropValue);
+                                break;
+                            case CONTENT_ENCODING:
+                                builder.contentEncoding(amqpPropValue);
+                                break;
+                            case HEADERS:
+                                String[] s = amqpPropValue.split("\\" + AMQPUtils.AMQP_PROP_DELIMITER);
+                                Map<String, Object> headers = new HashMap<>();
+
+                                for (int i = 0; i < s.length ; i += 2){
+                                    if (i + 2 <= s.length){
+                                        headers.put(s[i], s[i + 1]);
+                                    }
+                                }
+
+                                builder.headers(headers);
+                                break;
--- End diff --
This seems more appropriate. What should happen if the format is
incomplete, e.g. [key1=value1,key2=]? In the committed code I made sure
that both a key and a value existed before populating the headers, because
there is no point in sending a header without a value; it may as well not exist.
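
To make the question concrete, here is a minimal sketch of the "skip incomplete pairs" behaviour described above. It is not the code from the pull request: the class name `AmqpHeaderParser`, the method `parseHeaders`, and the hard-coded `$` delimiter (standing in for `AMQPUtils.AMQP_PROP_DELIMITER`) are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of NiFi.
final class AmqpHeaderParser {

    // Assumed delimiter; the diff uses AMQPUtils.AMQP_PROP_DELIMITER instead.
    static final String DELIMITER = "$";

    // Parses "key$value$key$value" into a header map, dropping any pair
    // whose key or value is missing or empty.
    static Map<String, Object> parseHeaders(String raw) {
        Map<String, Object> headers = new HashMap<>();
        if (raw == null || raw.isEmpty()) {
            return headers;
        }
        // A limit of -1 keeps trailing empty tokens, so "k1$v1$k2$" is seen
        // as a pair with an empty value rather than a dangling key.
        String[] tokens = raw.split(Pattern.quote(DELIMITER), -1);
        for (int i = 0; i + 1 < tokens.length; i += 2) {
            String key = tokens[i];
            String value = tokens[i + 1];
            if (!key.isEmpty() && !value.isEmpty()) {
                headers.put(key, value);
            }
        }
        return headers;
    }
}
```

With this sketch, `parseHeaders("key1$value1$key2$")` and `parseHeaders("key1$value1$key2")` would both yield a map containing only `key1`. Whether an incomplete pair should be silently dropped or logged as a warning is still the open question.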
> NiFi is unable to populate over 1/4 of AMQP properties from flow properties
> ---------------------------------------------------------------------------
>
> Key: NIFI-1686
> URL: https://issues.apache.org/jira/browse/NIFI-1686
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 0.5.1
> Reporter: Stephen Harper
>
> When creating a flow (we used ListenHTTP, but this bug will affect all) that
> forwards on to a rabbit queue, org.apache.nifi.amqp.processors.PublishAMQP
> uses the method extractAmqpPropertiesFromFlowFile to populate the AMQP
> BasicProperties if the flow attributes match a certain format (e.g.,
> amqp$contentType=text/xml).
> The method in question uses reflection to find a matching method name from
> the AMQP.BasicProperties class, and tries to populate accordingly.
> This works fine for all properties that take a String argument - however
> there are some that don't (specifically, headers takes a Map<String, Object>,
> deliveryMode and priority take Integer, and timestamp takes a Date), and it
> is impossible to populate these values because the invocation assumes a
> String is required, and fails on line 210.
> What's more, the comment underneath (line 215) states that "this should really
> never happen since it should be caught by the above IF". However, the author
> of the code cannot have tested all cases, because this error occurs
> consistently when trying to forward flow attributes for over a quarter of the
> available AMQP properties.
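
The failure mode described in the issue can be reproduced in isolation. The sketch below is an illustration only: `Builder` is a hypothetical stand-in for `AMQP.BasicProperties.Builder` with just two setters, and `hasStringSetter` is an invented helper that performs the same `getDeclaredMethod(name, String.class)` lookup the original code relied on.

```java
import java.lang.reflect.Method;

// Hypothetical demo class, not part of NiFi or the RabbitMQ client.
final class ReflectionLookupDemo {

    // Stand-in for AMQP.BasicProperties.Builder: one String setter and
    // one Integer setter, mirroring contentType and priority.
    static final class Builder {
        String contentType;
        Integer priority;

        public Builder contentType(String value) {
            this.contentType = value;
            return this;
        }

        public Builder priority(Integer value) {
            this.priority = value;
            return this;
        }
    }

    // Returns true only if a setter with this name accepts a single String.
    static boolean hasStringSetter(String name) {
        try {
            Method m = Builder.class.getDeclaredMethod(name, String.class);
            return m != null;
        } catch (NoSuchMethodException e) {
            // Reached for setters such as priority(Integer): the lookup is
            // by exact parameter type, so no String overload is found.
            return false;
        }
    }
}
```

Here `hasStringSetter("contentType")` succeeds while `hasStringSetter("priority")` does not, which is why a lookup fixed to `String.class` cannot reach setters that take an Integer, a Date, or a Map.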