Repository: nifi
Updated Branches:
  refs/heads/develop bbacc3d8c -> b31c76bf3


NIFI-821 Support Merging of Avro
NIFI-821 Changing error logging to debug, changing mime-type, adding a 
try-close for Avro reader, changing new ArrayList to Collections.emptyList()


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b31c76bf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b31c76bf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b31c76bf

Branch: refs/heads/develop
Commit: b31c76bf30f847842e3ef1c6f928b2e93f517b34
Parents: bbacc3d
Author: Bryan Bende <[email protected]>
Authored: Wed Aug 5 14:54:02 2015 -0500
Committer: Bryan Bende <[email protected]>
Committed: Wed Aug 12 10:30:30 2015 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   2 +
 .../nifi/processors/standard/MergeContent.java  | 157 ++++++++++++++++++-
 .../processors/standard/TestMergeContent.java   | 139 ++++++++++++++++
 .../test/resources/TestMergeContent/place.avsc  |   7 +
 .../test/resources/TestMergeContent/user.avsc   |   9 ++
 5 files changed, 313 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b31c76bf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 4d1e542..9946122 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -214,6 +214,8 @@ language governing permissions and limitations under the 
License. -->
                         
<exclude>src/test/resources/TestMergeContent/demarcate</exclude>
                         
<exclude>src/test/resources/TestMergeContent/foot</exclude>
                         
<exclude>src/test/resources/TestMergeContent/head</exclude>
+                        
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
+                        
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
                         
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/b31c76bf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 65f4124..c8f9bbe 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,10 +33,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -149,6 +159,7 @@ public class MergeContent extends BinFiles {
     public static final String MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = 
"FlowFile Stream, v2";
     public static final String MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile 
Tar, v1";
     public static final String MERGE_FORMAT_CONCAT_VALUE = "Binary 
Concatenation";
+    public static final String MERGE_FORMAT_AVRO_VALUE = "Avro";
 
     public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
             MERGE_FORMAT_TAR_VALUE,
@@ -179,6 +190,10 @@ public class MergeContent extends BinFiles {
             MERGE_FORMAT_CONCAT_VALUE,
             MERGE_FORMAT_CONCAT_VALUE,
             "The contents of all FlowFiles will be concatenated together into 
a single FlowFile");
+    public static final AllowableValue MERGE_FORMAT_AVRO = new AllowableValue(
+            MERGE_FORMAT_AVRO_VALUE,
+            MERGE_FORMAT_AVRO_VALUE,
+            "The Avro contents of all FlowFiles will be concatenated together 
into a single FlowFile");
 
     public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only 
Common Attributes";
     public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All 
Unique Attributes";
@@ -200,7 +215,7 @@ public class MergeContent extends BinFiles {
             .required(true)
             .name("Merge Format")
             .description("Determines the format that will be used to merge the 
content.")
-            .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, 
MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, 
MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT)
+            .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, 
MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, 
MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
             .defaultValue(MERGE_FORMAT_CONCAT.getValue())
             .build();
     public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new 
PropertyDescriptor.Builder()
@@ -400,6 +415,9 @@ public class MergeContent extends BinFiles {
             case MERGE_FORMAT_CONCAT_VALUE:
                 merger = new BinaryConcatenationMerge();
                 break;
+            case MERGE_FORMAT_AVRO_VALUE:
+                merger = new AvroMerge();
+                break;
             default:
                 throw new AssertionError();
         }
@@ -451,6 +469,12 @@ public class MergeContent extends BinFiles {
         getLogger().info("Merged {} into {}", new Object[]{inputDescription, 
bundle});
         session.transfer(bundle, REL_MERGED);
 
+        for (final FlowFileSessionWrapper unmerged : 
merger.getUnmergedFlowFiles()) {
+            final ProcessSession unmergedSession = unmerged.getSession();
+            final FlowFile unmergedCopy = 
unmergedSession.clone(unmerged.getFlowFile());
+            unmergedSession.transfer(unmergedCopy, REL_FAILURE);
+        }
+
         // We haven't committed anything, parent will take care of it
         return false;
     }
@@ -628,6 +652,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return mimeType;
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private List<FlowFile> getFlowFiles(final List<FlowFileSessionWrapper> 
sessionWrappers) {
@@ -714,6 +743,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return "application/tar";
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private class FlowFileStreamMerger implements MergeBin {
@@ -771,6 +805,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return mimeType;
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private class ZipMerge implements MergeBin {
@@ -821,6 +860,120 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return "application/zip";
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
+    }
+
+    private class AvroMerge implements MergeBin {
+
+        private List<FlowFileSessionWrapper> unmerged = new ArrayList<>();
+
+        @Override
+        public FlowFile merge(ProcessContext context, final ProcessSession 
session, final List<FlowFileSessionWrapper> wrappers) {
+
+            final Map<String, byte[]> metadata = new TreeMap<>();
+            final ObjectHolder<Schema> schema = new ObjectHolder<>(null);
+            final ObjectHolder<String> inputCodec = new ObjectHolder<>(null);
+            final DataFileWriter<GenericRecord> writer = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+
+            // we don't pass the parents to the #create method because the 
parents belong to different sessions
+            FlowFile bundle = session.create();
+            bundle = session.write(bundle, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws 
IOException {
+                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
+                        for (final FlowFileSessionWrapper wrapper : wrappers) {
+                            final FlowFile flowFile = wrapper.getFlowFile();
+                            wrapper.getSession().read(flowFile, new 
InputStreamCallback() {
+                                @Override
+                                public void process(InputStream in) throws 
IOException {
+                                    boolean canMerge = true;
+                                    try (DataFileStream<GenericRecord> reader 
= new DataFileStream<>(in,
+                                            new 
GenericDatumReader<GenericRecord>())) {
+                                        if (schema.get() == null) {
+                                            // this is the first file - set up 
the writer, and store the
+                                            // Schema & metadata we'll use.
+                                            schema.set(reader.getSchema());
+                                            for (String key : 
reader.getMetaKeys()) {
+                                                if 
(!DataFileWriter.isReservedMeta(key)) {
+                                                    byte[] metadatum = 
reader.getMeta(key);
+                                                    metadata.put(key, 
metadatum);
+                                                    writer.setMeta(key, 
metadatum);
+                                                }
+                                            }
+                                            
inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
+                                            if (inputCodec.get() == null) {
+                                                
inputCodec.set(DataFileConstants.NULL_CODEC);
+                                            }
+                                            
writer.setCodec(CodecFactory.fromString(inputCodec.get()));
+                                            writer.create(schema.get(), out);
+                                        } else {
+                                            // check that we're appending to 
the same schema
+                                            if 
(!schema.get().equals(reader.getSchema())) {
+                                                getLogger().debug("Input file 
{} has different schema - {}, not merging",
+                                                        new 
Object[]{flowFile.getId(), reader.getSchema().getName()});
+                                                canMerge = false;
+                                                unmerged.add(wrapper);
+                                            }
+
+                                            // check that we're appending to 
the same metadata
+                                            for (String key : 
reader.getMetaKeys()) {
+                                                if 
(!DataFileWriter.isReservedMeta(key)) {
+                                                    byte[] metadatum = 
reader.getMeta(key);
+                                                    byte[] writersMetadatum = 
metadata.get(key);
+                                                    if 
(!Arrays.equals(metadatum, writersMetadatum)) {
+                                                        
getLogger().debug("Input file {} has different non-reserved metadata, not 
merging",
+                                                                new 
Object[]{flowFile.getId()});
+                                                        canMerge = false;
+                                                        unmerged.add(wrapper);
+                                                    }
+                                                }
+                                            }
+
+                                            // check that we're appending to 
the same codec
+                                            String thisCodec = 
reader.getMetaString(DataFileConstants.CODEC);
+                                            if (thisCodec == null) {
+                                                thisCodec = 
DataFileConstants.NULL_CODEC;
+                                            }
+                                            if 
(!inputCodec.get().equals(thisCodec)) {
+                                                getLogger().debug("Input file 
{} has different codec, not merging",
+                                                        new 
Object[]{flowFile.getId()});
+                                                canMerge = false;
+                                                unmerged.add(wrapper);
+                                            }
+                                        }
+
+                                        // write the Avro content from the 
current FlowFile to the merged OutputStream
+                                        if (canMerge) {
+                                            writer.appendAllFrom(reader, 
false);
+                                        }
+                                    }
+                                }
+                            });
+                        }
+                        writer.flush();
+                    } finally {
+                        writer.close();
+                    }
+                }
+            });
+
+            session.getProvenanceReporter().join(getFlowFiles(wrappers), 
bundle);
+            return bundle;
+        }
+
+        @Override
+        public String getMergedContentType() {
+            return "application/avro-binary";
+        }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return unmerged;
+        }
     }
 
     private static class KeepUniqueAttributeStrategy implements 
AttributeStrategy {
@@ -911,6 +1064,8 @@ public class MergeContent extends BinFiles {
         FlowFile merge(ProcessContext context, ProcessSession session, 
List<FlowFileSessionWrapper> flowFiles);
 
         String getMergedContentType();
+
+        List<FlowFileSessionWrapper> getUnmergedFlowFiles();
     }
 
     private interface AttributeStrategy {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b31c76bf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index daad455..c53c488 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -32,6 +32,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.zip.ZipInputStream;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.io.IOUtils;
@@ -39,6 +49,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -56,6 +67,134 @@ public class TestMergeContent {
     }
 
     @Test
+    public void testSimpleAvroConcat() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_AVRO);
+
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, 
datumWriter);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, 
datumWriter);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, 
datumWriter);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/avro-binary");
+
+        // create a reader for the merged contet
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, 
schema, "name");
+
+        Assert.assertEquals(3, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("Ben"));
+        Assert.assertTrue(users.containsKey("John"));
+    }
+
+    @Test
+    public void testAvroConcatWithDifferentSchemas() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_AVRO);
+
+        final Schema schema1 = new Schema.Parser().parse(new 
File("src/test/resources/TestMergeContent/user.avsc"));
+        final Schema schema2 = new Schema.Parser().parse(new 
File("src/test/resources/TestMergeContent/place.avsc"));
+
+        final GenericRecord record1 = new GenericData.Record(schema1);
+        record1.put("name", "Alyssa");
+        record1.put("favorite_number", 256);
+
+        final GenericRecord record2 = new GenericData.Record(schema2);
+        record2.put("name", "Some Place");
+
+        final GenericRecord record3 = new GenericData.Record(schema1);
+        record3.put("name", "John");
+        record3.put("favorite_number", 5);
+        record3.put("favorite_color", "blue");
+
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema1);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema1, 
record1, datumWriter);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema2, 
record2, datumWriter);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema1, 
record3, datumWriter);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/avro-binary");
+
+        final byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, 
schema1, "name");
+        Assert.assertEquals(2, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("John"));
+
+        final MockFlowFile failure = 
runner.getFlowFilesForRelationship(MergeContent.REL_FAILURE).get(0);
+        final byte[] failureData = runner.getContentAsByteArray(failure);
+        final Map<String, GenericRecord> places = 
getGenericRecordMap(failureData, schema2, "name");
+        Assert.assertEquals(1, places.size());
+        Assert.assertTrue(places.containsKey("Some Place"));
+    }
+
+    private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema 
schema, String key) throws IOException {
+        // create a reader for the merged contet
+        DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>(schema);
+        SeekableByteArrayInput input = new SeekableByteArrayInput(data);
+        DataFileReader<GenericRecord> dataFileReader = new 
DataFileReader<>(input, datumReader);
+
+        // read all the records into a map to verify all the records are there
+        Map<String,GenericRecord> records = new HashMap<>();
+        while (dataFileReader.hasNext()) {
+            GenericRecord user = dataFileReader.next();
+            records.put(user.get(key).toString(), user);
+        }
+        return records;
+    }
+
+    private ByteArrayOutputStream serializeAvroRecord(Schema schema, 
GenericRecord user2, DatumWriter<GenericRecord> datumWriter) throws IOException 
{
+        ByteArrayOutputStream out2 = new ByteArrayOutputStream();
+        DataFileWriter<GenericRecord> dataFileWriter2 = new 
DataFileWriter<GenericRecord>(datumWriter);
+        dataFileWriter2.create(schema, out2);
+        dataFileWriter2.append(user2);
+        dataFileWriter2.close();
+        return out2;
+    }
+
+    @Test
     public void testSimpleBinaryConcat() throws IOException, 
InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

http://git-wip-us.apache.org/repos/asf/nifi/blob/b31c76bf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
new file mode 100644
index 0000000..a39daa9
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
@@ -0,0 +1,7 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "Place",
+ "fields": [
+     {"name": "name", "type": "string"}
+ ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b31c76bf/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
new file mode 100644
index 0000000..117ea70
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}

Reply via email to