This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 383ca31d3a NIFI-14624 - PublishKafka; propagate max.request.size to producer
383ca31d3a is described below

commit 383ca31d3a8e9659732f71192225c06962a0ad94
Author: Paul Grey <[email protected]>
AuthorDate: Wed Jun 4 14:55:00 2025 -0400

    NIFI-14624 - PublishKafka; propagate max.request.size to producer
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9989.
---
 .../nifi/kafka/processors/AbstractKafkaBaseIT.java | 12 +++-
 .../processors/PublishKafkaOneLargePayloadIT.java  | 69 ++++++++++++++++++++
 .../processors/PublishKafkaTooLargePayloadIT.java  | 75 ++++++++++++++++++++++
 .../publish/PublishKafkaMultipleFFIT.java          | 10 +--
 .../kafka/service/Kafka3ConnectionService.java     |  2 +
 5 files changed, 162 insertions(+), 6 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
index 5325c0694f..f705bf31d8 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
@@ -32,12 +32,15 @@ import org.testcontainers.kafka.ConfluentKafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.time.Duration;
+import java.util.Map;
 import java.util.Properties;
 
 public abstract class AbstractKafkaBaseIT {
 
     protected static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.0"; // December 2024
 
+    protected static final Integer MESSAGE_MAX_BYTES = 2097152;
+
     private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = "delivery.timeout.ms";
     private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
     private static final String DYNAMIC_PROPERTY_KEY_CONSUME = "fetch.max.wait.ms";
@@ -53,10 +56,17 @@ public abstract class AbstractKafkaBaseIT {
 
     // NIFI-11259 - single testcontainers Kafka instance needed for all module integration tests
     static {
-        kafkaContainer = new ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME));
+        kafkaContainer = new ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME))
+                .withEnv(getEnvironmentIntegration());
         kafkaContainer.start();
     }
 
+    private static Map<String, String> getEnvironmentIntegration() {
+        return Map.of(
+                "KAFKA_MESSAGE_MAX_BYTES", Integer.toString(MESSAGE_MAX_BYTES)
+        );
+    }
+
     protected static ObjectMapper objectMapper;
 
     @BeforeAll
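
[Editorial sketch, not part of the commit: the KAFKA_MESSAGE_MAX_BYTES entry above raises the shared test broker's message.max.bytes to 2097152. A minimal standalone Java sketch of how that broker setting could be verified with the Kafka AdminClient; the bootstrap address "localhost:9092" and broker node id "1" are placeholder assumptions, not values taken from this change.]

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    public class BrokerMessageMaxBytesCheck {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Placeholder bootstrap address; substitute the address of the running test container
            final Map<String, Object> adminConfig =
                    Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(adminConfig)) {
                // Placeholder broker node id "1"
                final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
                final Config config = admin.describeConfigs(List.of(broker)).all().get().get(broker);
                // Expected to print 2097152 when KAFKA_MESSAGE_MAX_BYTES=2097152 is applied as above
                System.out.println("message.max.bytes = " + config.get("message.max.bytes").value());
            }
        }
    }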
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java
new file mode 100644
index 0000000000..91413b9dc8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Properties;
+
+/// Custom max message size specified in [#getEnvironmentIntegration()]
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class PublishKafkaOneLargePayloadIT extends AbstractPublishKafkaIT {
+
+    private static final int MESSAGE_SIZE = 1024 * 1024 * 3 / 2;
+
+    @Test
+    public void test_1_KafkaTestContainerProduceOneLargeFlowFile() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
+        runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+        runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, (MESSAGE_SIZE + 128) + "B");
+
+        runner.enqueue(new byte[MESSAGE_SIZE]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).getFirst();
+        assertEquals(MESSAGE_SIZE, flowFile.getSize());
+    }
+
+    @Test
+    public void test_2_KafkaTestContainerConsumeLargeFlowFileBatch() {
+        final Properties kafkaConsumerProperties = getKafkaConsumerProperties();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProperties)) {
+            consumer.subscribe(Collections.singletonList(getClass().getName()));
+            final ConsumerRecords<String, String> records = consumer.poll(DURATION_POLL);
+            assertEquals(1, records.count());
+            final ConsumerRecord<String, String> record = records.iterator().next();
+            assertEquals(MESSAGE_SIZE, record.value().length());
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java
new file mode 100644
index 0000000000..c95298e0ce
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.LogMessage;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/// Verify expected behavior on submission of "too large" record, as specified in [#getEnvironmentIntegration()]
+public class PublishKafkaTooLargePayloadIT extends AbstractPublishKafkaIT {
+
+    private static final int MESSAGE_SIZE = 1024 * 1024 * 5 / 2;
+    private static final int MESSAGE_SIZE_CONFIG = MESSAGE_SIZE + 128;  // extra space for Kafka internal buffer
+
+    @Test
+    public void testProduceLargeFlowFile_ClientFailure() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
+        runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+
+        runner.enqueue(new byte[MESSAGE_SIZE]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+
+        final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
+        assertEquals(1, errorMessages.size());
+        final LogMessage logMessage = errorMessages.getFirst();
+        assertTrue(logMessage.getMsg().contains("IOException"));
+        assertTrue(Pattern.compile("max.message.size \\d+ exceeded").matcher(logMessage.getMsg()).find());
+    }
+
+    @Test
+    public void testProduceLargeFlowFile_ServerFailure() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
+        runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+        runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, MESSAGE_SIZE_CONFIG + "B");
+
+        runner.enqueue(new byte[MESSAGE_SIZE]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+
+        final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
+        assertEquals(1, errorMessages.size());
+        final LogMessage logMessage = errorMessages.getFirst();
+        assertTrue(logMessage.getMsg().contains("RecordTooLargeException"));
+        assertTrue(logMessage.getMsg().contains("larger than the max message size the server will accept"));
+    }
+}
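
[Editorial sketch, not part of the commit: the two tests above exercise the same limits a plain Kafka producer enforces. In the following standalone Java sketch, the bootstrap address "localhost:9092" and topic "too-large-demo" are placeholder assumptions. With max.request.size left at its 1 MB client default, a 2.5 MB record is rejected before it leaves the client (the ClientFailure case); raising max.request.size above the broker's 2 MB message.max.bytes shifts the rejection to the broker (the ServerFailure case).]

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.util.Map;
    import java.util.concurrent.ExecutionException;

    public class RecordSizeLimitSketch {
        public static void main(String[] args) throws InterruptedException {
            final byte[] payload = new byte[1024 * 1024 * 5 / 2];  // 2.5 MB, as in the test above
            final Map<String, Object> config = Map.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",  // placeholder
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class,
                    // Omit this entry to keep the 1 MB client default and fail before the record is sent;
                    // include it to exceed the broker's message.max.bytes and fail on the broker instead.
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG, payload.length + 128);
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config)) {
                producer.send(new ProducerRecord<>("too-large-demo", payload)).get();
            } catch (ExecutionException e) {
                // Both failure modes surface as RecordTooLargeException; the broker-side message
                // includes "larger than the max message size the server will accept".
                System.out.println(e.getCause() instanceof RecordTooLargeException);
            }
        }
    }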
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
index afb5813af0..789f873588 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
@@ -70,7 +70,7 @@ public class PublishKafkaMultipleFFIT extends AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.TRANSACTIONS_ENABLED, transactionality.toString());
 
-        runner.enqueue(new byte[1024 * 1280]);  // by default, NiFi maximum is 1MB per record
+        runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
     }
@@ -86,9 +86,9 @@ public class PublishKafkaMultipleFFIT extends AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
         runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
         runner.setProperty(PublishKafka.TRANSACTIONS_ENABLED, transactionality.toString());
-        runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, "2 MB");
+        runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, "3 MB");
 
-        runner.enqueue(new byte[1024 * 1280]);  // by default, Kafka maximum is 1MB per record
+        runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
     }
@@ -104,7 +104,7 @@ public class PublishKafkaMultipleFFIT extends AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
 
         runner.enqueue(TEST_RECORD_VALUE);
-        runner.enqueue(new byte[1024 * 1280]);  // by default, max 1MB per record
+        runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
     }
@@ -120,7 +120,7 @@ public class PublishKafkaMultipleFFIT extends AbstractPublishKafkaIT {
         runner.setProperty(PublishKafka.FAILURE_STRATEGY, FailureStrategy.ROUTE_TO_FAILURE);
 
         runner.enqueue(TEST_RECORD_VALUE);
-        runner.enqueue(new byte[1024 * 1280]);  // by default, max 1MB per record
+        runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
         runner.run(2);
         runner.assertTransferCount(PublishKafka.REL_SUCCESS, 1);
         runner.assertTransferCount(PublishKafka.REL_FAILURE, 1);
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 3d7f1dfdf5..cb19eae037 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -352,6 +352,8 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
                 && !partitionClass.equals("org.apache.kafka.clients.producer.internals.DefaultPartitioner")) {
             properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionClass);
         }
+        // because this property is always set from the processor properties, and has a default, we set it here unconditionally
+        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
 
         return new Kafka3ProducerService(properties, serviceConfiguration, producerConfiguration);
     }
