This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 383ca31d3a NIFI-14624 - PublishKafka; propagate max.request.size to producer
383ca31d3a is described below
commit 383ca31d3a8e9659732f71192225c06962a0ad94
Author: Paul Grey <[email protected]>
AuthorDate: Wed Jun 4 14:55:00 2025 -0400
NIFI-14624 - PublishKafka; propagate max.request.size to producer
Signed-off-by: Pierre Villard <[email protected]>
This closes #9989.
---
.../nifi/kafka/processors/AbstractKafkaBaseIT.java | 12 +++-
.../processors/PublishKafkaOneLargePayloadIT.java | 69 ++++++++++++++++++++
.../processors/PublishKafkaTooLargePayloadIT.java | 75 ++++++++++++++++++++++
.../publish/PublishKafkaMultipleFFIT.java | 10 +--
.../kafka/service/Kafka3ConnectionService.java | 2 +
5 files changed, 162 insertions(+), 6 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
index 5325c0694f..f705bf31d8 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
@@ -32,12 +32,15 @@ import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
+import java.util.Map;
import java.util.Properties;
public abstract class AbstractKafkaBaseIT {
protected static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.0";
// December 2024
+ protected static final Integer MESSAGE_MAX_BYTES = 2097152;
+
private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
@@ -53,10 +56,17 @@ public abstract class AbstractKafkaBaseIT {
// NIFI-11259 - single testcontainers Kafka instance needed for all module
integration tests
static {
- kafkaContainer = new
ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME));
+ kafkaContainer = new
ConfluentKafkaContainer(DockerImageName.parse(IMAGE_NAME))
+ .withEnv(getEnvironmentIntegration());
kafkaContainer.start();
}
+ private static Map<String, String> getEnvironmentIntegration() {
+ return Map.of(
+ "KAFKA_MESSAGE_MAX_BYTES", Integer.toString(MESSAGE_MAX_BYTES)
+ );
+ }
+
protected static ObjectMapper objectMapper;
@BeforeAll
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java
new file mode 100644
index 0000000000..91413b9dc8
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaOneLargePayloadIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Properties;
+
+/// Custom max message size specified in [#getEnvironmentIntegration()]
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class PublishKafkaOneLargePayloadIT extends AbstractPublishKafkaIT {
+
+ private static final int MESSAGE_SIZE = 1024 * 1024 * 3 / 2;
+
+ @Test
+ public void test_1_KafkaTestContainerProduceOneLargeFlowFile() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(PublishKafka.class);
+
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
+ runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+ runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, (MESSAGE_SIZE + 128) + "B");
+
+ runner.enqueue(new byte[MESSAGE_SIZE]);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).getFirst();
+ assertEquals(MESSAGE_SIZE, flowFile.getSize());
+ }
+
+ @Test
+ public void test_2_KafkaTestContainerConsumeLargeFlowFileBatch() {
+ final Properties kafkaConsumerProperties =
getKafkaConsumerProperties();
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerProperties)) {
+
consumer.subscribe(Collections.singletonList(getClass().getName()));
+ final ConsumerRecords<String, String> records =
consumer.poll(DURATION_POLL);
+ assertEquals(1, records.count());
+ final ConsumerRecord<String, String> record =
records.iterator().next();
+ assertEquals(MESSAGE_SIZE, record.value().length());
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java
new file mode 100644
index 0000000000..c95298e0ce
--- /dev/null
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaTooLargePayloadIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.LogMessage;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/// Verify expected behavior on submission of "too large" record, as specified
in [#getEnvironmentIntegration()]
+public class PublishKafkaTooLargePayloadIT extends AbstractPublishKafkaIT {
+
+ private static final int MESSAGE_SIZE = 1024 * 1024 * 5 / 2;
+ private static final int MESSAGE_SIZE_CONFIG = MESSAGE_SIZE + 128; //
extra space for Kafka internal buffer
+
+ @Test
+ public void testProduceLargeFlowFile_ClientFailure() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(PublishKafka.class);
+
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
+ runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+
+ runner.enqueue(new byte[MESSAGE_SIZE]);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+
+ final List<LogMessage> errorMessages =
runner.getLogger().getErrorMessages();
+ assertEquals(1, errorMessages.size());
+ final LogMessage logMessage = errorMessages.getFirst();
+ assertTrue(logMessage.getMsg().contains("IOException"));
+ assertTrue(Pattern.compile("max.message.size \\d+
exceeded").matcher(logMessage.getMsg()).find());
+ }
+
+ @Test
+ public void testProduceLargeFlowFile_ServerFailure() throws
InitializationException {
+ final TestRunner runner =
TestRunners.newTestRunner(PublishKafka.class);
+
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
+ runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+ runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, MESSAGE_SIZE_CONFIG + "B");
+
+ runner.enqueue(new byte[MESSAGE_SIZE]);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
+
+ final List<LogMessage> errorMessages =
runner.getLogger().getErrorMessages();
+ assertEquals(1, errorMessages.size());
+ final LogMessage logMessage = errorMessages.getFirst();
+ assertTrue(logMessage.getMsg().contains("RecordTooLargeException"));
+ assertTrue(logMessage.getMsg().contains("larger than the max message
size the server will accept"));
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
index afb5813af0..789f873588 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/PublishKafkaMultipleFFIT.java
@@ -70,7 +70,7 @@ public class PublishKafkaMultipleFFIT extends
AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.TRANSACTIONS_ENABLED,
transactionality.toString());
- runner.enqueue(new byte[1024 * 1280]); // by default, NiFi maximum is
1MB per record
+ runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
}
@@ -86,9 +86,9 @@ public class PublishKafkaMultipleFFIT extends
AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.TRANSACTIONS_ENABLED,
transactionality.toString());
- runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, "2 MB");
+ runner.setProperty(PublishKafka.MAX_REQUEST_SIZE, "3 MB");
- runner.enqueue(new byte[1024 * 1280]); // by default, Kafka maximum
is 1MB per record
+ runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
}
@@ -104,7 +104,7 @@ public class PublishKafkaMultipleFFIT extends
AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.FAILURE_STRATEGY,
FailureStrategy.ROLLBACK.getValue());
runner.enqueue(TEST_RECORD_VALUE);
- runner.enqueue(new byte[1024 * 1280]); // by default, max 1MB per
record
+ runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
}
@@ -120,7 +120,7 @@ public class PublishKafkaMultipleFFIT extends
AbstractPublishKafkaIT {
runner.setProperty(PublishKafka.FAILURE_STRATEGY,
FailureStrategy.ROUTE_TO_FAILURE);
runner.enqueue(TEST_RECORD_VALUE);
- runner.enqueue(new byte[1024 * 1280]); // by default, max 1MB per
record
+ runner.enqueue(new byte[MESSAGE_MAX_BYTES * 6 / 5]);
runner.run(2);
runner.assertTransferCount(PublishKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PublishKafka.REL_FAILURE, 1);
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index 3d7f1dfdf5..cb19eae037 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -352,6 +352,8 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
&&
!partitionClass.equals("org.apache.kafka.clients.producer.internals.DefaultPartitioner"))
{
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
partitionClass);
}
+ // because this property is always set from the processor properties,
and has a default, we set it here unconditionally
+ properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
producerConfiguration.getMaxRequestSize());
return new Kafka3ProducerService(properties, serviceConfiguration,
producerConfiguration);
}