This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a3e9697fa NIFI-13738 - ConsumeAMQP - Ability to set the maximum body size of inbound (received) messages in bytes.
1a3e9697fa is described below

commit 1a3e9697fa2f443f24008ef28f681b15672d1b55
Author: HamidReza Ireh <[email protected]>
AuthorDate: Thu Mar 6 09:53:01 2025 +0330

    NIFI-13738 - ConsumeAMQP - Ability to set the maximum body size of inbound (received) messages in bytes.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9776.
---
 .../apache/nifi/amqp/processors/AbstractAMQPProcessor.java   | 12 +++++++++++-
 .../java/org/apache/nifi/amqp/processors/ConsumeAMQP.java    |  3 ++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
 
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 77f16a7f27..41371d81d1 100644
--- 
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ 
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -43,9 +44,9 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.ssl.SSLContextProvider;
 
-
 /**
  * Base processor that uses RabbitMQ client API
  * (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API Guide</a>) to rendezvous with AMQP-based
@@ -138,6 +139,14 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> 
extends AbstractProce
             .allowableValues("true", "false")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
+    protected static final PropertyDescriptor MAX_INBOUND_MESSAGE_BODY_SIZE = 
new PropertyDescriptor.Builder()
+            .name("Max Inbound Message Body Size")
+            .description("Maximum body size of inbound (received) messages.")
+            .required(true)
+            .defaultValue("64 MB")
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
Integer.MAX_VALUE))
+            .build();
 
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             BROKERS,
@@ -303,6 +312,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> 
extends AbstractProce
         final ConnectionFactory cf = new ConnectionFactory();
         
cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
         cf.setPassword(context.getProperty(PASSWORD).getValue());
+        
cf.setMaxInboundMessageBodySize(context.getProperty(MAX_INBOUND_MESSAGE_BODY_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue());
 
         final String vHost = 
context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
         if (vHost != null) {
diff --git 
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 84399ef66f..4dd6ffc7a3 100644
--- 
a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -171,7 +171,8 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
               HEADER_FORMAT,
               HEADER_KEY_PREFIX,
               HEADER_SEPARATOR,
-              REMOVE_CURLY_BRACES
+              REMOVE_CURLY_BRACES,
+              MAX_INBOUND_MESSAGE_BODY_SIZE
           ), getCommonPropertyDescriptors().stream()
     ).toList();
 

Reply via email to