Repository: nifi
Updated Branches:
  refs/heads/master 100799941 -> b36841e21


NIFI-919 Adding SplitAvro processor that splits binary datafiles into smaller 
datafiles, or bare records.
- Adding documentation about bare record use, renaming Split Size to Output 
Size, and adding a test case with 0 records
- Removing validators on properties that have allowable values, using positive 
integer validator for Output Size, and fixing typo on processor description


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b36841e2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b36841e2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b36841e2

Branch: refs/heads/master
Commit: b36841e2134fc2d3aea0cafdf84ea9b0bafd3e50
Parents: 1007999
Author: Bryan Bende <[email protected]>
Authored: Thu Sep 3 14:46:44 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed Sep 16 10:11:30 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/avro/SplitAvro.java  | 370 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../nifi/processors/avro/TestSplitAvro.java     | 317 ++++++++++++++++
 3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b36841e2/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
new file mode 100644
index 0000000..3b344b5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({ "avro", "split" })
+@CapabilityDescription("Splits a binary encoded Avro datafile into smaller 
files based on the configured Output Size. The Output Strategy determines if " +
+        "the smaller files will be Avro datafiles, or bare Avro records with 
metadata in the FlowFile attributes. The output will always be binary encoded.")
+public class SplitAvro extends AbstractProcessor {
+
+    public static final String RECORD_SPLIT_VALUE = "Record";
+    public static final AllowableValue RECORD_SPLIT = new 
AllowableValue(RECORD_SPLIT_VALUE, RECORD_SPLIT_VALUE, "Split at Record 
boundaries");
+
+    public static final PropertyDescriptor SPLIT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Split Strategy")
+            .description("The strategy for splitting the incoming datafile. 
The Record strategy will read the incoming datafile by de-serializing each 
record.")
+            .required(true)
+            .allowableValues(RECORD_SPLIT)
+            .defaultValue(RECORD_SPLIT.getValue())
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Output Size")
+            .description("The number of Avro records to include per split 
file. In cases where the incoming file has less records than the Output Size, 
or " +
+                    "when the total number of records does not divide evenly 
by the Output Size, it is possible to get a split file with less records.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .required(true)
+            .defaultValue("1")
+            .build();
+
+    public static final String DATAFILE_OUTPUT_VALUE = "Datafile";
+    public static final String BARE_RECORD_OUTPUT_VALUE = "Bare Record";
+
+    public static final AllowableValue DATAFILE_OUTPUT = new 
AllowableValue(DATAFILE_OUTPUT_VALUE, DATAFILE_OUTPUT_VALUE, "Avro's object 
container file format");
+    public static final AllowableValue BARE_RECORD_OUTPUT = new 
AllowableValue(BARE_RECORD_OUTPUT_VALUE, BARE_RECORD_OUTPUT_VALUE, "Bare Avro 
records");
+
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("Determines the format of the output. Either Avro 
Datafile, or bare record. Bare record output is only intended for use with 
systems " +
+                    "that already require it, and shouldn't be needed for 
normal use.")
+            .required(true)
+            .allowableValues(DATAFILE_OUTPUT, BARE_RECORD_OUTPUT)
+            .defaultValue(DATAFILE_OUTPUT.getValue())
+            .build();
+
+    public static final PropertyDescriptor TRANSFER_METADATA = new 
PropertyDescriptor.Builder()
+            .name("Transfer Metadata")
+            .description("Whether or not to transfer metadata from the parent 
datafile to the children. If the Output Strategy is Bare Record, " +
+                    "then the metadata will be stored as FlowFile attributes, 
otherwise it will be in the Datafile header.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split. If the 
FlowFile fails processing, nothing will be sent to " +
+                    "this relationship")
+            .build();
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("All new files split from the original FlowFile will 
be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for 
example, the FlowFile is not valid Avro), " +
+                    "it will be routed to this relationship")
+            .build();
+
+    // Metadata keys that are not transferred to split files when output 
strategy is datafile
+    // Avro will write these key/value pairs on its own
+    static final Set<String> RESERVED_METADATA;
+    static {
+        Set<String> reservedMetadata = new HashSet<>();
+        reservedMetadata.add("avro.schema");
+        reservedMetadata.add("avro.codec");
+        RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
+    }
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SPLIT_STRATEGY);
+        properties.add(OUTPUT_SIZE);
+        properties.add(OUTPUT_STRATEGY);
+        properties.add(TRANSFER_METADATA);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_SPLIT);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int splitSize = context.getProperty(OUTPUT_SIZE).asInteger();
+        final boolean transferMetadata = 
context.getProperty(TRANSFER_METADATA).asBoolean();
+
+        SplitWriter splitWriter;
+        final String outputStrategy = 
context.getProperty(OUTPUT_STRATEGY).getValue();
+        switch (outputStrategy) {
+            case DATAFILE_OUTPUT_VALUE:
+                splitWriter = new DatafileSplitWriter(transferMetadata);
+                break;
+            case BARE_RECORD_OUTPUT_VALUE:
+                splitWriter = new BareRecordSplitWriter();
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        Splitter splitter;
+        final String splitStrategy = 
context.getProperty(SPLIT_STRATEGY).getValue();
+        switch (splitStrategy) {
+            case RECORD_SPLIT_VALUE:
+                splitter = new RecordSplitter(splitSize, transferMetadata);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        try {
+            final List<FlowFile> splits = splitter.split(session, flowFile, 
splitWriter);
+            session.transfer(splits, REL_SPLIT);
+            session.transfer(flowFile, REL_ORIGINAL);
+        } catch (ProcessException e) {
+            getLogger().error("Failed to split {} due to {}", new Object[] 
{flowFile, e.getMessage()}, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Able to split an incoming Avro datafile into multiple smaller FlowFiles.
+     */
+    private interface Splitter {
+        List<FlowFile> split(final ProcessSession session, final FlowFile 
originalFlowFile, final SplitWriter splitWriter);
+    }
+
+    /**
+     * Splits the incoming Avro datafile into batches of records by reading 
and de-serializing each record.
+     */
+    private class RecordSplitter implements Splitter {
+
+        private final int splitSize;
+        private final boolean transferMetadata;
+
+        public RecordSplitter(final int splitSize, final boolean 
transferMetadata) {
+            this.splitSize = splitSize;
+            this.transferMetadata = transferMetadata;
+        }
+
+        @Override
+        public List<FlowFile> split(final ProcessSession session, final 
FlowFile originalFlowFile, final SplitWriter splitWriter) {
+            final List<FlowFile> childFlowFiles = new ArrayList<>();
+            final ObjectHolder<GenericRecord> recordHolder = new 
ObjectHolder<>(null);
+
+            session.read(originalFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn);
+                         final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                        final ObjectHolder<String> codec = new 
ObjectHolder<>(reader.getMetaString(DataFileConstants.CODEC));
+                        if (codec.get() == null) {
+                            codec.set(DataFileConstants.NULL_CODEC);
+                        }
+
+                        // while records are left, start a new split by 
spawning a FlowFile
+                        while (reader.hasNext()) {
+                            FlowFile childFlowFile = 
session.create(originalFlowFile);
+                            childFlowFile = session.write(childFlowFile, new 
OutputStreamCallback() {
+                                @Override
+                                public void process(OutputStream rawOut) 
throws IOException {
+                                    try (final BufferedOutputStream out = new 
BufferedOutputStream(rawOut)) {
+                                        splitWriter.init(reader, codec.get(), 
out);
+
+                                        // append to the current FlowFile 
until no more records, or splitSize is reached
+                                        int recordCount = 0;
+                                        while (reader.hasNext() && recordCount 
< splitSize) {
+                                            
recordHolder.set(reader.next(recordHolder.get()));
+                                            
splitWriter.write(recordHolder.get());
+                                            recordCount++;
+                                        }
+                                        splitWriter.flush();
+                                    } finally {
+                                        splitWriter.close();
+                                    }
+                                }
+                            });
+
+                            // would prefer this to be part of the 
SplitWriter, but putting the metadata in FlowFile attributes
+                            // can't be done inside of an OutputStream 
callback which is where the splitWriter is used
+                            if (splitWriter instanceof BareRecordSplitWriter 
&& transferMetadata) {
+                                final Map<String,String> metadata = new 
HashMap<>();
+                                for (String metaKey : reader.getMetaKeys()) {
+                                    metadata.put(metaKey, 
reader.getMetaString(metaKey));
+                                }
+                                childFlowFile = 
session.putAllAttributes(childFlowFile, metadata);
+                            }
+
+                            childFlowFiles.add(childFlowFile);
+                        }
+                    }
+                }
+            });
+
+            return childFlowFiles;
+        }
+    }
+
+    /**
+     * Writes records from the reader to the given output stream.
+     */
+    private interface SplitWriter {
+        void init(final DataFileStream<GenericRecord> reader, final String 
codec, final OutputStream out) throws IOException;
+        void write(final GenericRecord datum) throws IOException;
+        void flush() throws IOException;
+        void close() throws IOException;
+    }
+
+    /**
+     * Writes a binary Avro Datafile to the OutputStream.
+     */
+    private class DatafileSplitWriter implements SplitWriter {
+
+        private final boolean transferMetadata;
+        private DataFileWriter<GenericRecord> writer;
+
+        public DatafileSplitWriter(final boolean transferMetadata) {
+            this.transferMetadata = transferMetadata;
+        }
+
+        @Override
+        public void init(final DataFileStream<GenericRecord> reader, final 
String codec, final OutputStream out) throws IOException {
+            writer = new DataFileWriter<>(new 
GenericDatumWriter<GenericRecord>());
+
+            if (transferMetadata) {
+                for (String metaKey : reader.getMetaKeys()) {
+                    if (!RESERVED_METADATA.contains(metaKey)) {
+                        writer.setMeta(metaKey, reader.getMeta(metaKey));
+                    }
+                }
+            }
+
+            writer.setCodec(CodecFactory.fromString(codec));
+            writer.create(reader.getSchema(), out);
+        }
+
+        @Override
+        public void write(final GenericRecord datum) throws IOException {
+            writer.append(datum);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            writer.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            writer.close();
+        }
+    }
+
+    /**
+     * Writes bare Avro records to the OutputStream.
+     */
+    private class BareRecordSplitWriter implements SplitWriter {
+        private Encoder encoder;
+        private DatumWriter<GenericRecord> writer;
+
+        @Override
+        public void init(final DataFileStream<GenericRecord> reader, final 
String codec, final OutputStream out) throws IOException {
+            writer = new GenericDatumWriter<>(reader.getSchema());
+            encoder = EncoderFactory.get().binaryEncoder(out, null);
+        }
+
+        @Override
+        public void write(GenericRecord datum) throws IOException {
+            writer.write(datum, encoder);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            encoder.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to do
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b36841e2/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 192ec00..7ab13fa 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.avro.ConvertAvroToJSON
-org.apache.nifi.processors.avro.ExtractAvroMetadata
\ No newline at end of file
+org.apache.nifi.processors.avro.ExtractAvroMetadata
+org.apache.nifi.processors.avro.SplitAvro

http://git-wip-us.apache.org/repos/asf/nifi/blob/b36841e2/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
new file mode 100644
index 0000000..73da818
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSplitAvro {
+
+    static final String META_KEY1 = "metaKey1";
+    static final String META_KEY2 = "metaKey2";
+    static final String META_KEY3 = "metaKey3";
+
+    static final String META_VALUE1 = "metaValue1";
+    static final long META_VALUE2 = Long.valueOf(1234567);
+    static final String META_VALUE3 = "metaValue3";
+
+    private Schema schema;
+    private ByteArrayOutputStream users;
+
+    @Before
+    public void setup() throws IOException {
+        this.users = new ByteArrayOutputStream();
+        this.schema = new Schema.Parser().parse(new 
File("src/test/resources/user.avsc"));
+        createUsers(100, users);
+    }
+
+    void createUsers(final int numUsers, final ByteArrayOutputStream users) 
throws IOException {
+        final List<GenericRecord> userList = new ArrayList<>();
+        for (int i=0; i < numUsers; i++) {
+            final GenericRecord user = new GenericData.Record(schema);
+            user.put("name", "name" + i);
+            user.put("favorite_number", i);
+            userList.add(user);
+        }
+
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
+            dataFileWriter.setMeta(META_KEY1, META_VALUE1);
+            dataFileWriter.setMeta(META_KEY2, META_VALUE2);
+            dataFileWriter.setMeta(META_KEY3, META_VALUE3.getBytes("UTF-8"));
+
+            dataFileWriter.create(schema, users);
+            for (GenericRecord user : userList) {
+                dataFileWriter.append(user);
+            }
+            dataFileWriter.flush();
+        }
+    }
+
+    @Test
+    public void testRecordSplitWithNoIncomingRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        createUsers(0, out);
+
+        runner.enqueue(out.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithSingleRecords() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 1, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithMultipleRecords() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 20, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithSplitSizeLarger() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 1);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 100, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithoutMetadata() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 1, false);
+
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(flowFile.toByteArray());
+                 final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY1));
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY2));
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY3));
+            }
+        }
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithSingleRecords() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, 
SplitAvro.BARE_RECORD_OUTPUT);
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 1, true);
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithMultipleRecords() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, 
SplitAvro.BARE_RECORD_OUTPUT);
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 20, true);
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithoutMetadata() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, 
SplitAvro.BARE_RECORD_OUTPUT);
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+        runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 20, false);
+
+        for (final MockFlowFile flowFile : flowFiles) {
+            
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY1));
+            
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY2));
+            
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY3));
+        }
+    }
+
+    @Test
+    public void testFailure() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
+
+        runner.enqueue("not avro".getBytes("UTF-8"));
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 0);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 1);
+    }
+
+    private void checkBareRecordsSplitSize(final List<MockFlowFile> flowFiles, 
final int expectedRecordsPerSplit, final boolean checkMetadata) throws 
IOException {
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(flowFile.toByteArray())) {
+                final DatumReader<GenericRecord> reader = new 
GenericDatumReader<>(schema);
+                final Decoder decoder = DecoderFactory.get().binaryDecoder(in, 
null);
+
+                int count = 0;
+                GenericRecord record = reader.read(null, decoder);
+                try {
+                    while (record != null) {
+                        Assert.assertNotNull(record.get("name"));
+                        Assert.assertNotNull(record.get("favorite_number"));
+                        count++;
+                        record = reader.read(record, decoder);
+                    }
+                } catch (EOFException eof) {
+                    // expected
+                }
+                Assert.assertEquals(expectedRecordsPerSplit, count);
+            }
+
+            if (checkMetadata) {
+                
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY1));
+                
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY2));
+                
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY3));
+            }
+        }
+    }
+
+    private void checkDataFileSplitSize(List<MockFlowFile> flowFiles, int 
expectedRecordsPerSplit, boolean checkMetadata) throws IOException {
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(flowFile.toByteArray());
+                final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                int count = 0;
+                GenericRecord record = null;
+                while (reader.hasNext()) {
+                    record = reader.next(record);
+                    Assert.assertNotNull(record.get("name"));
+                    Assert.assertNotNull(record.get("favorite_number"));
+                    count++;
+                }
+                Assert.assertEquals(expectedRecordsPerSplit, count);
+
+                if (checkMetadata) {
+                    Assert.assertEquals(META_VALUE1, 
reader.getMetaString(META_KEY1));
+                    Assert.assertEquals(META_VALUE2, 
reader.getMetaLong(META_KEY2));
+                    Assert.assertEquals(META_VALUE3, new 
String(reader.getMeta(META_KEY3), "UTF-8"));
+                }
+            }
+        }
+    }
+
+    private void checkDataFileTotalSize(List<MockFlowFile> flowFiles, int 
expectedTotalRecords) throws IOException {
+        int count = 0;
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(flowFile.toByteArray());
+                 final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                GenericRecord record = null;
+                while (reader.hasNext()) {
+                    record = reader.next(record);
+                    Assert.assertNotNull(record.get("name"));
+                    Assert.assertNotNull(record.get("favorite_number"));
+                    count++;
+                }
+            }
+        }
+        Assert.assertEquals(expectedTotalRecords, count);
+    }
+
+}

Reply via email to