This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 276cc5765b NIFI-8326: Send records as individual messages in Kafka RecordSinks
276cc5765b is described below
commit 276cc5765b55dc6c9d1cb386bb6b4e98a09f1053
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Mar 16 10:25:44 2021 -0400
NIFI-8326: Send records as individual messages in Kafka RecordSinks
Signed-off-by: Pierre Villard <[email protected]>
This closes #4901.
---
.../nifi/stream/io/ByteCountingOutputStream.java | 4 ++
.../stream/io/ByteCountingOutputStreamTest.java | 39 +++++++++++
.../record/sink/kafka/KafkaRecordSink_2_6.java | 81 ++++++++++++++--------
.../record/sink/kafka/TestKafkaRecordSink_2_6.java | 9 ++-
4 files changed, 100 insertions(+), 33 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
index 47f236db0f..55dd82dc15 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -64,6 +64,10 @@ public class ByteCountingOutputStream extends OutputStream {
out.close();
}
+ public void reset() {
+ bytesWritten = 0;
+ }
+
public OutputStream getWrappedStream() {
return out;
}
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java
new file mode 100644
index 0000000000..9fb3525062
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingOutputStreamTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+class ByteCountingOutputStreamTest {
+
+ @Test
+ void testReset() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(100);
+ ByteCountingOutputStream bcos = new ByteCountingOutputStream(baos);
+ bcos.write("Hello".getBytes(StandardCharsets.UTF_8));
+ assertEquals(5, bcos.getBytesWritten());
+ bcos.reset();
+ assertEquals(0, bcos.getBytesWritten());
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
index 1156a12914..4b5ec0d548 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -60,8 +61,12 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -229,58 +234,43 @@ public class KafkaRecordSink_2_6 extends
AbstractControllerService implements Ka
public WriteResult sendData(final RecordSet recordSet, final Map<String,
String> attributes, final boolean sendZeroResults) throws IOException {
try {
- WriteResult writeResult;
final RecordSchema writeSchema =
getWriterFactory().getSchema(null, recordSet.getSchema());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteCountingOutputStream out = new
ByteCountingOutputStream(baos);
+ final Queue<Future<RecordMetadata>> ackQ = new LinkedList<>();
int recordCount = 0;
try (final RecordSetWriter writer =
getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
- writer.beginRecordSet();
Record record;
while ((record = recordSet.next()) != null) {
+ baos.reset();
+ out.reset();
writer.write(record);
+ writer.flush();
recordCount++;
if (out.getBytesWritten() > maxMessageSize) {
- throw new TokenTooLargeException("The query's result
set size exceeds the maximum allowed message size of " + maxMessageSize + "
bytes.");
+ throw new TokenTooLargeException("A record's size
exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
}
+ sendMessage(topic, baos.toByteArray(), ackQ);
}
- writeResult = writer.finishRecordSet();
if (out.getBytesWritten() > maxMessageSize) {
- throw new TokenTooLargeException("The query's result set
size exceeds the maximum allowed message size of " + maxMessageSize + "
bytes.");
+ throw new TokenTooLargeException("A record's size exceeds
the maximum allowed message size of " + maxMessageSize + " bytes.");
}
- recordCount = writeResult.getRecordCount();
attributes.put(CoreAttributes.MIME_TYPE.key(),
writer.getMimeType());
attributes.put("record.count", Integer.toString(recordCount));
- attributes.putAll(writeResult.getAttributes());
}
- if (recordCount > 0 || sendZeroResults) {
- final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, null, null, baos.toByteArray());
- try {
- producer.send(record, (metadata, exception) -> {
- if (exception != null) {
- throw new KafkaSendException(exception);
- }
- }).get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
- } catch (KafkaSendException kse) {
- Throwable t = kse.getCause();
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new IOException(t);
- }
- } catch (final InterruptedException e) {
- getLogger().warn("Interrupted while waiting for an
acknowledgement from Kafka");
- Thread.currentThread().interrupt();
- } catch (final TimeoutException e) {
- getLogger().warn("Timed out while waiting for an
acknowledgement from Kafka");
+ if (recordCount == 0) {
+ if (sendZeroResults) {
+ sendMessage(topic, new byte[0], ackQ);
+ } else {
+ return WriteResult.EMPTY;
}
- } else {
- writeResult = WriteResult.EMPTY;
}
- return writeResult;
+ acknowledgeTransmission(ackQ);
+
+ return WriteResult.of(recordCount, attributes);
} catch (IOException ioe) {
throw ioe;
} catch (Exception e) {
@@ -288,6 +278,37 @@ public class KafkaRecordSink_2_6 extends
AbstractControllerService implements Ka
}
}
+ private void sendMessage(String topic, byte[] payload, final Queue<Future<RecordMetadata>> ackQ) throws IOException {
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, null, null, payload);
+ // Add the Future to the queue
+ ackQ.add(producer.send(record, (metadata, exception) -> {
+ if (exception != null) {
+ throw new KafkaSendException(exception);
+ }
+ }));
+ }
+
+ private void acknowledgeTransmission(final Queue<Future<RecordMetadata>> ackQ) throws IOException, ExecutionException {
+ try {
+ Future<RecordMetadata> ack;
+ while ((ack = ackQ.poll()) != null) {
+ ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+ }
+ } catch (KafkaSendException kse) {
+ Throwable t = kse.getCause();
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ throw new IOException(t);
+ }
+ } catch (final InterruptedException e) {
+ getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
+ Thread.currentThread().interrupt();
+ } catch (final TimeoutException e) {
+ getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
+ }
+ }
+
@OnDisabled
public void stop() {
if (producer != null) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
index fe5898620f..d19d805fc3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
@@ -97,14 +97,17 @@ public class TestKafkaRecordSink_2_6 {
task.sendData(recordSet, new HashMap<>(), true);
- assertEquals(1, task.dataSent.size());
+ assertEquals(2, task.dataSent.size());
String[] lines = new String(task.dataSent.get(0)).split("\n");
assertNotNull(lines);
- assertEquals(2, lines.length);
+ assertEquals(1, lines.length);
String[] data = lines[0].split(",");
assertEquals("15", data[0]); // In the MockRecordWriter all values are
strings
assertEquals("Hello", data[1]);
- data = lines[1].split(",");
+ lines = new String(task.dataSent.get(1)).split("\n");
+ assertNotNull(lines);
+ assertEquals(1, lines.length);
+ data = lines[0].split(",");
assertEquals("6", data[0]);
assertEquals("World!", data[1]);
}