Repository: nifi
Updated Branches:
  refs/heads/master a031fb3dc -> ba5b1d837


NIFI-1117 Added unit tests and default behavior for Avro files with no records.
Also fixed a FindBugs warning in SplitAvro (made inner classes static).


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9756e9f1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9756e9f1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9756e9f1

Branch: refs/heads/master
Commit: 9756e9f1269efc05d1f51ab420e0468d5e2ee769
Parents: a031fb3
Author: Tony Kurc <[email protected]>
Authored: Sat Nov 7 15:35:03 2015 -0500
Committer: Tony Kurc <[email protected]>
Committed: Tue Nov 10 18:46:17 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 23 ++++++++++------
 .../apache/nifi/processors/avro/SplitAvro.java  |  6 ++--
 .../processors/avro/TestConvertAvroToJSON.java  | 29 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9756e9f1/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 33951d7..d320e78 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -61,6 +61,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
 
+    private static final byte [] EMPTY_JSON_OBJECT = 
"{}".getBytes(StandardCharsets.UTF_8);
+
     static final PropertyDescriptor CONTAINER_OPTIONS = new 
PropertyDescriptor.Builder()
         .name("JSON container options")
         .description("Determines how stream of records is exposed: either as a 
sequence of single Objects (" + CONTAINER_NONE
@@ -123,16 +125,18 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                         final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
                         final GenericData genericData = GenericData.get();
-                        GenericRecord record = reader.next();
-                        final String json = genericData.toString(record);
 
-                        int recordCount = 0;
-                        if (reader.hasNext() && 
containerOption.equals(CONTAINER_ARRAY)) {
+                        if (reader.hasNext() == false ) {
+                            out.write(EMPTY_JSON_OBJECT);
+                            return;
+                        }
+                        int recordCount = 1;
+                        GenericRecord reuse = reader.next();
+                        // Only open container if more than one record
+                        if(reader.hasNext() && 
containerOption.equals(CONTAINER_ARRAY)){
                             out.write('[');
                         }
-
-                        out.write(json.getBytes(StandardCharsets.UTF_8));
-                        recordCount++;
+                        
out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
 
                         while (reader.hasNext()) {
                             if (containerOption.equals(CONTAINER_ARRAY)) {
@@ -141,11 +145,12 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                                 out.write('\n');
                             }
 
-                            final GenericRecord nextRecord = 
reader.next(record);
-                            
out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
+                            reuse = reader.next(reuse);
+                            
out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
                             recordCount++;
                         }
 
+                        // Only close container if more than one record
                         if (recordCount > 1 && 
containerOption.equals(CONTAINER_ARRAY)) {
                             out.write(']');
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9756e9f1/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index dbf5778..38e3a0d 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -218,7 +218,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Splits the incoming Avro datafile into batches of records by reading 
and de-serializing each record.
      */
-    private class RecordSplitter implements Splitter {
+    static private class RecordSplitter implements Splitter {
 
         private final int splitSize;
         private final boolean transferMetadata;
@@ -300,7 +300,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Writes a binary Avro Datafile to the OutputStream.
      */
-    private class DatafileSplitWriter implements SplitWriter {
+    static private class DatafileSplitWriter implements SplitWriter {
 
         private final boolean transferMetadata;
         private DataFileWriter<GenericRecord> writer;
@@ -344,7 +344,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Writes bare Avro records to the OutputStream.
      */
-    private class BareRecordSplitWriter implements SplitWriter {
+    static private class BareRecordSplitWriter implements SplitWriter {
         private Encoder encoder;
         private DatumWriter<GenericRecord> writer;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9756e9f1/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 302528e..9486826 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -126,4 +126,33 @@ public class TestConvertAvroToJSON {
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
         out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 
256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 
1024, \"favorite_color\": \"red\"}");
     }
+
+    @Test
+    public void testEmptyFlowFile() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
ConvertAvroToJSON());
+
+        runner.enqueue(new byte[]{});
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testZeroRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
ConvertAvroToJSON());
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/user.avsc"));
+
+
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, 
datumWriter);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{}");
+
+    }
 }

Reply via email to