Repository: nifi
Updated Branches:
  refs/heads/master 8e6649ba1 -> 113ad5ecf


NIFI-4639: fresh writer for each output record


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c9cc76b5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c9cc76b5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c9cc76b5

Branch: refs/heads/master
Commit: c9cc76b5c849613b2d3d24f00ed8fa46204668b3
Parents: 8e6649b
Author: matthew-silverman <[email protected]>
Authored: Fri Nov 24 09:12:13 2017 +0000
Committer: Mark Payne <[email protected]>
Committed: Fri Dec 8 08:39:22 2017 -0500

----------------------------------------------------------------------
 .../processors/kafka/pubsub/PublisherLease.java |  8 ++--
 .../kafka/pubsub/TestPublisherLease.java        | 46 ++++++++++++++++++++
 .../processors/kafka/pubsub/PublisherLease.java |  8 ++--
 .../kafka/pubsub/TestPublisherLease.java        | 46 ++++++++++++++++++++
 4 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c9cc76b5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 70957ce..5a05a08 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -106,13 +106,15 @@ public class PublisherLease implements Closeable {
         Record record;
         int recordCount = 0;
 
-        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+        try {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
 
-                writer.write(record);
-                writer.flush();
+                try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+                    writer.write(record);
+                    writer.flush();
+                }
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c9cc76b5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index c2d143c..836a4b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -29,6 +30,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -36,6 +38,17 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
@@ -191,4 +204,37 @@ public class TestPublisherLease {
 
         verify(producer, times(1)).flush();
     }
+
+    @Test
+    public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger);
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
+
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("person_id", RecordFieldType.LONG);
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
+        final RecordSet recordSet = reader.createRecordSet();
+        final RecordSchema schema = reader.getSchema();
+
+        final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
+
+        final String topic = "unit-test";
+        final String keyField = "person_id";
+
+        final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
+        final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+
+        Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
+
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
+
+        verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
+        verify(writer, times(2)).write(any(Record.class));
+        verify(producer, times(2)).send(any(), any());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c9cc76b5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index abcd15f..d18df7f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -148,13 +148,15 @@ public class PublisherLease implements Closeable {
         Record record;
         int recordCount = 0;
 
-        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+        try {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
 
-                writer.write(record);
-                writer.flush();
+                try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+                    writer.write(record);
+                    writer.flush();
+                }
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c9cc76b5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 54c1222..105a4d5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -35,6 +37,17 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,4 +200,37 @@ public class TestPublisherLease {
 
         verify(producer, times(1)).flush();
     }
+
+    @Test
+    public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
+
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("person_id", RecordFieldType.LONG);
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
+        final RecordSet recordSet = reader.createRecordSet();
+        final RecordSchema schema = reader.getSchema();
+
+        final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
+
+        final String topic = "unit-test";
+        final String keyField = "person_id";
+
+        final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
+        final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
+
+        Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
+
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
+
+        verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
+        verify(writer, times(2)).write(any(Record.class));
+        verify(producer, times(2)).send(any(), any());
+    }
 }

Reply via email to