NIFI-4639: Updated PublishKafka 1.0 processor to use a fresh writer for each 
output record as well. This closes #2292.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/113ad5ec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/113ad5ec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/113ad5ec

Branch: refs/heads/master
Commit: 113ad5ecfa8990c97a56b8fc31656f3542735906
Parents: c9cc76b
Author: Mark Payne <[email protected]>
Authored: Fri Dec 8 09:13:52 2017 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Dec 8 09:14:17 2017 -0500

----------------------------------------------------------------------
 .../processors/kafka/pubsub/PublisherLease.java |  8 ++--
 .../kafka/pubsub/TestPublisherLease.java        | 44 ++++++++++++++++++++
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/113ad5ec/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index abcd15f..d18df7f 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -148,13 +148,15 @@ public class PublisherLease implements Closeable {
         Record record;
         int recordCount = 0;
 
-        try (final RecordSetWriter writer = writerFactory.createWriter(logger, 
schema, baos)) {
+        try {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
 
-                writer.write(record);
-                writer.flush();
+                try (final RecordSetWriter writer = 
writerFactory.createWriter(logger, schema, baos)) {
+                    writer.write(record);
+                    writer.flush();
+                }
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : 
record.getAsString(messageKeyField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/113ad5ec/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 54c1222..64451d5 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -35,6 +37,16 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,4 +199,36 @@ public class TestPublisherLease {
 
         verify(producer, times(1)).flush();
     }
+
+
+    @Test
+    public void testRecordsSentToRecordWriterAndThenToProducer() throws 
IOException, SchemaNotFoundException, MalformedRecordException {
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 
10L, logger, true, null, StandardCharsets.UTF_8);
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 
47".getBytes(StandardCharsets.UTF_8);
+
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("person_id", RecordFieldType.LONG);
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        final RecordReader reader = 
readerService.createRecordReader(Collections.emptyMap(), new 
ByteArrayInputStream(exampleInput), logger);
+        final RecordSet recordSet = reader.createRecordSet();
+        final RecordSchema schema = reader.getSchema();
+
+        final String topic = "unit-test";
+        final String keyField = "person_id";
+
+        final RecordSetWriterFactory writerFactory = 
Mockito.mock(RecordSetWriterFactory.class);
+        final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+
+        Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), 
any())).thenReturn(writer);
+
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, 
topic);
+
+        verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), 
any());
+        verify(writer, times(2)).write(any(Record.class));
+        verify(producer, times(2)).send(any(), any());
+    }
 }

Reply via email to the mailing list.