This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ea4e6397f NIFI-14545 - Remove casting for PublishKafka to 
BufferedInputStream
4ea4e6397f is described below

commit 4ea4e6397f7d27e8b90613d2cee5543c6ef9e5e2
Author: Jordan Sammut <[email protected]>
AuthorDate: Thu May 8 13:16:28 2025 +0200

    NIFI-14545 - Remove casting for PublishKafka to BufferedInputStream
---
 .../processors/PublishKafkaLargePayloadIT.java     | 139 +++++++++++++++++++++
 .../apache/nifi/kafka/processors/PublishKafka.java |   5 +-
 2 files changed, 141 insertions(+), 3 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaLargePayloadIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaLargePayloadIT.java
new file mode 100644
index 0000000000..c0a22a9cae
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaLargePayloadIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class PublishKafkaLargePayloadIT extends AbstractPublishKafkaIT {
+    private static final String TEST_KEY_ATTRIBUTE = "my-key";
+    private static final String TEST_KEY_ATTRIBUTE_EL = "${my-key}";
+    private static final String TEST_KEY_VALUE = "some-key-value";
+
+    private static final int EXPECTED_RECORD_COUNT = 20000; // Expect 20,000 
records
+    private static final int MAX_MESSAGE_SIZE = 2 * 1024 * 1024; // 2MB
+    private static final byte[] LARGE_SAMPLE_INPUT = new 
byte[MAX_MESSAGE_SIZE];
+
+    @Test
+    public void test_1_KafkaTestContainerProduceOneLargeFlowFile() throws 
InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(PublishKafka.class);
+
+        final JsonTreeReader jsonTreeReader = new JsonTreeReader();
+
+        runner.addControllerService("jsonTreeReader", jsonTreeReader);
+        runner.setProperty(jsonTreeReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+        runner.enableControllerService(jsonTreeReader);
+
+        final JsonRecordSetWriter jsonRecordSetWriter = new 
JsonRecordSetWriter();
+
+        runner.addControllerService("jsonRecordSetWriter", 
jsonRecordSetWriter);
+        runner.setProperty(jsonRecordSetWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        runner.setProperty(jsonRecordSetWriter, "output-grouping", 
"output-oneline");
+        runner.enableControllerService(jsonRecordSetWriter);
+
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PublishKafka.CONNECTION_SERVICE, 
addKafkaConnectionService(runner));
+        runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+        runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
+        runner.setProperty(PublishKafka.KAFKA_KEY, TEST_KEY_ATTRIBUTE_EL);
+        runner.setProperty(PublishKafka.RECORD_READER, "jsonTreeReader");
+        runner.setProperty(PublishKafka.RECORD_WRITER, "jsonRecordSetWriter");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a1", "valueA1");
+        attributes.put("b1", "valueB1");
+        attributes.put(TEST_KEY_ATTRIBUTE, TEST_KEY_VALUE);
+
+        populateSampleInput();
+        runner.enqueue(LARGE_SAMPLE_INPUT, attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishKafka.REL_SUCCESS).getFirst();
+        assertEquals(Integer.toString(EXPECTED_RECORD_COUNT), 
flowFile.getAttribute("msg.count"));
+    }
+
+    @Test
+    public void test_2_KafkaTestContainerConsumeLargeFlowFileBatch() {
+        final Properties kafkaConsumerProperties = 
getKafkaConsumerProperties();
+        kafkaConsumerProperties.setProperty(MAX_POLL_RECORDS_CONFIG, 
Integer.toString(EXPECTED_RECORD_COUNT * 2));
+
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerProperties)) {
+            
consumer.subscribe(Collections.singletonList(getClass().getName()));
+
+            final List<ConsumerRecord<String, String>> records = new 
ArrayList<>();
+            for (int i = 0; (i < 5); ++i) {
+                final ConsumerRecords<String, String> recordsPoll = 
consumer.poll(Duration.ofSeconds(1));
+                recordsPoll.forEach(records::add);
+            }
+            assertEquals(EXPECTED_RECORD_COUNT, records.size());
+        }
+    }
+
+    // Create sample data with multiple records
+    private static void populateSampleInput() {
+        StringBuilder sb = new StringBuilder();
+        int recordCount = EXPECTED_RECORD_COUNT;
+        int approximateRecordSize = MAX_MESSAGE_SIZE / recordCount;
+
+        for (int i = 0; i < recordCount; i++) {
+            sb.append("{\"key\": \"").append(i).append("\",\"value\":\"");
+            int payloadSize = approximateRecordSize - 30;
+            for (int j = 0; j < payloadSize; j++) {
+                sb.append((char) ('A' + (j % 26)));
+            }
+            sb.append("\"}");
+            if (i < recordCount - 1) {
+                sb.append("\n");
+            }
+            if (sb.length() >= MAX_MESSAGE_SIZE - 100) {
+                break;
+            }
+        }
+
+        byte[] stringBytes = sb.toString().getBytes();
+
+        Arrays.fill(LARGE_SAMPLE_INPUT, (byte) ' '); // Fill with spaces 
instead of NULL
+
+        int copyLength = Math.min(stringBytes.length, 
LARGE_SAMPLE_INPUT.length);
+        System.arraycopy(stringBytes, 0, LARGE_SAMPLE_INPUT, 0, copyLength);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index 1777bd4504..c61dd0d2ea 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -72,7 +72,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
-import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -610,8 +609,8 @@ public class PublishKafka extends AbstractProcessor 
implements KafkaPublishCompo
 
         @Override
         public void process(final InputStream in) {
-            try (final InputStream is = new BufferedInputStream(in)) {
-                final Iterator<KafkaRecord> records = 
kafkaConverter.convert(attributes, is, inputLength);
+            try {
+                final Iterator<KafkaRecord> records = 
kafkaConverter.convert(attributes, in, inputLength);
                 producerService.send(records, publishContext);
             } catch (final Exception e) {
                 publishContext.setException(e); // on data pre-process 
failure, indicate this to controller service

Reply via email to