pvillard31 commented on code in PR #11242:
URL: https://github.com/apache/nifi/pull/11242#discussion_r3252716021
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -220,7 +226,7 @@ public void migrateProperties(final PropertyConfiguration config) {
* Extracts contents of the {@link FlowFile} as byte array.
*/
private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
- final byte[] messageContent = new byte[(int) flowFile.getSize()];
+ final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())];
Review Comment:
Is `Math.toIntExact` required here, given that FlowFiles larger than `Integer.MAX_VALUE` bytes are already filtered out in `processResource` before this method is called?
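For context, a minimal standalone sketch of the difference between the two conversions in the diff. The class and method names are hypothetical and not part of the PR; it only illustrates that a narrowing cast wraps silently while `Math.toIntExact` throws `ArithmeticException` on overflow.

```java
// Sketch only: contrasts a narrowing cast with Math.toIntExact for a long
// that does not fit in an int. Names here are illustrative, not from the PR.
final class SizeConversionSketch {

    // Narrowing cast: silently keeps only the low 32 bits of the value.
    static int castSize(final long size) {
        return (int) size;
    }

    // Math.toIntExact: throws ArithmeticException if the value overflows an int.
    static int exactSize(final long size) {
        return Math.toIntExact(size);
    }

    public static void main(final String[] args) {
        final long tooLarge = Integer.MAX_VALUE + 1L;

        // The cast wraps around to Integer.MIN_VALUE without any error.
        System.out.println(castSize(tooLarge));

        try {
            exactSize(tooLarge);
        } catch (final ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```

If the size check in `processResource` always runs first, the exact conversion can never throw and acts only as a defensive guard against a future caller skipping that check.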
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -317,9 +323,18 @@ private Map<String, Object> validateAMQPHeaderProperty(final String amqpPropValu
for (String strEntry : strEntries) {
final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored
if (kv.length == 2) {
- headers.put(kv[0].trim(), kv[1].trim());
+ final String key = kv[0].trim();
+ if (key.isEmpty()) {
+ getLogger().warn("Ignoring AMQP header property with empty key: {}", strEntry);
+ continue;
+ }
+ headers.put(key, kv[1].trim());
} else if (kv.length == 1) {
- headers.put(kv[0].trim(), null);
+ final String key = kv[0].trim();
+ if (key.isEmpty()) {
+ continue;
+ }
+ headers.put(key, null);
Review Comment:
Should the two empty-key branches log at the same level (both warn, or both
silent), and could the trim plus empty check be extracted into a small helper
so the behavior is identical for key=value and bare-key entries?
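A rough sketch of what such a helper could look like, written as a standalone class so it runs without NiFi. The names `normalizeKey` and `parse` are hypothetical. Warnings are collected in a list as a stand-in for `getLogger().warn(...)`. Entries with more than one `=` are simply skipped here, which is an assumption, since the diff does not show that branch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch only: one shared code path for trimming and rejecting empty header keys.
final class HeaderKeySketch {

    // Hypothetical helper: trims the raw key and records a warning when it is empty,
    // so key=value and bare-key entries are treated identically.
    static Optional<String> normalizeKey(final String rawKey, final String entry, final List<String> warnings) {
        final String key = rawKey.trim();
        if (key.isEmpty()) {
            warnings.add("Ignoring AMQP header property with empty key: " + entry);
            return Optional.empty();
        }
        return Optional.of(key);
    }

    // Parses already-separated entries; a bare key maps to a null value.
    static Map<String, Object> parse(final String[] entries, final List<String> warnings) {
        final Map<String, Object> headers = new LinkedHashMap<>();
        for (final String entry : entries) {
            // Limit -1 keeps trailing empty strings, matching the split in the diff.
            final String[] kv = entry.split("=", -1);
            if (kv.length == 1 || kv.length == 2) {
                final String value = kv.length == 2 ? kv[1].trim() : null;
                normalizeKey(kv[0], entry, warnings).ifPresent(key -> headers.put(key, value));
            }
            // Assumption: entries with more than one '=' are ignored in this sketch.
        }
        return headers;
    }

    public static void main(final String[] args) {
        final List<String> warnings = new ArrayList<>();
        final Map<String, Object> headers = parse(new String[] {"a=b", " =v", "", "flag"}, warnings);
        System.out.println(headers);
        System.out.println(warnings);
    }
}
```

With a single helper, the choice between warning and staying silent is made in one place, so the two branches cannot drift apart.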
##########
nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java:
##########
@@ -166,6 +166,12 @@ protected void processResource(final Connection connection, final AMQPPublisher
return;
}
+ if (flowFile.getSize() > Integer.MAX_VALUE) {
+ getLogger().error("FlowFile {} with size {} bytes is too large to publish as an AMQP message", flowFile, flowFile.getSize());
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
Review Comment:
We usually don't penalize a FlowFile when routing it to the failure relationship, unless we expect that relationship to be used as a self-loop on the processor. A self-loop is not good flow design anyway, given that retry is available at the framework level.