This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e603b017 NIFI-7865 amqp$header is splitted in the wrong way for "," 
and "}"
e603b017 is described below

commit e603b0179bf46ef26bbf4713ae68553b19d4c485
Author: sedadgn <seda.do...@abas.de>
AuthorDate: Thu Oct 14 13:24:35 2021 +0200

    NIFI-7865 amqp$header is splitted in the wrong way for "," and "}"
    
    This PR introduces 2 new properties for the ConsumeAMQP processor
    And one new property for PublishAMQP
    
    This allows to configure the processors to use escaping for commas and to 
consistently not use curly braces in the amqp$header attribute.
    
    The default values ensure backwards compatibility.
    
    This closes #5458.
    
    Signed-off-by: Kevin Doran <kdo...@apache.org>
---
 .../nifi/processor/util/StandardValidators.java    |  20 +++
 .../apache/nifi/amqp/processors/ConsumeAMQP.java   |  54 +++++++-
 .../apache/nifi/amqp/processors/PublishAMQP.java   |  28 ++--
 .../nifi/amqp/processors/ConsumeAMQPTest.java      | 144 +++++++++++++++++++++
 .../nifi/amqp/processors/PublishAMQPTest.java      |  49 +++++++
 5 files changed, 283 insertions(+), 12 deletions(-)

diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 1dad5d4..7c2276a 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -375,6 +375,25 @@ public class StandardValidators {
         }
     };
 
+    public static final Validator SINGLE_CHAR_VALIDATOR = (subject, input, 
context) -> {
+        if (input == null) {
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("Input is null for this property")
+                    .build();
+        }
+        if (input.length() != 1) {
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("Value must be exactly 1 character but was " 
+ input.length() + " in length")
+                    .build();
+        }
+        return new 
ValidationResult.Builder().input(input).subject(subject).valid(true).build();
+    };
     /**
      * URL Validator that does not allow the Expression Language to be used
      */
@@ -980,4 +999,5 @@ public class StandardValidators {
             return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
         }
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 7a1885c..734d3ef 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -98,6 +98,24 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+    public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
+       .name("header.separator")
+       .displayName("Header Separator")
+       .description("The character that is used to separate key-value for 
header in String. The value must only one character."
+               + "Otherwise you will get an error message")
+       .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+       .defaultValue(",")
+       .required(false)
+       .build();
+    static final PropertyDescriptor REMOVE_CURLY_BRACES = new 
PropertyDescriptor.Builder()
+        .name("remove.curly.braces")
+        .displayName("Remove Curly Braces")
+        .description("If true Remove Curly Braces, Curly Braces in the header 
will be automatically remove.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .defaultValue("False")
+        .allowableValues("True", "False")
+        .required(false)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -112,6 +130,8 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         properties.add(QUEUE);
         properties.add(AUTO_ACKNOWLEDGE);
         properties.add(BATCH_SIZE);
+        properties.add(REMOVE_CURLY_BRACES);
+        properties.add(HEADER_SEPARATOR);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
 
@@ -149,7 +169,8 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
 
             final BasicProperties amqpProperties = response.getProps();
             final Envelope envelope = response.getEnvelope();
-            final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope);
+            final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope, 
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
+                    
context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@@ -163,12 +184,12 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         }
     }
 
-    private Map<String, String> buildAttributes(final BasicProperties 
properties, final Envelope envelope) {
+    private Map<String, String> buildAttributes(final BasicProperties 
properties, final Envelope envelope, boolean removeCurlyBraces,  Character 
valueSeperatorForHeaders) {
         final Map<String, String> attributes = new HashMap<>();
         addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", 
properties.getAppId());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", 
properties.getContentEncoding());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", 
properties.getContentType());
-        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", 
properties.getHeaders());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", 
buildHeaders(properties.getHeaders(), 
removeCurlyBraces,valueSeperatorForHeaders));
         addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", 
properties.getDeliveryMode());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", 
properties.getPriority());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", 
properties.getCorrelationId());
@@ -192,6 +213,33 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         attributes.put(attributeName, value.toString());
     }
 
+    private String buildHeaders(Map<String, Object> headers,  boolean 
removeCurlyBraces,Character valueSeparatorForHeaders) {
+        if (headers == null) {
+            return null;
+        }
+        String headerString = 
convertMapToString(headers,valueSeparatorForHeaders);
+
+        if (!removeCurlyBraces) {
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append("{").append(headerString).append("}");
+            headerString = stringBuilder.toString();
+        }
+        return headerString;
+    }
+
+    public static String convertMapToString(Map<String, Object> 
headers,Character valueSeparatorForHeaders) {
+        StringBuilder stringBuilder = new StringBuilder();
+        boolean notFirst = false;
+        for (Map.Entry<String, Object> entry : headers.entrySet()) {
+            if (notFirst) {
+                stringBuilder.append(valueSeparatorForHeaders);
+            }
+            
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString());
+            notFirst = true;
+        }
+        return stringBuilder.toString();
+    }
+
     @Override
     protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext 
context, final Connection connection) {
         try {
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 1520d1b..4bd94f3 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -98,7 +99,15 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
+            .name("header.separator")
+            .displayName("Header Separator")
+            .description("The character that is used to split key-value for 
headers. The value must only one character. "
+                    + "Otherwise you will get an error message")
+            .defaultValue(",")
+            .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+            .required(false)
+            .build();
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are sent to the AMQP destination 
are routed to this relationship")
@@ -116,6 +125,7 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
         List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(EXCHANGE);
         properties.add(ROUTING_KEY);
+        properties.add(HEADER_SEPARATOR);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
 
@@ -143,7 +153,8 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
             return;
         }
 
-        final BasicProperties amqpProperties = 
extractAmqpPropertiesFromFlowFile(flowFile);
+        final BasicProperties amqpProperties = 
extractAmqpPropertiesFromFlowFile(flowFile,
+                context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
         final String routingKey = 
context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
         if (routingKey == null) {
             throw new IllegalArgumentException("Failed to determine 'routing 
key' with provided value '"
@@ -224,7 +235,7 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
      * {@link AMQPUtils#validateAMQPPriorityProperty}
      * {@link AMQPUtils#validateAMQPTimestampProperty}
      */
-    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile) {
+    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile,Character headerSeparator) {
         final AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
 
         updateBuilderFromAttribute(flowFile, "contentType", 
builder::contentType);
@@ -240,20 +251,20 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
         updateBuilderFromAttribute(flowFile, "userId", builder::userId);
         updateBuilderFromAttribute(flowFile, "appId", builder::appId);
         updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers)));
+        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
 
         return builder.build();
     }
 
     /**
      * Will validate if provided amqpPropValue can be converted to a {@link 
Map}.
-     * Should be passed in the format: amqp$headers=key=value,key=value etc.
-     *
+     * Should be passed in the format: amqp$headers=key=value
+     * @param splitValue is used to split for property
      * @param amqpPropValue the value of the property
      * @return {@link Map} if valid otherwise null
      */
-    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue) {
-        String[] strEntries = amqpPropValue.split(",");
+    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue,Character splitValue) {
+        String[] strEntries = 
amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
         Map<String, Object> headers = new HashMap<>();
         for (String strEntry : strEntries) {
             String[] kv = strEntry.split("=");
@@ -263,7 +274,6 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
                 getLogger().warn("Malformed key value pair for AMQP header 
property: " + amqpPropValue);
             }
         }
-
         return headers;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 125593d..3704c9e 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -24,10 +24,12 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -35,8 +37,10 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Test;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.MessageProperties;
 
@@ -161,6 +165,146 @@ public class ConsumeAMQPTest {
         }
     }
 
+    @Test
+    public void 
validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1","bar,bar");
+        headersMap.put("foo2","bar,bar");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
convertStringToMap(headers.substring(1,headers.length()-1),"|");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter() {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+        ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+        TestRunner runner = initTestRunner(proc);
+        runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)");
+
+        }
+    }
+
+    @Test
+    public void 
validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        headersMap.put("key2","(bar,bar)");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = convertStringToMap(headers,"|");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+
+    @Test
+    public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() 
throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","bar");
+        headersMap.put("key2","bar2");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = 
convertStringToMap(headers.substring(1,headers.length()-1),",");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+
+
+    private Map<String,String> convertStringToMap(String map,String 
splitCharacter){
+        Map<String, String> headers = new HashMap<>();
+        String[] pairs = 
map.split(Pattern.quote(String.valueOf(splitCharacter)));
+        for (String pair : pairs) {
+            String[] keyValue = pair.split("=", 2);
+            Assert.assertEquals(2,keyValue.length);
+            headers.put(keyValue[0].trim(), keyValue[1].trim());
+        }
+        return headers;
+    }
     private TestRunner initTestRunner(ConsumeAMQP proc) {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 345372a..3d7e8f7 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -107,6 +107,55 @@ public class PublishAMQPTest {
     }
 
     @Test
+    public void validateSuccessWithHeaderWithCommaPublishToSuccess() throws 
Exception {
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
+        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
+        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
+        runner.setProperty(PublishAMQP.USER, "user");
+        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3");
+
+        runner.enqueue("Hello Joe".getBytes(), attributes);
+
+        runner.run();
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        final Channel channel = ((LocalPublishAMQP) 
pubProc).getConnection().createChannel();
+        final GetResponse msg1 = channel.basicGet("queue1", true);
+        assertNotNull(msg1);
+
+        final Map<String, Object> headerMap = msg1.getProps().getHeaders();
+
+        final Object foo = headerMap.get("foo");
+        final Object foo2 = headerMap.get("foo2");
+        final Object foo3 = headerMap.get("foo3");
+
+        assertEquals("(bar,bar)", foo.toString());
+        assertEquals("bar2", foo2.toString());
+        assertNull(foo3);
+
+
+        assertNotNull(channel.basicGet("queue2", true));
+    }
+
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter()  {
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|,");
+        runner.assertNotValid();
+
+    }
+
+    @Test
     public void validateFailedPublishAndTransferToFailure() throws Exception {
         PublishAMQP pubProc = new LocalPublishAMQP();
         TestRunner runner = TestRunners.newTestRunner(pubProc);

Reply via email to