This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 747b5d4d9e NIFI-10317 Taking care of NullPointerException if AMQP 
header value is null; NIFI-10317 refactoring to remove repeated conversions to 
string from char; NIFI-10317 correctly handle null values: null vs "null"; 
NIFI-10317 adding test; NIFI-10317 - Updated ConsumeAMQPTest to test for null 
and empty header values; NIFI-10317 - Updated ConsumeAMQPTest to use hard coded 
string values when testing. Made convertMapToString() private.
747b5d4d9e is described below

commit 747b5d4d9e933f6ced31c8735f7e8eb791449a71
Author: SaumyaGurtu <[email protected]>
AuthorDate: Mon Sep 19 11:57:27 2022 +0530

    NIFI-10317 Taking care of NullPointerException if AMQP header value is null
    NIFI-10317 refactoring to remove repeated conversions to string from char
    NIFI-10317 correctly handle null values: null vs "null"
    NIFI-10317 adding test
    NIFI-10317 - Updated ConsumeAMQPTest to test for null and empty header values.
    NIFI-10317 - Updated ConsumeAMQPTest to use hard coded string values when testing. Made convertMapToString() private.
    
    Signed-off-by: Nathan Gough <[email protected]>
    
    This closes #6382.
---
 .../apache/nifi/amqp/processors/ConsumeAMQP.java   | 27 +++------
 .../nifi/amqp/processors/ConsumeAMQPTest.java      | 64 ++++++++++------------
 2 files changed, 37 insertions(+), 54 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 734d3efd40..87cefc7922 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 @Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -170,7 +171,7 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
             final BasicProperties amqpProperties = response.getProps();
             final Envelope envelope = response.getEnvelope();
             final Map<String, String> attributes = 
buildAttributes(amqpProperties, envelope, 
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
-                    
context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
+                    context.getProperty(HEADER_SEPARATOR).toString());
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@@ -184,12 +185,12 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         }
     }
 
-    private Map<String, String> buildAttributes(final BasicProperties 
properties, final Envelope envelope, boolean removeCurlyBraces,  Character 
valueSeperatorForHeaders) {
+    private Map<String, String> buildAttributes(final BasicProperties 
properties, final Envelope envelope, boolean removeCurlyBraces, String 
valueSeperatorForHeaders) {
         final Map<String, String> attributes = new HashMap<>();
         addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", 
properties.getAppId());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", 
properties.getContentEncoding());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", 
properties.getContentType());
-        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", 
buildHeaders(properties.getHeaders(), 
removeCurlyBraces,valueSeperatorForHeaders));
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", 
buildHeaders(properties.getHeaders(), removeCurlyBraces, 
valueSeperatorForHeaders));
         addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", 
properties.getDeliveryMode());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", 
properties.getPriority());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", 
properties.getCorrelationId());
@@ -213,31 +214,21 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
         attributes.put(attributeName, value.toString());
     }
 
-    private String buildHeaders(Map<String, Object> headers,  boolean 
removeCurlyBraces,Character valueSeparatorForHeaders) {
+    private String buildHeaders(Map<String, Object> headers,  boolean 
removeCurlyBraces, String valueSeparatorForHeaders) {
         if (headers == null) {
             return null;
         }
         String headerString = 
convertMapToString(headers,valueSeparatorForHeaders);
 
         if (!removeCurlyBraces) {
-            StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append("{").append(headerString).append("}");
-            headerString = stringBuilder.toString();
+            headerString = "{" + headerString + "}";
         }
         return headerString;
     }
 
-    public static String convertMapToString(Map<String, Object> 
headers,Character valueSeparatorForHeaders) {
-        StringBuilder stringBuilder = new StringBuilder();
-        boolean notFirst = false;
-        for (Map.Entry<String, Object> entry : headers.entrySet()) {
-            if (notFirst) {
-                stringBuilder.append(valueSeparatorForHeaders);
-            }
-            
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString());
-            notFirst = true;
-        }
-        return stringBuilder.toString();
+    private static String convertMapToString(Map<String, Object> headers, 
String valueSeparatorForHeaders) {
+        return headers.entrySet().stream().map(e -> (e.getValue()!= null) ? 
e.getKey() + "=" + e.getValue(): e.getKey())
+                .collect(Collectors.joining(valueSeparatorForHeaders));
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 6daf5d979c..c382e730dd 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.nifi.amqp.processors;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MessageProperties;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -30,19 +35,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
 
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.MessageProperties;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class ConsumeAMQPTest {
 
@@ -170,8 +168,11 @@ public class ConsumeAMQPTest {
         final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
         final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
         final Map<String, Object> headersMap = new HashMap<>();
-        headersMap.put("foo1","bar,bar");
-        headersMap.put("foo2","bar,bar");
+        headersMap.put("foo1", "bar,bar");
+        headersMap.put("foo2", "bar,bar");
+        headersMap.put("foo3", "null");
+        headersMap.put("foo4", null);
+        final String EXPECTED_RESULT = 
"{foo1=bar,bar|foo2=bar,bar|foo3=null|foo4}";
 
         AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
         builderBasicProperties.headers(headersMap);
@@ -190,8 +191,7 @@ public class ConsumeAMQPTest {
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
             String headers = successFF.getAttribute("amqp$headers");
-            Map<String, String> properties = 
convertStringToMap(headers.substring(1,headers.length()-1),"|");
-            assertEquals(headersMap,properties);
+            assertEquals(EXPECTED_RESULT, headers);
         }
     }
     @Test
@@ -239,6 +239,7 @@ public class ConsumeAMQPTest {
         final Map<String, Object> headersMap = new HashMap<>();
         headersMap.put("key1","(bar,bar)");
         headersMap.put("key2","(bar,bar)");
+        final String EXPECTED_RESULT = "key1=(bar,bar)|key2=(bar,bar)";
 
         AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
         builderBasicProperties.headers(headersMap);
@@ -259,8 +260,7 @@ public class ConsumeAMQPTest {
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
             String headers = successFF.getAttribute("amqp$headers");
-            Map<String, String> properties = convertStringToMap(headers,"|");
-            assertEquals(headersMap,properties);
+            assertEquals(EXPECTED_RESULT, headers);
         }
     }
 
@@ -271,10 +271,14 @@ public class ConsumeAMQPTest {
         final Map<String, Object> headersMap = new HashMap<>();
         headersMap.put("key1","bar");
         headersMap.put("key2","bar2");
+        headersMap.put("key3","");
+        headersMap.put("key4", null);
+        final String EXPECTED_RESULT = "{key1=bar,key2=bar2,key3=,key4}";
 
         AMQP.BasicProperties.Builder builderBasicProperties = new 
AMQP.BasicProperties.Builder();
         builderBasicProperties.headers(headersMap);
 
+
         final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
 
         try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
@@ -289,22 +293,10 @@ public class ConsumeAMQPTest {
             successFF.assertAttributeEquals("amqp$routingKey", "key1");
             successFF.assertAttributeEquals("amqp$exchange", "myExchange");
             String headers = successFF.getAttribute("amqp$headers");
-            Map<String, String> properties = 
convertStringToMap(headers.substring(1,headers.length()-1),",");
-            assertEquals(headersMap,properties);
+            assertEquals(EXPECTED_RESULT, headers);
         }
     }
 
-
-    private Map<String,String> convertStringToMap(String map,String 
splitCharacter){
-        Map<String, String> headers = new HashMap<>();
-        String[] pairs = 
map.split(Pattern.quote(String.valueOf(splitCharacter)));
-        for (String pair : pairs) {
-            String[] keyValue = pair.split("=", 2);
-            assertEquals(2,keyValue.length);
-            headers.put(keyValue[0].trim(), keyValue[1].trim());
-        }
-        return headers;
-    }
     private TestRunner initTestRunner(ConsumeAMQP proc) {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");

Reply via email to