This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 276cc5765b NIFI-8326: Send records as individual messages in Kafka RecordSinks
276cc5765b is described below

commit 276cc5765b55dc6c9d1cb386bb6b4e98a09f1053
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Mar 16 10:25:44 2021 -0400

    NIFI-8326: Send records as individual messages in Kafka RecordSinks
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4901.
---
 .../nifi/stream/io/ByteCountingOutputStream.java   |  4 ++
 .../stream/io/ByteCountingOutputStreamTest.java    | 39 +++++++++++
 .../record/sink/kafka/KafkaRecordSink_2_6.java     | 81 ++++++++++++++--------
 .../record/sink/kafka/TestKafkaRecordSink_2_6.java |  9 ++-
 4 files changed, 100 insertions(+), 33 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 47f236db0f..55dd82dc15 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -64,6 +64,10 @@ public class ByteCountingOutputStream extends OutputStream {
         out.close();
     }
 
+    public void reset() {
+        bytesWritten = 0;
+    }
+
     public OutputStream getWrappedStream() {
         return out;
     }
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java
new file mode 100644
index 0000000000..9fb3525062
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+class ByteCountingOutputStreamTest {
+
+    @Test
+    void testReset() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(100);
+        ByteCountingOutputStream bcos = new ByteCountingOutputStream(baos);
+        bcos.write("Hello".getBytes(StandardCharsets.UTF_8));
+        assertEquals(5, bcos.getBytesWritten());
+        bcos.reset();
+        assertEquals(0, bcos.getBytesWritten());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
index 1156a12914..4b5ec0d548 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -60,8 +61,12 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -229,58 +234,43 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
 
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
+            final Queue<Future<RecordMetadata>> ackQ = new LinkedList<>();
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {
+                    baos.reset();
+                    out.reset();
                     writer.write(record);
+                    writer.flush();
                     recordCount++;
                     if (out.getBytesWritten() > maxMessageSize) {
-                        throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                        throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                     }
+                    sendMessage(topic, baos.toByteArray(), ackQ);
                 }
-                writeResult = writer.finishRecordSet();
                 if (out.getBytesWritten() > maxMessageSize) {
-                    throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                    throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                 }
-                recordCount = writeResult.getRecordCount();
 
                 attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                 attributes.put("record.count", Integer.toString(recordCount));
-                attributes.putAll(writeResult.getAttributes());
             }
 
-            if (recordCount > 0 || sendZeroResults) {
-                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, baos.toByteArray());
-                try {
-                    producer.send(record, (metadata, exception) -> {
-                        if (exception != null) {
-                            throw new KafkaSendException(exception);
-                        }
-                    }).get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
-                } catch (KafkaSendException kse) {
-                    Throwable t = kse.getCause();
-                    if (t instanceof IOException) {
-                        throw (IOException) t;
-                    } else {
-                        throw new IOException(t);
-                    }
-                } catch (final InterruptedException e) {
-                    getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
-                    Thread.currentThread().interrupt();
-                } catch (final TimeoutException e) {
-                    getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
+            if (recordCount == 0) {
+                if (sendZeroResults) {
+                    sendMessage(topic, new byte[0], ackQ);
+                } else {
+                    return WriteResult.EMPTY;
                 }
-            } else {
-                writeResult = WriteResult.EMPTY;
             }
 
-            return writeResult;
+            acknowledgeTransmission(ackQ);
+
+            return WriteResult.of(recordCount, attributes);
         } catch (IOException ioe) {
             throw ioe;
         } catch (Exception e) {
@@ -288,6 +278,37 @@ public class KafkaRecordSink_2_6 extends AbstractControllerService implements Ka
         }
     }
 
+    private void sendMessage(String topic, byte[] payload, final Queue<Future<RecordMetadata>> ackQ) throws IOException {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, payload);
+        // Add the Future to the queue
+        ackQ.add(producer.send(record, (metadata, exception) -> {
+            if (exception != null) {
+                throw new KafkaSendException(exception);
+            }
+        }));
+    }
+
+    private void acknowledgeTransmission(final Queue<Future<RecordMetadata>> ackQ) throws IOException, ExecutionException {
+        try {
+            Future<RecordMetadata> ack;
+            while ((ack = ackQ.poll()) != null) {
+                ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+            }
+        } catch (KafkaSendException kse) {
+            Throwable t = kse.getCause();
+            if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new IOException(t);
+            }
+        } catch (final InterruptedException e) {
+            getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
+            Thread.currentThread().interrupt();
+        } catch (final TimeoutException e) {
+            getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
+        }
+    }
+
     @OnDisabled
     public void stop() {
         if (producer != null) {
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
index fe5898620f..d19d805fc3 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
@@ -97,14 +97,17 @@ public class TestKafkaRecordSink_2_6 {
 
         task.sendData(recordSet, new HashMap<>(), true);
 
-        assertEquals(1, task.dataSent.size());
+        assertEquals(2, task.dataSent.size());
         String[] lines = new String(task.dataSent.get(0)).split("\n");
         assertNotNull(lines);
-        assertEquals(2, lines.length);
+        assertEquals(1, lines.length);
         String[] data = lines[0].split(",");
         assertEquals("15", data[0]); // In the MockRecordWriter all values are strings
         assertEquals("Hello", data[1]);
-        data = lines[1].split(",");
+        lines = new String(task.dataSent.get(1)).split("\n");
+        assertNotNull(lines);
+        assertEquals(1, lines.length);
+        data = lines[0].split(",");
         assertEquals("6", data[0]);
         assertEquals("World!", data[1]);
     }

Reply via email to