exceptionfactory commented on code in PR #7652:
URL: https://github.com/apache/nifi/pull/7652#discussion_r1379386210
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -68,15 +72,24 @@
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange
from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+
private static final String ATTRIBUTES_PREFIX = "amqp$";
+ public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
+
+ public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING =
new AllowableValue("Comma Separated String", "Comma Separated String",
Review Comment:
```suggestion
public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING
= new AllowableValue("Comma-Separated String", "Comma-Separated String",
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -68,15 +72,24 @@
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange
from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+
private static final String ATTRIBUTES_PREFIX = "amqp$";
+ public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
+
+ public static final AllowableValue HEADERS_FORMAT_COMMA_SEPARATED_STRING =
new AllowableValue("Comma Separated String", "Comma Separated String",
+ "Put all headers as a string with the specified separator in the
attribute 'amqp$headers'.");
+ public static final AllowableValue HEADERS_FORMAT_JSON_STRING = new
AllowableValue("JSON String", "JSON String",
+ "Format all headers as JSON string and output in the attribute
'amqp$headers'. It will include keys with null value as well.");
+ public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new
AllowableValue("Flow file attributes", "Flow file attributes",
Review Comment:
```suggestion
public static final AllowableValue HEADERS_FORMAT_ATTRIBUTES = new
AllowableValue("FlowFile Attributes", "FlowFile Attributes",
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
+ @Test
+ public void
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String, Object> headersMap = new HashMap<>();
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection,
mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(),
"key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+ runner.run();
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ String headers = successFF.getAttribute("amqp$headers");
+ JsonNode jsonNode = objectMapper.readTree(headers);
+ assertEquals(EXPECTED_JSON, jsonNode);
Review Comment:
```suggestion
assertEquals(expectedJson, jsonNode);
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
+ @Test
+ public void
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String, Object> headersMap = new HashMap<>();
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection,
mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(),
"key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+ runner.run();
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ String headers = successFF.getAttribute("amqp$headers");
+ JsonNode jsonNode = objectMapper.readTree(headers);
+ assertEquals(EXPECTED_JSON, jsonNode);
+ }
+ }
+
+ @Test
+ public void
validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String,Object> expectedHeadersMap = new HashMap<>();
+ expectedHeadersMap.put("foo1", "bar,bar");
+ expectedHeadersMap.put("foo2", "bar,bar");
+ expectedHeadersMap.put("foo3", "null");
+ final Map<String, Object> headersMap = new
HashMap<>(expectedHeadersMap);
+ headersMap.put("foo4", null);
+
+ final String HEADER_PREFIX = "test.header";
Review Comment:
```suggestion
final String headerPrefix = "test.header";
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -170,8 +208,8 @@ protected void processResource(final Connection connection,
final AMQPConsumer c
final BasicProperties amqpProperties = response.getProps();
final Envelope envelope = response.getEnvelope();
- final Map<String, String> attributes =
buildAttributes(amqpProperties, envelope,
context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
- context.getProperty(HEADER_SEPARATOR).toString());
+ final Map<String, String> attributes =
buildAttributes(amqpProperties, envelope,
context.getProperty(HEADER_FORMAT).toString(),
context.getProperty(HEADER_KEY_PREFIX).toString(),
Review Comment:
It would be helpful to declare the values for `headerFormat` and
`headerKeyPrefix`. In addition, `PropertyValue.getValue()` should be used
instead of `PropertyValue.toString()`:
```suggestion
final String headerFormat =
context.getProperty(HEADER_FORMAT).getValue();
final String headerKeyPrefix =
context.getProperty(HEADER_KEY_PREFIX).getValue();
final Map<String, String> attributes =
buildAttributes(amqpProperties, envelope, headerFormat, headerKeyPrefix,
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
+
+ public static final PropertyDescriptor HEADER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("header.format")
+ .displayName("Header Output Format")
+ .description("Defines how to output headers from the received message")
+ .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING,
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+ .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+ .required(true)
+ .build();
+ public static final PropertyDescriptor HEADER_KEY_PREFIX = new
PropertyDescriptor.Builder()
+ .name("header.key.prefix")
+ .displayName("Header Key Prefix")
+ .description("Text to be prefixed to header keys as the are added to
the flowfile attributes. Processor will append '.' to the value")
Review Comment:
```suggestion
.description("Text to be prefixed to header keys as the are added to
the FlowFile attributes. Processor will append '.' to the value of this
property")
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
+
+ public static final PropertyDescriptor HEADER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("header.format")
+ .displayName("Header Output Format")
+ .description("Defines how to output headers from the received message")
+ .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING,
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+ .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+ .required(true)
+ .build();
+ public static final PropertyDescriptor HEADER_KEY_PREFIX = new
PropertyDescriptor.Builder()
+ .name("header.key.prefix")
+ .displayName("Header Key Prefix")
+ .description("Text to be prefixed to header keys as the are added to
the flowfile attributes. Processor will append '.' to the value")
+ .defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor HEADER_SEPARATOR = new
PropertyDescriptor.Builder()
- .name("header.separator")
- .displayName("Header Separator")
- .description("The character that is used to separate key-value for
header in String. The value must only one character."
- + "Otherwise you will get an error message")
- .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
- .defaultValue(",")
- .required(false)
- .build();
+ .name("header.separator")
+ .displayName("Header Separator")
+ .description("The character that is used to separate key-value for
header in String. The value must only one character."
+ + "Otherwise you will get an error message")
Review Comment:
The use of `you` should be avoided, and invalid values generally produce
errors, so this can be removed.
```suggestion
)
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
+ @Test
+ public void
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String, Object> headersMap = new HashMap<>();
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
Review Comment:
Local variables should be camelCased.
```suggestion
JsonNode expectedJson = objectMapper.valueToTree(headersMap);
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java:
##########
@@ -99,22 +112,43 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
+
+ public static final PropertyDescriptor HEADER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("header.format")
+ .displayName("Header Output Format")
+ .description("Defines how to output headers from the received message")
+ .allowableValues(HEADERS_FORMAT_COMMA_SEPARATED_STRING,
HEADERS_FORMAT_JSON_STRING, HEADERS_FORMAT_ATTRIBUTES)
+ .defaultValue(HEADERS_FORMAT_COMMA_SEPARATED_STRING.getValue())
+ .required(true)
+ .build();
+ public static final PropertyDescriptor HEADER_KEY_PREFIX = new
PropertyDescriptor.Builder()
+ .name("header.key.prefix")
+ .displayName("Header Key Prefix")
+ .description("Text to be prefixed to header keys as the are added to
the flowfile attributes. Processor will append '.' to the value")
+ .defaultValue(DEFAULT_HEADERS_KEY_PREFIX)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dependsOn(HEADER_FORMAT, HEADERS_FORMAT_ATTRIBUTES)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor HEADER_SEPARATOR = new
PropertyDescriptor.Builder()
- .name("header.separator")
- .displayName("Header Separator")
- .description("The character that is used to separate key-value for
header in String. The value must only one character."
- + "Otherwise you will get an error message")
- .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
- .defaultValue(",")
- .required(false)
- .build();
+ .name("header.separator")
+ .displayName("Header Separator")
+ .description("The character that is used to separate key-value for
header in String. The value must only one character."
Review Comment:
```suggestion
.description("The character that is used to separate key-value for
header in String. The value must be only one character."
```
##########
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java:
##########
@@ -156,13 +158,83 @@ public void
validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
TestRunner runner = initTestRunner(proc);
runner.run();
- final MockFlowFile successFF =
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}
+ @Test
+ public void
validateHeaderWithJsonStringForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String, Object> headersMap = new HashMap<>();
+ headersMap.put("foo1", "bar,bar");
+ headersMap.put("foo2", "bar,bar");
+ headersMap.put("foo3", "null");
+ headersMap.put("foo4", null);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode EXPECTED_JSON = objectMapper.valueToTree(headersMap);
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection,
mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(),
"key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
ConsumeAMQP.HEADERS_FORMAT_JSON_STRING);
+ runner.run();
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ String headers = successFF.getAttribute("amqp$headers");
+ JsonNode jsonNode = objectMapper.readTree(headers);
+ assertEquals(EXPECTED_JSON, jsonNode);
+ }
+ }
+
+ @Test
+ public void
validateHeaderWithFlowFileAttributeForHeaderFormatParameterConsumeAndTransferToSuccess()
throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ final Map<String,Object> expectedHeadersMap = new HashMap<>();
+ expectedHeadersMap.put("foo1", "bar,bar");
+ expectedHeadersMap.put("foo2", "bar,bar");
+ expectedHeadersMap.put("foo3", "null");
+ final Map<String, Object> headersMap = new
HashMap<>(expectedHeadersMap);
+ headersMap.put("foo4", null);
+
+ final String HEADER_PREFIX = "test.header";
+
+ AMQP.BasicProperties.Builder builderBasicProperties = new
AMQP.BasicProperties.Builder();
+ builderBasicProperties.headers(headersMap);
+
+ final Connection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+ try (AMQPPublisher sender = new AMQPPublisher(connection,
mock(ComponentLog.class))) {
+ sender.publish("hello".getBytes(), builderBasicProperties.build(),
"key1", "myExchange");
+
+ ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+ TestRunner runner = initTestRunner(proc);
+ runner.setProperty(ConsumeAMQP.HEADER_FORMAT,
ConsumeAMQP.HEADERS_FORMAT_ATTRIBUTES);
+ runner.setProperty(ConsumeAMQP.HEADER_KEY_PREFIX,HEADER_PREFIX);
+ runner.run();
+ final MockFlowFile successFF =
runner.getFlowFilesForRelationship(ConsumeAMQP.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+ successFF.assertAttributeEquals("amqp$routingKey", "key1");
+ successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+ successFF.assertAttributeNotExists("amqp$headers");
+ expectedHeadersMap.forEach((key, value) ->{
+
successFF.assertAttributeEquals(HEADER_PREFIX+"."+key,value.toString());
Review Comment:
The spacing should be adjusted.
```suggestion
successFF.assertAttributeEquals(HEADER_PREFIX + "." + key,
value.toString());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]