pvillard31 commented on code in PR #11242:
URL: https://github.com/apache/nifi/pull/11242#discussion_r3252716021


##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -220,7 +226,7 @@ public void migrateProperties(final PropertyConfiguration config) {
      * Extracts contents of the {@link FlowFile} as byte array.
      */
     private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
-        final byte[] messageContent = new byte[(int) flowFile.getSize()];
+        final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())];

Review Comment:
   Is `Math.toIntExact` required here, given that FlowFiles larger than `Integer.MAX_VALUE` bytes are already filtered out before this point?
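
   For context on the question above, a minimal standalone sketch of how the two conversions differ once a size exceeds `Integer.MAX_VALUE`. The class and method names (`SizeCastDemo`, `narrowingCast`, `exactCast`) are made up for illustration and are not part of the PR:

```java
public class SizeCastDemo {

    // Plain narrowing cast: keeps only the low 32 bits, so an oversized
    // value silently wraps around instead of failing.
    static int narrowingCast(final long size) {
        return (int) size;
    }

    // Math.toIntExact: throws ArithmeticException when the value does not fit in an int.
    static int exactCast(final long size) {
        return Math.toIntExact(size);
    }

    public static void main(final String[] args) {
        final long tooLarge = Integer.MAX_VALUE + 1L;

        // The cast wraps to a negative number, which would make
        // `new byte[...]` fail with NegativeArraySizeException.
        System.out.println("cast result: " + narrowingCast(tooLarge));

        try {
            exactCast(tooLarge);
        } catch (final ArithmeticException e) {
            System.out.println("toIntExact rejected the value: " + e.getMessage());
        }
    }
}
```

   If the size check earlier in the processor is guaranteed to run first, both forms behave identically for every value that reaches this line; `Math.toIntExact` would then only act as a defensive guard.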



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -317,9 +323,18 @@ private Map<String, Object> validateAMQPHeaderProperty(final String amqpPropValu
         for (String strEntry : strEntries) {
             final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored
             if (kv.length == 2) {
-                headers.put(kv[0].trim(), kv[1].trim());
+                final String key = kv[0].trim();
+                if (key.isEmpty()) {
+                    getLogger().warn("Ignoring AMQP header property with empty key: {}", strEntry);
+                    continue;
+                }
+                headers.put(key, kv[1].trim());
             } else if (kv.length == 1) {
-                headers.put(kv[0].trim(), null);
+                final String key = kv[0].trim();
+                if (key.isEmpty()) {
+                    continue;
+                }
+                headers.put(key, null);

Review Comment:
   Should the two empty-key branches log at the same level (both warn, or both 
silent), and could the trim plus empty check be extracted into a small helper 
so the behavior is identical for key=value and bare-key entries?
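
   A rough sketch of what such a helper could look like, written as a standalone class so it can be run outside NiFi. Everything here is an assumption for illustration: the names `HeaderKeySketch`, `normalizeKey` and `parseHeaders` are hypothetical, `System.err` stands in for `getLogger().warn(...)`, and entries containing more than one `=` are simply skipped because the diff does not show how that branch is handled:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

public class HeaderKeySketch {

    // Hypothetical helper: trims the raw key and returns empty when nothing is left,
    // so both the key=value and the bare-key branch share one rule.
    static Optional<String> normalizeKey(final String rawKey) {
        final String key = rawKey.trim();
        return key.isEmpty() ? Optional.empty() : Optional.of(key);
    }

    // Simplified stand-in for the header parsing loop shown in the diff.
    static Map<String, Object> parseHeaders(final String value, final String separator) {
        final Map<String, Object> headers = new HashMap<>();
        for (final String entry : value.split(Pattern.quote(separator))) {
            // Limit of -1 keeps a trailing empty value, as in the original code.
            final String[] kv = entry.split("=", -1);
            if (kv.length > 2) {
                continue; // assumption: malformed entries are skipped in this sketch
            }
            final Optional<String> key = normalizeKey(kv[0]);
            if (!key.isPresent()) {
                // Same message and same level for both entry shapes.
                System.err.println("Ignoring AMQP header property with empty key: " + entry);
                continue;
            }
            headers.put(key.get(), kv.length == 2 ? kv[1].trim() : null);
        }
        return headers;
    }

    public static void main(final String[] args) {
        System.out.println(parseHeaders("a,b=1,=2, ,c=", ","));
    }
}
```

   With this shape the empty-key decision and its log statement live in one place, so the two branches cannot drift apart again.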



##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -166,6 +166,12 @@ protected void processResource(final Connection connection, final AMQPPublisher
             return;
         }
 
+        if (flowFile.getSize() > Integer.MAX_VALUE) {
+            getLogger().error("FlowFile {} with size {} bytes is too large to publish as an AMQP message", flowFile, flowFile.getSize());
+            session.transfer(session.penalize(flowFile), REL_FAILURE);

Review Comment:
   We usually don't penalize a FlowFile when routing it to the failure relationship, unless we expect that relationship to be used as a self-loop on the processor. That is not good practice in terms of flow design anyway, given that retry is available at the framework level.


