This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1a3e9697fa NIFI-13738 - ConsumeAMQP - Ability to set the maximum body
size of inbound (received) messages in bytes.
1a3e9697fa is described below
commit 1a3e9697fa2f443f24008ef28f681b15672d1b55
Author: HamidReza Ireh <[email protected]>
AuthorDate: Thu Mar 6 09:53:01 2025 +0330
NIFI-13738 - ConsumeAMQP - Ability to set the maximum body size of inbound
(received) messages in bytes.
Signed-off-by: Pierre Villard <[email protected]>
This closes #9776.
---
.../apache/nifi/amqp/processors/AbstractAMQPProcessor.java | 12 +++++++++++-
.../java/org/apache/nifi/amqp/processors/ConsumeAMQP.java | 3 ++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 77f16a7f27..41371d81d1 100644
--- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -43,9 +44,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.ssl.SSLContextProvider;
-
/**
* Base processor that uses RabbitMQ client API
* (<a href="https://www.rabbitmq.com/api-guide.html">Java Client API
Guide</a>) to rendezvous with AMQP-based
@@ -138,6 +139,14 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ protected static final PropertyDescriptor MAX_INBOUND_MESSAGE_BODY_SIZE =
new PropertyDescriptor.Builder()
+ .name("Max Inbound Message Body Size")
+ .description("Maximum body size of inbound (received) messages.")
+ .required(true)
+ .defaultValue("64 MB")
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(1,
Integer.MAX_VALUE))
+ .build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
BROKERS,
@@ -303,6 +312,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
final ConnectionFactory cf = new ConnectionFactory();
cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
+
cf.setMaxInboundMessageBodySize(context.getProperty(MAX_INBOUND_MESSAGE_BODY_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue());
final String vHost =
context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
if (vHost != null) {
diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 84399ef66f..4dd6ffc7a3 100644
--- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -171,7 +171,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
HEADER_FORMAT,
HEADER_KEY_PREFIX,
HEADER_SEPARATOR,
- REMOVE_CURLY_BRACES
+ REMOVE_CURLY_BRACES,
+ MAX_INBOUND_MESSAGE_BODY_SIZE
), getCommonPropertyDescriptors().stream()
).toList();