[
https://issues.apache.org/jira/browse/NIFI-1686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213533#comment-15213533
]
ASF GitHub Bot commented on NIFI-1686:
--------------------------------------
Github user steveyh25 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/305#discussion_r57531501
--- Diff: nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java ---
@@ -195,29 +193,85 @@ public void process(final InputStream in) throws IOException {
      * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
      * properties if their names are prefixed with
-     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
+     * The header property is an exception and requires a {@link Map}, so should
+     * be passed in the following format: amqp$headers=key$value$key$value etc.
      */
     private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
         Map<String, String> attributes = flowFile.getAttributes();
         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
         for (Entry<String, String> attributeEntry : attributes.entrySet()) {
             if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
-                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
+                String amqpPropName = attributeEntry.getKey();
                 String amqpPropValue = attributeEntry.getValue();
-                try {
-                    if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
-                        Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
-                        m.invoke(builder, amqpPropValue);
-                    } else {
-                        getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
+
+                AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName);
+
+                if (propertyNames != null) {
+                    try {
+                        switch (propertyNames){
+                            case CONTENT_TYPE:
+                                builder.contentType(amqpPropValue);
+                                break;
+                            case CONTENT_ENCODING:
+                                builder.contentEncoding(amqpPropValue);
+                                break;
+                            case HEADERS:
+                                String[] s = amqpPropValue.split("\\" + AMQPUtils.AMQP_PROP_DELIMITER);
+                                Map<String, Object> headers = new HashMap<>();
+
+                                for (int i = 0; i < s.length ; i += 2){
+                                    if (i + 2 <= s.length){
+                                        headers.put(s[i], s[i + 1]);
+                                    }
+                                }
+
+                                builder.headers(headers);
+                                break;
+                            case DELIVERY_MODE:
+                                builder.deliveryMode(Integer.parseInt(amqpPropValue));
+                                break;
+                            case PRIORITY:
+                                builder.priority(Integer.parseInt(amqpPropValue));
+                                break;
+                            case CORRELATION_ID:
+                                builder.correlationId(amqpPropValue);
+                                break;
+                            case REPLY_TO:
+                                builder.replyTo(amqpPropValue);
+                                break;
+                            case EXPIRATION:
+                                builder.expiration(amqpPropValue);
+                                break;
+                            case MESSAGE_ID:
+                                builder.messageId(amqpPropValue);
+                                break;
+                            case TIMESTAMP:
+                                builder.timestamp(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(amqpPropValue));
--- End diff --
1 & 2: These errors will be caught by the try/catch block, so if we are
going to fully validate each individual one, could we then remove the
try/catch? That seems like a lot more code for not much benefit.
3: I had assumed that the date would be validated before this step and would
adhere to a fixed format anyway (in my case the programmer was passing the
date in, so could guarantee the format). In hindsight that was probably a bit
naive, since flows could come from anywhere, with any date format, and that is
out of the programmer's control. I will look into the StringDate
representation you mentioned.
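
For point 3, a rough sketch of a more forgiving timestamp conversion might look like the following. It is only an illustration under stated assumptions: the class name AmqpTimestampParser is hypothetical, and accepting epoch milliseconds as an alternative to the "yyyy/MM/dd HH:mm:ss" pattern from the diff is my own guess at what a string date representation could mean, not something agreed in this review.

```java
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper for illustration; not part of NiFi or of this pull request. */
final class AmqpTimestampParser {
    // The pattern currently hard-coded in the diff.
    static final String PATTERN = "yyyy/MM/dd HH:mm:ss";

    private AmqpTimestampParser() {
    }

    /**
     * Converts an attribute value to a Date.
     * Assumption: a digits-only value is epoch milliseconds; anything else
     * must match PATTERN exactly. Returns null when the value is unusable,
     * so the caller can log a warning and skip the property.
     */
    static Date parse(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        if (trimmed.matches("\\d+")) {
            try {
                return new Date(Long.parseLong(trimmed));
            } catch (NumberFormatException e) {
                return null; // digits only, but too large for a long
            }
        }
        // SimpleDateFormat is not thread-safe, so create one per call.
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setLenient(false); // reject impossible dates such as month 13
        ParsePosition position = new ParsePosition(0);
        Date parsed = format.parse(trimmed, position);
        // Require the whole string to be consumed, not just a valid prefix.
        boolean complete = parsed != null && position.getIndex() == trimmed.length();
        return complete ? parsed : null;
    }
}
```

With something like this, the TIMESTAMP case could call AmqpTimestampParser.parse(amqpPropValue) and only set the property when the result is non-null, instead of relying on the surrounding try/catch to swallow a ParseException.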
> NiFi is unable to populate over 1/4 of AMQP properties from flow properties
> ---------------------------------------------------------------------------
>
> Key: NIFI-1686
> URL: https://issues.apache.org/jira/browse/NIFI-1686
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 0.5.1
> Reporter: Stephen Harper
>
> When creating a flow (we used ListenHTTP, but this bug will affect all) that
> forwards on to a rabbit queue, org.apache.nifi.amqp.processors.PublishAMQP
> uses the method extractAmqpPropertiesFromFlowFile to populate the AMQP
> BasicProperties if the flow attributes match a certain format (e.g.,
> amqp$contentType=text/xml).
> The method in question uses reflection to find a matching method name from
> the AMQP.BasicProperties class, and tries to populate accordingly.
> This works fine for all properties that take a String argument - however
> there are some that don't (specifically, headers takes a Map<String, Object>,
> deliveryMode and priority take Integer, and timestamp takes a Date), and it
> is impossible to populate these values because the invocation assumes a
> String is required, and fails on line 210.
> What's more, the comment underneath (line 215) states that "this should really
> never happen since it should be caught by the above IF" - however the author
> of the code mustn't have tested all cases because this error is consistently
> present when trying to forward flow attributes for over a quarter of the
> available AMQP properties.
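
The reflection failure described in the issue can be reproduced in isolation with a small sketch. FakeBuilder below is a hypothetical stand-in for AMQP.BasicProperties.Builder, used only so the example runs without the RabbitMQ client on the classpath; the point is that getDeclaredMethod matches on exact parameter types, so a lookup with String.class cannot find a setter that takes an Integer.

```java
/** Illustrative only: shows why a String-typed reflective lookup misses non-String setters. */
final class ReflectionLookupDemo {

    /** Hypothetical stand-in for AMQP.BasicProperties.Builder; only the parameter types matter. */
    static final class FakeBuilder {
        String contentType;
        Integer priority;

        public FakeBuilder contentType(String value) {
            this.contentType = value;
            return this;
        }

        public FakeBuilder priority(Integer value) {
            this.priority = value;
            return this;
        }
    }

    private ReflectionLookupDemo() {
    }

    /** Returns true if FakeBuilder declares a method with this name taking exactly one String. */
    static boolean hasStringSetter(String name) {
        try {
            FakeBuilder.class.getDeclaredMethod(name, String.class);
            return true;
        } catch (NoSuchMethodException e) {
            // Thrown for priority(Integer): the name matches but the parameter type does not.
            return false;
        }
    }
}
```

Here hasStringSetter("contentType") succeeds while hasStringSetter("priority") does not, which mirrors the split between the String-valued properties and headers, deliveryMode, priority and timestamp.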