exceptionfactory commented on code in PR #7652:
URL: https://github.com/apache/nifi/pull/7652#discussion_r1379386210


##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -68,15 +72,24 @@
     @WritesAttribute(attribute = "amqp$exchange", description = "The exchange 
from which AMQP Message was received")
 })
 public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+
     private static final String ATTRIBUTES_PREFIX = "amqp$";
+    public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
+
+    public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = 
new AllowableValue("Comma Separated String", "Comma Separated String",

Review Comment:
   ```suggestion
       public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING 
= new AllowableValue("Comma-Separated String", "Comma-Separated String",
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -68,15 +72,24 @@
     @WritesAttribute(attribute = "amqp$exchange", description = "The exchange 
from which AMQP Message was received")
 })
 public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+
     private static final String ATTRIBUTES_PREFIX = "amqp$";
+    public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
+
+    public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING = 
new AllowableValue("Comma Separated String", "Comma Separated String",
+            "Put all headers as a string with the specified separator in the 
attribute 'amqp$headers'.");
+    public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new 
AllowableValue("JSON String", "JSON String",
+            "Format all headers as JSON string and output in the attribute 
'amqp$headers'. It will include keys with null value as well.");
+    public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new 
AllowableValue("Flow file attributes", "Flow file attributes",

Review Comment:
   ```suggestion
       public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new 
AllowableValue("FlowFile Attributes", "FlowFile Attributes",
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
             TestRunner runner = initTestRunner(proc);
 
             runner.run();
-            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
         }
     }
 
+    @Test
+    public void 
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1", "bar,bar");
+        headersMap.put("foo2", "bar,bar");
+        headersMap.put("foo3", "null");
+        headersMap.put("foo4", null);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_FORMAT, 
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            JsonNode jsonNode = objectMapper.readTree(headers);
+            assertEquals(EXPECTED_JSON, jsonNode);

Review Comment:
   ```suggestion
               assertEquals(expectedJson, jsonNode);
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
             TestRunner runner = initTestRunner(proc);
 
             runner.run();
-            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
         }
     }
 
+    @Test
+    public void 
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1", "bar,bar");
+        headersMap.put("foo2", "bar,bar");
+        headersMap.put("foo3", "null");
+        headersMap.put("foo4", null);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_FORMAT, 
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            JsonNode jsonNode = objectMapper.readTree(headers);
+            assertEquals(EXPECTED_JSON, jsonNode);
+        }
+    }
+
+    @Test
+    public void 
validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String,Object> expectedHeadersMap = new HashMap<>();
+        expectedHeadersMap.put("foo1", "bar,bar");
+        expectedHeadersMap.put("foo2", "bar,bar");
+        expectedHeadersMap.put("foo3", "null");
+        final Map<String, Object> headersMap = new 
HashMap<>(expectedHeadersMap);
+        headersMap.put("foo4", null);
+
+        final String HEADER_PREFIX = "test.header";

Review Comment:
   ```suggestion
           final String headerPrefix = "test.header";
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -170,8 +208,8 @@ protected void processResource(final Connection connection, 
final AMQPConsumer c
 
             final BasicProperties amqpProperties = response.getProps();
             final Envelope envelope = response.getEnvelope();
-            final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope, 
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
-                    context.getProperty(HEADER_SEPARATOR).toString());
+            final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope, 
context.getProperty(HEADER_FORMAT).toString(), 
context.getProperty(HEADER_KEY_PREFIX).toString(),

Review Comment:
   It would be helpful to declare local variables for `headerFormat` and 
`headerKeyPrefix`. In addition, `PropertyValue.getValue()` should be used 
instead of `PropertyValue.toString()`:
   ```suggestion
               final String headerFormat = 
context.getProperty(HEADER_FORMAT).getValue();
               final String headerKeyPrefix = 
context.getProperty(HEADER_KEY_PREFIX).getValue();
               final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+
+    public static final PropertyDescriptor HEADER_FORMAT = new 
PropertyDescriptor.Builder()
+        .name("header.format")
+        .displayName("Header Output Format")
+        .description("Defines how to output headers from the received message")
+        .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, 
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+        .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+        .required(true)
+        .build();
+    public static final PropertyDescriptor HEADER_KEY_PREFIX = new 
PropertyDescriptor.Builder()
+        .name("header.key.prefix")
+        .displayName("Header Key Prefix")
+        .description("Text to be prefixed to header keys as the are added to 
the flowfile attributes. Processor will append '.' to the value")

Review Comment:
   ```suggestion
           .description("Text to be prefixed to header keys as they are added to 
the FlowFile attributes. Processor will append '.' to the value of this 
property")
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+
+    public static final PropertyDescriptor HEADER_FORMAT = new 
PropertyDescriptor.Builder()
+        .name("header.format")
+        .displayName("Header Output Format")
+        .description("Defines how to output headers from the received message")
+        .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, 
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+        .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+        .required(true)
+        .build();
+    public static final PropertyDescriptor HEADER_KEY_PREFIX = new 
PropertyDescriptor.Builder()
+        .name("header.key.prefix")
+        .displayName("Header Key Prefix")
+        .description("Text to be prefixed to header keys as the are added to 
the flowfile attributes. Processor will append '.' to the value")
+        .defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
+        .required(true)
+        .build();
+
     public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
-       .name("header.separator")
-       .displayName("Header Separator")
-       .description("The character that is used to separate key-value for 
header in String. The value must only one character."
-               + "Otherwise you will get an error message")
-       .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
-       .defaultValue(",")
-       .required(false)
-       .build();
+        .name("header.separator")
+        .displayName("Header Separator")
+        .description("The character that is used to separate key-value for 
header in String. The value must only one character."
+                + "Otherwise you will get an error message")

Review Comment:
   The use of `you` should be avoided, and invalid values generally produce 
errors, so the trailing sentence of the description can be removed.
   ```suggestion
                   )
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
             TestRunner runner = initTestRunner(proc);
 
             runner.run();
-            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
         }
     }
 
+    @Test
+    public void 
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1", "bar,bar");
+        headersMap.put("foo2", "bar,bar");
+        headersMap.put("foo3", "null");
+        headersMap.put("foo4", null);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);

Review Comment:
   Local variables should be camelCased.
   ```suggestion
           JsonNode expectedJson = objectMapper.valueToTree(headersMap);
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+
+    public static final PropertyDescriptor HEADER_FORMAT = new 
PropertyDescriptor.Builder()
+        .name("header.format")
+        .displayName("Header Output Format")
+        .description("Defines how to output headers from the received message")
+        .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING, 
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+        .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+        .required(true)
+        .build();
+    public static final PropertyDescriptor HEADER_KEY_PREFIX = new 
PropertyDescriptor.Builder()
+        .name("header.key.prefix")
+        .displayName("Header Key Prefix")
+        .description("Text to be prefixed to header keys as the are added to 
the flowfile attributes. Processor will append '.' to the value")
+        .defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
+        .required(true)
+        .build();
+
     public static final PropertyDescriptor HEADER_SEPARATOR = new 
PropertyDescriptor.Builder()
-       .name("header.separator")
-       .displayName("Header Separator")
-       .description("The character that is used to separate key-value for 
header in String. The value must only one character."
-               + "Otherwise you will get an error message")
-       .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
-       .defaultValue(",")
-       .required(false)
-       .build();
+        .name("header.separator")
+        .displayName("Header Separator")
+        .description("The character that is used to separate key-value for 
header in String. The value must only one character."

Review Comment:
   ```suggestion
           .description("The character that is used to separate key-value for 
header in String. The value must be only one character."
   ```



##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void 
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
             TestRunner runner = initTestRunner(proc);
 
             runner.run();
-            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
         }
     }
 
+    @Test
+    public void 
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1", "bar,bar");
+        headersMap.put("foo2", "bar,bar");
+        headersMap.put("foo3", "null");
+        headersMap.put("foo4", null);
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_FORMAT, 
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            JsonNode jsonNode = objectMapper.readTree(headers);
+            assertEquals(EXPECTED_JSON, jsonNode);
+        }
+    }
+
+    @Test
+    public void 
validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess()
 throws Exception {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+        final Map<String,Object> expectedHeadersMap = new HashMap<>();
+        expectedHeadersMap.put("foo1", "bar,bar");
+        expectedHeadersMap.put("foo2", "bar,bar");
+        expectedHeadersMap.put("foo3", "null");
+        final Map<String, Object> headersMap = new 
HashMap<>(expectedHeadersMap);
+        headersMap.put("foo4", null);
+
+        final String HEADER_PREFIX = "test.header";
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), 
"key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_FORMAT, 
ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
+            runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,HEADER_PREFIX);
+            runner.run();
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            successFF.assertAttributeNotExists("amqp$headers");
+            expectedHeadersMap.forEach((key, value) ->{
+                
successFF.assertAttributeEquals(HEADER_PREFIX+"."+key,value.toString());

Review Comment:
   The spacing should be adjusted.
   ```suggestion
                   successFF.assertAttributeEquals(headerPrefix + "." + key, 
value.toString());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to