This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 747b5d4d9e NIFI-10317 Taking care of NullPointerException if AMQP
header value is null; NIFI-10317 refactoring to remove repeated conversions to
string from char; NIFI-10317 correctly handle null values: null vs "null";
NIFI-10317 adding test; NIFI-10317 - Updated ConsumeAMQPTest to test for null
and empty header values; NIFI-10317 - Updated ConsumeAMQPTest to use hard-coded
string values when testing. Made convertMapToString() private.
747b5d4d9e is described below
commit 747b5d4d9e933f6ced31c8735f7e8eb791449a71
Author: SaumyaGurtu <[email protected]>
AuthorDate: Mon Sep 19 11:57:27 2022 +0530
NIFI-10317 Taking care of NullPointerException if AMQP header value is null
NIFI-10317 refactoring to remove repeated conversions to string from char
NIFI-10317 correctly handle null values: distinguish a null header value from the string "null"
NIFI-10317 adding test
NIFI-10317 - Updated ConsumeAMQPTest to test for null and empty header
values.
NIFI-10317 - Updated ConsumeAMQPTest to use hard-coded string values when
testing. Made convertMapToString() private.
Signed-off-by: Nathan Gough <[email protected]>
This closes #6382.
---
.../apache/nifi/amqp/processors/ConsumeAMQP.java | 27 +++------
.../nifi/amqp/processors/ConsumeAMQPTest.java | 64 ++++++++++------------
2 files changed, 37 insertions(+), 54 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 734d3efd40..87cefc7922 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -170,7 +171,7 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
final Map<String, String> attributes =
buildAttributes(amqpProperties, envelope,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
-
context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
+ context.getProperty(HEADER_SEPARATOR).toString());
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile,
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@@ -184,12 +185,12 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
}
}
- private Map<String, String> buildAttributes(final BasicProperties
properties, final Envelope envelope, boolean removeCurlyBraces, Character
valueSeperatorForHeaders) {
+ private Map<String, String> buildAttributes(final BasicProperties
properties, final Envelope envelope, boolean removeCurlyBraces, String
valueSeperatorForHeaders) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId",
properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding",
properties.getContentEncoding());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType",
properties.getContentType());
- addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(),
removeCurlyBraces,valueSeperatorForHeaders));
+ addAttribute(attributes, ATTRIBUTES_PREFIX + "headers",
buildHeaders(properties.getHeaders(), removeCurlyBraces,
valueSeperatorForHeaders));
addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode",
properties.getDeliveryMode());
addAttribute(attributes, ATTRIBUTES_PREFIX + "priority",
properties.getPriority());
addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId",
properties.getCorrelationId());
@@ -213,31 +214,21 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
attributes.put(attributeName, value.toString());
}
- private String buildHeaders(Map<String, Object> headers, boolean
removeCurlyBraces,Character valueSeparatorForHeaders) {
+ private String buildHeaders(Map<String, Object> headers, boolean
removeCurlyBraces, String valueSeparatorForHeaders) {
if (headers == null) {
return null;
}
String headerString =
convertMapToString(headers,valueSeparatorForHeaders);
if (!removeCurlyBraces) {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("{").append(headerString).append("}");
- headerString = stringBuilder.toString();
+ headerString = "{" + headerString + "}";
}
return headerString;
}
- public static String convertMapToString(Map<String, Object>
headers,Character valueSeparatorForHeaders) {
- StringBuilder stringBuilder = new StringBuilder();
- boolean notFirst = false;
- for (Map.Entry<String, Object> entry : headers.entrySet()) {
- if (notFirst) {
- stringBuilder.append(valueSeparatorForHeaders);
- }
-
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString());
- notFirst = true;
- }
- return stringBuilder.toString();
+ private static String convertMapToString(Map<String, Object> headers,
String valueSeparatorForHeaders) {
+ return headers.entrySet().stream().map(e -> (e.getValue()!= null) ?
e.getKey() + "=" + e.getValue(): e.getKey())
+ .collect(Collectors.joining(valueSeparatorForHeaders));
}
@Override
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 6daf5d979c..c382e730dd 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -16,11 +16,16 @@
*/
package org.apache.nifi.amqp.processors;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MessageProperties;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
@@ -30,19 +35,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.MessageProperties;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
public class ConsumeAMQPTest {
@@ -170,8 +168,11 @@ public class ConsumeAMQPTest {
final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
final Map<String, Object> headersMap = new HashMap<>();
- headersMap.put("foo1","bar,bar");
- headersMap.put("foo2","bar,bar");
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ final String EXPECTED_RESULT =
"{foo1=bar,bar|foo2=bar,bar|foo3=null|foo4}";
AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap);
@@ -190,8 +191,7 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
- Map<String, String> properties =
convertStringToMap(headers.substring(1,headers.length()-1),"|");
- assertEquals(headersMap,properties);
+ assertEquals(EXPECTED_RESULT, headers);
}
}
@Test
@@ -239,6 +239,7 @@ public class ConsumeAMQPTest {
final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("key1","(bar,bar)");
headersMap.put("key2","(bar,bar)");
+ final String EXPECTED_RESULT = "key1=(bar,bar)|key2=(bar,bar)";
AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap);
@@ -259,8 +260,7 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
- Map<String, String> properties = convertStringToMap(headers,"|");
- assertEquals(headersMap,properties);
+ assertEquals(EXPECTED_RESULT, headers);
}
}
@@ -271,10 +271,14 @@ public class ConsumeAMQPTest {
final Map<String, Object> headersMap = new HashMap<>();
headersMap.put("key1","bar");
headersMap.put("key2","bar2");
+ headersMap.put("key3","");
+ headersMap.put("key4", null);
+ final String EXPECTED_RESULT = "{key1=bar,key2=bar2,key3=,key4}";
AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
builderBasicProperties.headers(headersMap);
+
final Connection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection,
mock(ComponentLog.class))) {
@@ -289,22 +293,10 @@ public class ConsumeAMQPTest {
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
String headers = successFF.getAttribute("amqp$headers");
- Map<String, String> properties =
convertStringToMap(headers.substring(1,headers.length()-1),",");
- assertEquals(headersMap,properties);
+ assertEquals(EXPECTED_RESULT, headers);
}
}
-
- private Map<String,String> convertStringToMap(String map,String
splitCharacter){
- Map<String, String> headers = new HashMap<>();
- String[] pairs =
map.split(Pattern.quote(String.valueOf(splitCharacter)));
- for (String pair : pairs) {
- String[] keyValue = pair.split("=", 2);
- assertEquals(2,keyValue.length);
- headers.put(keyValue[0].trim(), keyValue[1].trim());
- }
- return headers;
- }
private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");