Repository: nifi
Updated Branches:
  refs/heads/master 4ed6631ed -> 0289ca711


NIFI-4456: Support multiple JSON objects in JSON record reader/writers

This closes #2640.

Signed-off-by: Mark Payne <marka...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0289ca71
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0289ca71
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0289ca71

Branch: refs/heads/master
Commit: 0289ca7114484e9d5295cb62e741d7793ed8c85c
Parents: 4ed6631
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Tue Apr 17 11:04:56 2018 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed May 2 15:22:01 2018 -0400

----------------------------------------------------------------------
 .../nifi-record-serialization-services/pom.xml  |   4 +
 .../nifi/json/AbstractJsonRowRecordReader.java  |   7 +-
 .../org/apache/nifi/json/JsonPathReader.java    |  11 +-
 .../apache/nifi/json/JsonRecordSetWriter.java   |  45 ++++++-
 .../org/apache/nifi/json/JsonTreeReader.java    |  12 +-
 .../org/apache/nifi/json/OutputGrouping.java    |  23 ++++
 .../org/apache/nifi/json/WriteJsonResult.java   |  20 +++-
 .../nifi/json/TestJsonPathRowRecordReader.java  |  26 +++++
 .../nifi/json/TestJsonTreeRowRecordReader.java  | 117 +++++++++++++++++++
 .../apache/nifi/json/TestWriteJsonResult.java   |  85 +++++++++++---
 .../test/resources/json/bank-account-mixed.json |  32 +++++
 .../resources/json/bank-account-multiarray.json |  42 +++++++
 .../resources/json/bank-account-multiline.json  |  19 +++
 .../resources/json/bank-account-oneline.json    |   2 +
 14 files changed, 406 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 4d47701..0c54b7c 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -115,6 +115,10 @@
                         
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
                         
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
                         
<exclude>src/test/resources/json/bank-account-array.json</exclude>
+                        
<exclude>src/test/resources/json/bank-account-mixed.json</exclude>
+                        
<exclude>src/test/resources/json/bank-account-multiarray.json</exclude>
+                        
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
+                        
<exclude>src/test/resources/json/bank-account-oneline.json</exclude>
                         
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
                         
<exclude>src/test/resources/json/primitive-type-array.json</exclude>
                         
<exclude>src/test/resources/json/single-bank-account.json</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 663b837..cc08d34 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -84,10 +84,6 @@ public abstract class AbstractJsonRowRecordReader implements 
RecordReader {
 
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
-        if (firstObjectConsumed && !array) {
-            return null;
-        }
-
         final JsonNode nextNode = getNextJsonNode();
         final RecordSchema schema = getSchema();
         try {
@@ -197,7 +193,8 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                     return jsonParser.readValueAsTree();
                 case END_ARRAY:
                 case START_ARRAY:
-                    return null;
+                    continue;
+
                 default:
                     throw new MalformedRecordException("Expected to get a JSON 
Object but got a token of type " + token.name());
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index f8f1ea2..d8f21ca 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -48,15 +48,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import com.jayway.jsonpath.JsonPath;
 
 @Tags({"json", "jsonpath", "record", "reader", "parser"})
-@CapabilityDescription("Parses JSON records and evaluates user-defined JSON 
Path's against each JSON object. The root element may be either "
-    + "a single JSON object or a JSON array. If a JSON array is found, each 
JSON object within that array is treated as a separate record. "
-    + "User-defined properties define the fields that should be extracted from 
the JSON in order to form the fields of a Record. Any JSON field "
-    + "that is not extracted via a JSONPath will not be returned in the JSON 
Records.")
+@CapabilityDescription("Parses JSON records and evaluates user-defined JSON 
Path's against each JSON object. While the reader expects each record "
+        + "to be well-formed JSON, the content of a FlowFile may consist of 
many records, each as a well-formed JSON array or JSON object with "
+        + "optional whitespace between them, such as the common 
'JSON-per-line' format. If an array is encountered, each element in that array 
will "
+        + "be treated as a separate record. User-defined properties define the 
fields that should be extracted from the JSON in order to form the "
+        + "fields of a Record. Any JSON field that is not extracted via a 
JSONPath will not be returned in the JSON Records.")
 @SeeAlso(JsonTreeReader.class)
 @DynamicProperty(name = "The field name for the record.",
     value="A JSONPath Expression that will be evaluated against each JSON 
record. The result of the JSONPath will be the value of the "
         + "field whose name is the same as the property name.",
-    description="User-defined properties identifiy how to extract specific 
fields from a JSON object in order to create a Record",
+    description="User-defined properties identify how to extract specific 
fields from a JSON object in order to create a Record",
     expressionLanguageScope=ExpressionLanguageScope.NONE)
 public class JsonPathReader extends SchemaRegistryService implements 
RecordReaderFactory {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index c5cb82a..078a15d 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -20,6 +20,7 @@ package org.apache.nifi.json;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -27,6 +28,8 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
@@ -37,8 +40,8 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", 
"row"})
-@CapabilityDescription("Writes the results of a RecordSet as a JSON Array. 
Even if the RecordSet "
-    + "consists of a single row, it will be written as an array with a single 
element.")
+@CapabilityDescription("Writes the results of a RecordSet as either a JSON 
Array or one JSON object per line. If using Array output, then even if the 
RecordSet "
+    + "consists of a single row, it will be written as an array with a single 
element. If using One Line Per Object output, the JSON objects cannot be 
pretty-printed.")
 public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter 
implements RecordSetWriterFactory {
 
     static final AllowableValue ALWAYS_SUPPRESS = new 
AllowableValue("always-suppress", "Always Suppress",
@@ -48,6 +51,11 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
     static final AllowableValue SUPPRESS_MISSING = new 
AllowableValue("suppress-missing", "Suppress Missing Values",
         "When a field has a value of null, it will be written out. However, if 
a field is defined in the schema and not present in the record, the field will 
not be written out.");
 
+    static final AllowableValue OUTPUT_ARRAY = new 
AllowableValue("output-array", "Array",
+            "Output records as a JSON array");
+    static final AllowableValue OUTPUT_ONELINE = new 
AllowableValue("output-oneline", "One Line Per Object",
+            "Output records with one JSON object per line, delimited by a 
newline character");
+
     static final PropertyDescriptor SUPPRESS_NULLS = new 
PropertyDescriptor.Builder()
         .name("suppress-nulls")
         .displayName("Suppress Null Values")
@@ -64,18 +72,40 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
         .defaultValue("false")
         .required(true)
         .build();
+    static final PropertyDescriptor OUTPUT_GROUPING = new 
PropertyDescriptor.Builder()
+            .name("output-grouping")
+            .displayName("Output Grouping")
+            .description("Specifies how the writer should output the JSON 
records (as an array or one object per line, e.g.) Note that if 'One Line Per 
Object' is "
+                    + "selected, then Pretty Print JSON must be false.")
+            .allowableValues(OUTPUT_ARRAY, OUTPUT_ONELINE)
+            .defaultValue(OUTPUT_ARRAY.getValue())
+            .required(true)
+            .build();
 
     private volatile boolean prettyPrint;
     private volatile NullSuppression nullSuppression;
+    private volatile OutputGrouping outputGrouping;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(PRETTY_PRINT_JSON);
         properties.add(SUPPRESS_NULLS);
+        properties.add(OUTPUT_GROUPING);
         return properties;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
+        // Don't allow Pretty Print if One Line Per Object is selected
+        if (context.getProperty(PRETTY_PRINT_JSON).asBoolean() && 
context.getProperty(OUTPUT_GROUPING).getValue().equals(OUTPUT_ONELINE.getValue()))
 {
+            problems.add(new ValidationResult.Builder().input("Pretty 
Print").valid(false)
+                    .explanation("Pretty Print JSON must be false when 'Output 
Grouping' is set to 'One Line Per Object'").build());
+        }
+        return problems;
+    }
+
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
@@ -90,11 +120,20 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
             suppression = NullSuppression.NEVER_SUPPRESS;
         }
         this.nullSuppression = suppression;
+
+        String outputGroupingValue = 
context.getProperty(OUTPUT_GROUPING).getValue();
+        final OutputGrouping grouping;
+        if(OUTPUT_ONELINE.getValue().equals(outputGroupingValue)) {
+            grouping = OutputGrouping.OUTPUT_ONELINE;
+        } else {
+            grouping = OutputGrouping.OUTPUT_ARRAY;
+        }
+        this.outputGrouping = grouping;
     }
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, 
IOException {
-        return new WriteJsonResult(logger, schema, 
getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression,
+        return new WriteJsonResult(logger, schema, 
getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, 
outputGrouping,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), 
getTimestampFormat().orElse(null));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 8290284..27833ad 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -38,12 +38,12 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SchemaRegistryService;
 
 @Tags({"json", "tree", "record", "reader", "parser"})
-@CapabilityDescription("Parses JSON into individual Record objects. The Record 
that is produced will contain all top-level "
-    + "elements of the corresponding JSON Object. "
-    + "The root JSON element can be either a single element or an array of 
JSON elements, and each "
-    + "element in that array will be treated as a separate record. "
-    + "If the schema that is configured contains a field that is not present 
in the JSON, a null value will be used. If the JSON contains "
-    + "a field that is not present in the schema, that field will be skipped. "
+@CapabilityDescription("Parses JSON into individual Record objects. While the 
reader expects each record "
+        + "to be well-formed JSON, the content of a FlowFile may consist of 
many records, each as a well-formed "
+        + "JSON array or JSON object with optional whitespace between them, 
such as the common 'JSON-per-line' format. "
+        + "If an array is encountered, each element in that array will be 
treated as a separate record. "
+        + "If the schema that is configured contains a field that is not 
present in the JSON, a null value will be used. If the JSON contains "
+        + "a field that is not present in the schema, that field will be 
skipped. "
     + "See the Usage of the Controller Service for more information and 
examples.")
 @SeeAlso(JsonPathReader.class)
 public class JsonTreeReader extends SchemaRegistryService implements 
RecordReaderFactory {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java
new file mode 100644
index 0000000..9c0aff5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/OutputGrouping.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+public enum OutputGrouping {
+    OUTPUT_ARRAY,
+    OUTPUT_ONELINE
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 41a72c7..c91768e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -46,6 +46,7 @@ import 
org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
 
 public class WriteJsonResult extends AbstractRecordSetWriter implements 
RecordSetWriter, RawRecordWriter {
     private final ComponentLog logger;
@@ -53,19 +54,23 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
     private final RecordSchema recordSchema;
     private final JsonFactory factory = new JsonFactory();
     private final JsonGenerator generator;
+    private final OutputStream out;
     private final NullSuppression nullSuppression;
+    private final OutputGrouping outputGrouping;
     private final Supplier<DateFormat> LAZY_DATE_FORMAT;
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
     public WriteJsonResult(final ComponentLog logger, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, 
final boolean prettyPrint,
-        final NullSuppression nullSuppression, final String dateFormat, final 
String timeFormat, final String timestampFormat) throws IOException {
+        final NullSuppression nullSuppression, final OutputGrouping 
outputGrouping, final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException {
 
         super(out);
         this.logger = logger;
         this.recordSchema = recordSchema;
         this.schemaAccess = schemaAccess;
+        this.out = out;
         this.nullSuppression = nullSuppression;
+        this.outputGrouping = outputGrouping;
 
         final DateFormat df = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
         final DateFormat tf = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
@@ -78,6 +83,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter 
implements RecordSe
         this.generator = factory.createJsonGenerator(out);
         if (prettyPrint) {
             generator.useDefaultPrettyPrinter();
+        } else if (OutputGrouping.OUTPUT_ONELINE.equals(outputGrouping)) {
+            // Use a minimal pretty printer with a newline object separator, 
will output one JSON object per line
+            generator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
         }
     }
 
@@ -87,12 +95,16 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
         final OutputStream out = getOutputStream();
         schemaAccess.writeHeader(recordSchema, out);
 
-        generator.writeStartArray();
+        if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
+            generator.writeStartArray();
+        }
     }
 
     @Override
     protected Map<String, String> onFinishRecordSet() throws IOException {
-        generator.writeEndArray();
+        if (outputGrouping == OutputGrouping.OUTPUT_ARRAY) {
+            generator.writeEndArray();
+        }
         return schemaAccess.getAttributes(recordSchema);
     }
 
@@ -191,8 +203,6 @@ public class WriteJsonResult extends 
AbstractRecordSetWriter implements RecordSe
                 }
             }
 
-
-
             endTask.apply(generator);
         } catch (final Exception e) {
             logger.error("Failed to write {} with schema {} as a JSON Object 
due to {}", new Object[] {record, record.getSchema(), e.toString(), e});

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 11e2828..5e0da91 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -119,6 +119,32 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
+    public void testReadOneLine() throws IOException, MalformedRecordException 
{
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-oneline.json"));
+             final JsonPathRowRecordReader reader = new 
JsonPathRowRecordReader(allJsonPaths, schema, in, 
Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList(new String[] 
{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(dt -> 
dt.getFieldType()).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new 
RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING});
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, 
"123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, 
"321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
     public void testSingleJsonElement() throws IOException, 
MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index e898edd..73abdff 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -162,6 +162,123 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
+    public void testReadOneLinePerJSON() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-oneline.json"));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList(new String[] 
{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(dt -> 
dt.getFieldType()).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new 
RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING});
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, 
"123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, 
"321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testReadMultilineJSON() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-multiline.json"));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList(new String[] 
{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(dt -> 
dt.getFieldType()).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new 
RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING});
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, 
"123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, 
"321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testReadMultilineArrays() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-multiarray.json"));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList(new String[] 
{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(dt -> 
dt.getFieldType()).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new 
RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING});
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, 
"123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, 
"321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, 
"123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 
Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testReadMixedJSON() throws IOException, 
MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/bank-account-mixed.json"));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, 
dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList(new String[] 
{"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = 
schema.getDataTypes().stream().map(dt -> 
dt.getFieldType()).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new 
RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, 
RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, 
RecordFieldType.STRING});
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, 
"123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, 
"321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, 
"123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+            final Object[] fourthRecordValues = 
reader.nextRecord().getValues();
+            Assert.assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 
Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
     public void testReadRawRecordIncludesFieldsNotInSchema() throws 
IOException, MalformedRecordException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index af19394..88f3ed9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -104,7 +104,8 @@ public class TestWriteJsonResult {
         final RecordSet rs = RecordSet.of(schema, record);
 
         try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, true,
-            NullSuppression.NEVER_SUPPRESS, 
RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
 
             writer.write(rs);
         }
@@ -141,7 +142,8 @@ public class TestWriteJsonResult {
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, true,
-            NullSuppression.NEVER_SUPPRESS, 
RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
 
             writer.write(rs);
         }
@@ -172,7 +174,8 @@ public class TestWriteJsonResult {
         final RecordSet rs = RecordSet.of(schema, record);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.write(rs);
         }
 
@@ -196,7 +199,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRecord(record);
             writer.finishRecordSet();
@@ -222,7 +226,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRawRecord(record);
             writer.finishRecordSet();
@@ -248,7 +253,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRecord(record);
             writer.finishRecordSet();
@@ -274,7 +280,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRawRecord(record);
             writer.finishRecordSet();
@@ -301,7 +308,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRecord(record);
             writer.finishRecordSet();
@@ -328,7 +336,8 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.writeRawRecord(record);
             writer.finishRecordSet();
@@ -354,7 +363,8 @@ public class TestWriteJsonResult {
         final Record recordWithMissingName = new MapRecord(schema, values);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithMissingName);
             writer.finishRecordSet();
@@ -364,7 +374,8 @@ public class TestWriteJsonResult {
 
         baos.reset();
         try (
-            final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, 
null, null)) {
+            final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                    NullSuppression.ALWAYS_SUPPRESS, 
OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithMissingName);
             writer.finishRecordSet();
@@ -373,7 +384,8 @@ public class TestWriteJsonResult {
         assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), 
StandardCharsets.UTF_8));
 
         baos.reset();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, 
null,
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, 
null, null,
             null)) {
             writer.beginRecordSet();
             writer.write(recordWithMissingName);
@@ -387,7 +399,8 @@ public class TestWriteJsonResult {
         final Record recordWithNullValue = new MapRecord(schema, values);
 
         baos.reset();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, 
null, null)) {
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithNullValue);
             writer.finishRecordSet();
@@ -397,7 +410,8 @@ public class TestWriteJsonResult {
 
         baos.reset();
         try (
-            final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, 
null, null)) {
+            final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                    NullSuppression.ALWAYS_SUPPRESS, 
OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithNullValue);
             writer.finishRecordSet();
@@ -406,7 +420,8 @@ public class TestWriteJsonResult {
         assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), 
StandardCharsets.UTF_8));
 
         baos.reset();
-        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, 
null,
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, 
null, null,
             null)) {
             writer.beginRecordSet();
             writer.write(recordWithNullValue);
@@ -416,4 +431,44 @@ public class TestWriteJsonResult {
         assertEquals("[{\"id\":\"1\",\"name\":null}]", new 
String(baos.toByteArray(), StandardCharsets.UTF_8));
 
     }
+
+    @Test
+    public void testOnelineOutput() throws IOException {
+        final Map<String, Object> values1 = new HashMap<>();
+        values1.put("timestamp", new java.sql.Timestamp(37293723L));
+        values1.put("time", new java.sql.Time(37293723L));
+        values1.put("date", new java.sql.Date(37293723L));
+
+        final List<RecordField> fields1 = new ArrayList<>();
+        fields1.add(new RecordField("timestamp", 
RecordFieldType.TIMESTAMP.getDataType()));
+        fields1.add(new RecordField("time", 
RecordFieldType.TIME.getDataType()));
+        fields1.add(new RecordField("date", 
RecordFieldType.DATE.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields1);
+
+        final Record record1 = new MapRecord(schema, values1);
+
+        final Map<String, Object> values2 = new HashMap<>();
+        values2.put("timestamp", new java.sql.Timestamp(37293999L));
+        values2.put("time", new java.sql.Time(37293999L));
+        values2.put("date", new java.sql.Date(37293999L));
+
+
+        final Record record2 = new MapRecord(schema, values2);
+
+        final RecordSet rs = RecordSet.of(schema, record1, record2);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ONELINE, 
null, null, null)) {
+            writer.write(rs);
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        final String expected = 
"{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}\n{\"timestamp\":37293999,\"time\":37293999,\"date\":37293999}";
+
+        final String output = new String(data, StandardCharsets.UTF_8);
+        assertEquals(expected, output);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-mixed.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-mixed.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-mixed.json
new file mode 100644
index 0000000..6c6def4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-mixed.json
@@ -0,0 +1,32 @@
+[
+       {
+               "id": 1,
+               "name": "John Doe",
+               "balance": 4750.89,
+               "address": "123 My Street",
+               "city": "My City", 
+               "state": "MS",
+               "zipCode": "11111",
+               "country": "USA"
+       }, {
+               "id": 2,
+               "name": "Jane Doe",
+               "balance": 4820.09,
+               "address": "321 Your Street",
+               "city": "Your City", 
+               "state": "NY",
+               "zipCode": "33333",
+               "country": "USA"
+       }
+]
+{ "id": 3, "name": "Maria Doe", "balance": 4750.89, "address": "123 My 
Street", "city": "My City", "state": "ME", "zipCode": "11111", "country": "USA" 
}
+{
+       "id": 4,
+       "name": "Xi Doe",
+       "balance": 4820.09,
+       "address": "321 Your Street",
+       "city": "Your City",
+       "state": "NV",
+       "zipCode": "33333",
+       "country": "USA"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiarray.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiarray.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiarray.json
new file mode 100644
index 0000000..a77dfa6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiarray.json
@@ -0,0 +1,42 @@
+[
+       {
+               "id": 1,
+               "name": "John Doe",
+               "balance": 4750.89,
+               "address": "123 My Street",
+               "city": "My City", 
+               "state": "MS",
+               "zipCode": "11111",
+               "country": "USA"
+       }, {
+               "id": 2,
+               "name": "Jane Doe",
+               "balance": 4820.09,
+               "address": "321 Your Street",
+               "city": "Your City", 
+               "state": "NY",
+               "zipCode": "33333",
+               "country": "USA"
+       }
+]
+[
+       {
+               "id": 3,
+               "name": "Maria Doe",
+               "balance": 4750.89,
+               "address": "123 My Street",
+               "city": "My City",
+               "state": "ME",
+               "zipCode": "11111",
+               "country": "USA"
+       }, {
+       "id": 4,
+       "name": "Xi Doe",
+       "balance": 4820.09,
+       "address": "321 Your Street",
+       "city": "Your City",
+       "state": "NV",
+       "zipCode": "33333",
+       "country": "USA"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiline.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiline.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiline.json
new file mode 100644
index 0000000..472a0a0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-multiline.json
@@ -0,0 +1,19 @@
+       {
+               "id": 1,
+               "name": "John Doe",
+               "balance": 4750.89,
+               "address": "123 My Street",
+               "city": "My City", 
+               "state": "MS",
+               "zipCode": "11111",
+               "country": "USA"
+       } {
+               "id": 2,
+               "name": "Jane Doe",
+               "balance": 4820.09,
+               "address": "321 Your Street",
+               "city": "Your City", 
+               "state": "NY",
+               "zipCode": "33333",
+               "country": "USA"
+       }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0289ca71/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-oneline.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-oneline.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-oneline.json
new file mode 100644
index 0000000..6a0dbd8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-oneline.json
@@ -0,0 +1,2 @@
+{ "id": 1, "name": "John Doe", "balance": 4750.89, "address": "123 My Street", 
"city": "My City", "state": "MS", "zipCode": "11111", "country": "USA" }
+{ "id": 2, "name": "Jane Doe", "balance": 4820.09, "address": "321 Your 
Street", "city": "Your City", "state": "NY", "zipCode": "33333", "country": 
"USA"}

Reply via email to