Repository: nifi
Updated Branches:
  refs/heads/master a490c29a4 -> 1ac44a0a4


NIFI-1118 adding SplitText features of size limit and header marker characters

Signed-off-by: Mike Moser <[email protected]>

This closes #444


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1ac44a0a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1ac44a0a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1ac44a0a

Branch: refs/heads/master
Commit: 1ac44a0a4be94da5c88a994a39b7c803e12a291b
Parents: a490c29
Author: Mark Bean <[email protected]>
Authored: Sun May 15 20:07:04 2016 -0400
Committer: Mike Moser <[email protected]>
Committed: Tue Jun 7 11:37:23 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitText.java     | 432 +++++++++++++------
 .../nifi/processors/standard/TestSplitText.java | 297 +++++++++++++
 2 files changed, 590 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac44a0a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 39eb4e8..0f1abfd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -16,11 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,10 +44,13 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -56,7 +62,9 @@ import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.util.IntegerHolder;
+import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.ObjectHolder;
 
 @EventDriven
@@ -64,9 +72,15 @@ import org.apache.nifi.util.ObjectHolder;
 @SupportsBatching
 @Tags({"split", "text"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Splits a text file into multiple smaller text files on 
line boundaries, each having up to a configured number of lines.")
+@CapabilityDescription("Splits a text file into multiple smaller text files on 
line boundaries limited by maximum number of lines "
+        + "or total size of fragment. Each output split file will contain no 
more than the configured number of lines or bytes. "
+        + "If both Line Split Count and Maximum Fragment Size are specified, 
the split occurs at whichever limit is reached first. "
+        + "If the first line of a fragment exceeds the Maximum Fragment Size, 
that line will be output in a single split file which "
+        +" exceeds the configured maximum size limit.")
 @WritesAttributes({
     @WritesAttribute(attribute = "text.line.count", description = "The number 
of lines of text from the original FlowFile that were copied to this FlowFile"),
+    @WritesAttribute(attribute = "fragment.size", description = "The number of 
bytes from the original FlowFile that were copied to this FlowFile, " +
+            "including header, if applicable, which is duplicated in each 
split FlowFile"),
     @WritesAttribute(attribute = "fragment.identifier", description = "All 
split FlowFiles produced from the same parent FlowFile will have the same 
randomly generated UUID added for this attribute"),
     @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the split FlowFiles that were created 
from a single parent FlowFile"),
     @WritesAttribute(attribute = "fragment.count", description = "The number 
of split FlowFiles generated from the parent FlowFile"),
@@ -76,6 +90,7 @@ public class SplitText extends AbstractProcessor {
 
     // attribute keys
     public static final String SPLIT_LINE_COUNT = "text.line.count";
+    public static final String FRAGMENT_SIZE = "fragment.size";
     public static final String FRAGMENT_ID = "fragment.identifier";
     public static final String FRAGMENT_INDEX = "fragment.index";
     public static final String FRAGMENT_COUNT = "fragment.count";
@@ -83,9 +98,18 @@ public class SplitText extends AbstractProcessor {
 
     public static final PropertyDescriptor LINE_SPLIT_COUNT = new 
PropertyDescriptor.Builder()
             .name("Line Split Count")
-        .description("The number of lines that will be added to each split 
file (excluding the header, if the Header Line Count property is greater than 
0).")
+            .description("The number of lines that will be added to each split 
file, excluding header lines. " +
+                    "A value of zero requires Maximum Fragment Size to be set, 
and line count will not be considered in determining splits.")
             .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor FRAGMENT_MAX_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Fragment Size")
+            .description("The maximum size of each split file, including 
header lines. NOTE: in the case where a " +
+                    "single line exceeds this property (including headers, if 
applicable), that line will be output " +
+                    "in a split of its own which exceeds this Maximum Fragment 
Size setting.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .build();
     public static final PropertyDescriptor HEADER_LINE_COUNT = new 
PropertyDescriptor.Builder()
             .name("Header Line Count")
@@ -94,12 +118,19 @@ public class SplitText extends AbstractProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .defaultValue("0")
             .build();
+    public static final PropertyDescriptor HEADER_MARKER = new 
PropertyDescriptor.Builder()
+            .name("Header Line Marker Characters")
+            .description("The first character(s) on the line of the datafile 
which signifies a header line. This value is ignored when Header Line Count is 
non-zero. " +
+                    "The first line not containing the Header Line Marker 
Characters and all subsequent lines are considered non-header")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new 
PropertyDescriptor.Builder()
             .name("Remove Trailing Newlines")
             .description("Whether to remove newlines at the end of each split 
file. This should be false if you intend to merge the split files later. If 
this is set to "
-                + "'true' and a FlowFile is generated that contains only 
'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will 
not be emitted. "
-                + "Note, however, that if the Header Line Count is greater 
than 0, the resultant FlowFile will never be empty as it will consist of the 
header lines, so "
-                + "a FlowFile may be emitted that contians only the header 
lines.")
+                    + "'true' and a FlowFile is generated that contains only 
'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will 
not be emitted. "
+                    + "Note, however, that if header lines are specified, the 
resultant FlowFile will never be empty as it will consist of the header lines, 
so "
+                    + "a FlowFile may be emitted that contains only the header 
lines.")
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .allowableValues("true", "false")
@@ -126,7 +157,9 @@ public class SplitText extends AbstractProcessor {
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(LINE_SPLIT_COUNT);
+        properties.add(FRAGMENT_MAX_SIZE);
         properties.add(HEADER_LINE_COUNT);
+        properties.add(HEADER_MARKER);
         properties.add(REMOVE_TRAILING_NEWLINES);
         this.properties = Collections.unmodifiableList(properties);
 
@@ -138,6 +171,22 @@ public class SplitText extends AbstractProcessor {
     }
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+
+        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
+                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
+
+        results.add(new ValidationResult.Builder()
+            .subject("Maximum Fragment Size")
+            .valid(!invalidState)
+            .explanation("Property must be specified when Line Split Count is 
0")
+            .build()
+        );
+        return results;
+    }
+
+    @Override
     public Set<Relationship> getRelationships() {
         return relationships;
     }
@@ -147,138 +196,204 @@ public class SplitText extends AbstractProcessor {
         return properties;
     }
 
-    private int readLines(final InputStream in, final int maxNumLines, final 
OutputStream out, final boolean keepAllNewLines, final byte[] 
leadingNewLineBytes) throws IOException {
+    private int readLines(final InputStream in, final int maxNumLines, final 
long maxByteCount, final OutputStream out,
+                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
         final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
 
-        int numLines = 0;
         byte[] leadingBytes = leadingNewLineBytes;
+        int numLines = 0;
+        long totalBytes = 0L;
         for (int i = 0; i < maxNumLines; i++) {
-            final EndOfLineMarker eolMarker = locateEndOfLine(in, out, false, 
eolBuffer, leadingBytes);
+            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, 
totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
+            final long bytes = eolMarker.getBytesConsumed();
             leadingBytes = eolMarker.getLeadingNewLineBytes();
 
-            if (keepAllNewLines && out != null) {
+            if (includeLineDelimiter && out != null) {
                 if (leadingBytes != null) {
                     out.write(leadingBytes);
                     leadingBytes = null;
                 }
-
                 eolBuffer.drainTo(out);
             }
-
-            if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 0L) {
-                numLines++;
+            totalBytes += bytes;
+            if (bytes <= 0) {
+                return numLines;
             }
-
-            if (eolMarker.isStreamEnded()) {
+            numLines++;
+            if (totalBytes >= maxByteCount) {
                 break;
             }
         }
-
         return numLines;
     }
 
-    private EndOfLineMarker locateEndOfLine(final InputStream in, final 
OutputStream out, final boolean includeLineDelimiter,
-        final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) 
throws IOException {
-
-        int lastByte = -1;
+    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final 
OutputStream out, final long bytesReadSoFar, final long maxSize,
+                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
         long bytesRead = 0L;
+        final ByteArrayOutputStream buffer;
+        if (out != null) {
+            buffer = new ByteArrayOutputStream();
+        } else {
+            buffer = null;
+        }
         byte[] bytesToWriteFirst = leadingNewLineBytes;
 
+        in.mark(Integer.MAX_VALUE);
         while (true) {
-            in.mark(1);
             final int nextByte = in.read();
 
-            final boolean isNewLineChar = nextByte == '\r' || nextByte == '\n';
-
-            // if we hit end of stream or new line we're done
+            // if we hit end of stream we're done
             if (nextByte == -1) {
-                if (lastByte == '\r') {
-                    eolBuffer.addEndOfLine(true, false);
+                if (buffer != null) {
+                    buffer.writeTo(out);
+                    buffer.close();
                 }
-
-                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);
+                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
             }
 
-            // If we get a character that's not an end-of-line char, then
-            // we need to write out the EOL's that we have buffered (if out != 
null).
-            // Then, we need to reset our EOL buffer because we no longer have 
consecutive EOL's
-            if (!isNewLineChar) {
+            // Verify leading bytes do not violate size limitation
+            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
+                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
+            }
+            // Write leadingNewLines, if appropriate
+            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst 
!= null) {
+                bytesRead += bytesToWriteFirst.length;
+                buffer.write(bytesToWriteFirst);
+                bytesToWriteFirst = null;
+            }
+            // buffer the output
+            bytesRead++;
+            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
                 if (bytesToWriteFirst != null) {
-                    if (out != null) {
-                        out.write(bytesToWriteFirst);
-                    }
-
-                    bytesToWriteFirst = null;
-                }
-
-                if (out != null) {
-                    eolBuffer.drainTo(out);
+                    buffer.write(bytesToWriteFirst);
                 }
-
+                bytesToWriteFirst = null;
+                eolBuffer.drainTo(buffer);
                 eolBuffer.clear();
+                buffer.write(nextByte);
             }
 
-            // if there's an OutputStream to copy the data to, copy it, if 
appropriate.
-            // "if appropriate" means that it's not a line delimiter or that 
we want to copy line delimiters
-            bytesRead++;
-            if (out != null && (includeLineDelimiter || !isNewLineChar)) {
-                if (bytesToWriteFirst != null) {
-                    out.write(bytesToWriteFirst);
-                    bytesToWriteFirst = null;
+            // check the size limit
+            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
+                in.reset();
+                if (buffer != null) {
+                    buffer.close();
                 }
-
-                out.write(nextByte);
+                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
             }
 
             // if we have a new line, then we're done
             if (nextByte == '\n') {
-                eolBuffer.addEndOfLine(lastByte == '\r', true);
+                if (buffer != null) {
+                    buffer.writeTo(out);
+                    buffer.close();
+                    eolBuffer.addEndOfLine(false, true);
+                }
                 return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
             }
 
-            // we didn't get a new line but if last byte was carriage return 
we've reached a new-line.
-            // so we roll back the last byte that we read and return
-            if (lastByte == '\r') {
-                in.reset();
-                bytesRead--;    // we reset the stream by 1 byte so decrement 
the number of bytes read by 1
-                eolBuffer.addEndOfLine(true, false);
-                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
+            // Determine if \n follows \r; in either case, end of line has 
been reached
+            if (nextByte == '\r') {
+                if (buffer != null) {
+                    buffer.writeTo(out);
+                    buffer.close();
+                }
+                in.mark(1);
+                final int lookAheadByte = in.read();
+                if (lookAheadByte == '\n') {
+                    eolBuffer.addEndOfLine(true, true);
+                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
+                } else {
+                    in.reset();
+                    eolBuffer.addEndOfLine(true, false);
+                    return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
+                }
             }
-
-            // keep track of what the last byte was that we read so that we 
can detect \r followed by some other
-            // character.
-            lastByte = nextByte;
         }
     }
 
-    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines) throws IOException {
+    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
+                                       final long bufferedBytes) throws 
IOException {
         final SplitInfo info = new SplitInfo();
         final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
+        int lastByte = -1;
+        info.lengthBytes = bufferedBytes;
+        long lastEolBufferLength = 0L;
 
-        while (info.lengthLines < numLines) {
-            final boolean keepNewLine = keepAllNewLines || (info.lengthLines 
!= numLines - 1);
-
-            final EndOfLineMarker eolMarker = locateEndOfLine(in, null, 
keepNewLine, eolBuffer, null);
-            long bytesTillNext = eolMarker.getBytesConsumed();
-
-            info.lengthLines++;
-
-            // if this is the last line in the split and we don't want to keep 
all lines
-            // (i.e., we want to remove trailing new lines), then decrement 
out lengthBytes
-            final boolean isLastLine = eolMarker.isStreamEnded() || 
info.lengthLines >= numLines;
-            if (isLastLine && !keepAllNewLines) {
-                bytesTillNext -= eolBuffer.length();
+        while ((info.lengthLines < numLines || (info.lengthLines == numLines 
&& lastByte == '\r'))
+                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
+                && eolBuffer.length() < maxSize) {
+            in.mark(1);
+            final int nextByte = in.read();
+            // Check for \n following \r on last line
+            if (info.lengthLines == numLines && lastByte == '\r' && nextByte 
!= '\n') {
+                in.reset();
+                break;
+            }
+            switch (nextByte) {
+                case -1:
+                    info.endOfStream = true;
+                    if (keepAllNewLines) {
+                        info.lengthBytes += eolBuffer.length();
+                    }
+                    if (lastByte != '\r') {
+                        info.lengthLines++;
+                    }
+                    info.bufferedBytes = 0;
+                    return info;
+                case '\r':
+                    eolBuffer.addEndOfLine(true, false);
+                    info.lengthLines++;
+                    info.bufferedBytes = 0;
+                    break;
+                case '\n':
+                    eolBuffer.addEndOfLine(false, true);
+                    if (lastByte != '\r') {
+                        info.lengthLines++;
+                    }
+                    info.bufferedBytes = 0;
+                    break;
+                default:
+                    if (eolBuffer.length() > 0) {
+                        info.lengthBytes += eolBuffer.length();
+                        lastEolBufferLength = eolBuffer.length();
+                        eolBuffer.clear();
+                    }
+                    info.lengthBytes++;
+                    info.bufferedBytes++;
+                    break;
             }
+            lastByte = nextByte;
+        }
+        // if current line exceeds size and not keeping eol characters, remove 
previously applied eol characters
+        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
+            info.lengthBytes -= lastEolBufferLength;
+        }
+        if (keepAllNewLines) {
+            info.lengthBytes += eolBuffer.length();
+        }
+        return info;
+    }
 
-            info.lengthBytes += bytesTillNext;
+    private int countHeaderLines(final ByteCountingInputStream in,
+                                 final String headerMarker) throws IOException 
{
+        int headerInfo = 0;
 
-            if (eolMarker.isStreamEnded()) {
-                info.endOfStream = true;
-                break;
+        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
+        in.mark(Integer.MAX_VALUE);
+        String line = br.readLine();
+        while (line != null) {
+            // if line is not a header line, reset stream and return header 
counts
+            if (!line.startsWith(headerMarker)) {
+                in.reset();
+                return headerInfo;
+            } else {
+                headerInfo++;
             }
+            line = br.readLine();
         }
-
-        return info;
+        in.reset();
+        return headerInfo;
     }
 
     @Override
@@ -290,8 +405,12 @@ public class SplitText extends AbstractProcessor {
 
         final ComponentLog logger = getLogger();
         final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
-        final int splitCount = 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
-        final boolean removeTrailingNewlines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
+        final int maxLineCount = 
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
+                ? Integer.MAX_VALUE : 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
+        final long maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
+        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
+        final boolean includeLineDelimiter = 
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
 
         final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
         final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
@@ -304,59 +423,82 @@ public class SplitText extends AbstractProcessor {
                 try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
                         final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
 
+                    long bufferedPartialLine = 0;
+
                     // if we have header lines, copy them into a 
ByteArrayOutputStream
                     final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
-
-                    final int headerLinesCopied = readLines(in, headerCount, 
headerStream, true, null);
-                    if (headerLinesCopied < headerCount) {
-                        errorMessage.set("Header Line Count is set to " + 
headerCount + " but file had only " + headerLinesCopied + " lines");
-                        return;
-                    }
-
-                    // Break header apart into trailing newlines and
-                    final byte[] headerBytes = headerStream.toByteArray();
-                    int headerNewLineByteCount = 0;
-                    for (int i = headerBytes.length - 1; i >= 0; i--) {
-                        final byte headerByte = headerBytes[i];
-
-                        if (headerByte == '\r' || headerByte == '\n') {
-                            headerNewLineByteCount++;
-                        } else {
-                            break;
+                    // Determine the number of lines of header, priority given 
to HEADER_LINE_COUNT property
+                    int headerInfoLineCount = 0;
+                    if (headerCount > 0) {
+                        headerInfoLineCount = headerCount;
+                    } else {
+                        if (headerMarker != null) {
+                            headerInfoLineCount = countHeaderLines(in, 
headerMarker);
                         }
                     }
 
                     final byte[] headerNewLineBytes;
                     final byte[] headerBytesWithoutTrailingNewLines;
-                    if (headerNewLineByteCount == 0) {
-                        headerNewLineBytes = null;
-                        headerBytesWithoutTrailingNewLines = headerBytes;
-                    } else {
-                        headerNewLineBytes = new byte[headerNewLineByteCount];
-                        System.arraycopy(headerBytes, headerBytes.length - 
headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
+                    if (headerInfoLineCount > 0) {
+                        final int headerLinesCopied = readLines(in, 
headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
+
+                        if (headerLinesCopied < headerInfoLineCount) {
+                            errorMessage.set("Header Line Count is set to " + 
headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
+                            return;
+                        }
 
-                        headerBytesWithoutTrailingNewLines = new 
byte[headerBytes.length - headerNewLineByteCount];
-                        System.arraycopy(headerBytes, 0, 
headerBytesWithoutTrailingNewLines, 0, headerBytes.length - 
headerNewLineByteCount);
+                        // Break header apart into trailing newlines and 
remaining text
+                        final byte[] headerBytes = headerStream.toByteArray();
+                        int headerNewLineByteCount = 0;
+                        for (int i = headerBytes.length - 1; i >= 0; i--) {
+                            final byte headerByte = headerBytes[i];
+
+                            if (headerByte == '\r' || headerByte == '\n') {
+                                headerNewLineByteCount++;
+                            } else {
+                                break;
+                            }
+                        }
+
+                        if (headerNewLineByteCount == 0) {
+                            headerNewLineBytes = null;
+                            headerBytesWithoutTrailingNewLines = headerBytes;
+                        } else {
+                            headerNewLineBytes = new 
byte[headerNewLineByteCount];
+                            System.arraycopy(headerBytes, headerBytes.length - 
headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
+
+                            headerBytesWithoutTrailingNewLines = new 
byte[headerBytes.length - headerNewLineByteCount];
+                            System.arraycopy(headerBytes, 0, 
headerBytesWithoutTrailingNewLines, 0, headerBytes.length - 
headerNewLineByteCount);
+                        }
+                    } else {
+                        headerBytesWithoutTrailingNewLines = null;
+                        headerNewLineBytes = null;
                     }
 
                     while (true) {
-                        if (headerCount > 0) {
+                        if (headerInfoLineCount > 0) {
                             // if we have header lines, create a new FlowFile, 
copy the header lines to that file,
                             // and then start copying lines
                             final IntegerHolder linesCopied = new 
IntegerHolder(0);
+                            final LongHolder bytesCopied = new LongHolder(0L);
                             FlowFile splitFile = session.create(flowFile);
                             try {
                                 splitFile = session.write(splitFile, new 
OutputStreamCallback() {
                                     @Override
                                     public void process(final OutputStream 
rawOut) throws IOException {
-                                        try (final BufferedOutputStream out = 
new BufferedOutputStream(rawOut)) {
-                                            
out.write(headerBytesWithoutTrailingNewLines);
-                                            linesCopied.set(readLines(in, 
splitCount, out, !removeTrailingNewlines, headerNewLineBytes));
+                                        try (final BufferedOutputStream out = 
new BufferedOutputStream(rawOut);
+                                                final ByteCountingOutputStream 
countingOut = new ByteCountingOutputStream(out)) {
+                                            
countingOut.write(headerBytesWithoutTrailingNewLines);
+                                            //readLines has an offset of 
countingOut.getBytesWritten() to allow for header bytes written already
+                                            linesCopied.set(readLines(in, 
maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
+                                                    includeLineDelimiter, 
headerNewLineBytes));
+                                            
bytesCopied.set(countingOut.getBytesWritten());
                                         }
                                     }
                                 });
                                 splitFile = session.putAttribute(splitFile, 
SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
-                                logger.debug("Created Split File {} with {} 
lines", new Object[]{splitFile, linesCopied.get()});
+                                splitFile = session.putAttribute(splitFile, 
FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
+                                logger.debug("Created Split File {} with {} 
lines, {} bytes", new Object[]{splitFile, linesCopied.get(), 
bytesCopied.get()});
                             } finally {
                                 if (linesCopied.get() > 0) {
                                     splits.add(splitFile);
@@ -367,27 +509,44 @@ public class SplitText extends AbstractProcessor {
                                 }
                             }
 
-                            // If we copied fewer lines than what we want, 
then we're done copying data (we've hit EOF).
-                            if (linesCopied.get() < splitCount) {
+                            // Check for EOF
+                            in.mark(1);
+                            if (in.read() == -1) {
                                 break;
                             }
+                            in.reset();
+
                         } else {
                             // We have no header lines, so we can simply 
demarcate the original File via the
                             // ProcessSession#clone method.
-                            long beforeReadingLines = in.getBytesConsumed();
-                            final SplitInfo info = locateSplitPoint(in, 
splitCount, !removeTrailingNewlines);
-                            if (info.lengthBytes > 0) {
-                                info.offsetBytes = beforeReadingLines;
-                                splitInfos.add(info);
-                                final long procNanos = System.nanoTime() - 
startNanos;
-                                final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
-                                logger.debug("Detected start of Split File in 
{} at byte offset {} with a length of {} bytes; "
-                                        + "total splits = {}; total processing 
time = {} ms",
-                                        new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
+                            long beforeReadingLines = in.getBytesConsumed() - 
bufferedPartialLine;
+                            final SplitInfo info = locateSplitPoint(in, 
maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
+                            if 
(context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
+                                bufferedPartialLine = info.bufferedBytes;
                             }
-
                             if (info.endOfStream) {
+                                // stream is out of data
+                                if (info.lengthBytes > 0) {
+                                    info.offsetBytes = beforeReadingLines;
+                                    splitInfos.add(info);
+                                    final long procNanos = System.nanoTime() - 
startNanos;
+                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
+                                    logger.debug("Detected start of Split File 
in {} at byte offset {} with a length of {} bytes; "
+                                                    + "total splits = {}; 
total processing time = {} ms",
+                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
+                                }
                                 break;
+                            } else {
+                                if (info.lengthBytes != 0) {
+                                    info.offsetBytes = beforeReadingLines;
+                                    info.lengthBytes -= bufferedPartialLine;
+                                    splitInfos.add(info);
+                                    final long procNanos = System.nanoTime() - 
startNanos;
+                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
+                                    logger.debug("Detected start of Split File 
in {} at byte offset {} with a length of {} bytes; "
+                                                    + "total splits = {}; 
total processing time = {} ms",
+                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
+                                }
                             }
                         }
                     }
@@ -398,7 +557,7 @@ public class SplitText extends AbstractProcessor {
         if (errorMessage.get() != null) {
             logger.error("Unable to split {} due to {}; routing to failure", 
new Object[]{flowFile, errorMessage.get()});
             session.transfer(flowFile, REL_FAILURE);
-            if (splits != null && !splits.isEmpty()) {
+            if (!splits.isEmpty()) {
                 session.remove(splits);
             }
             return;
@@ -409,6 +568,7 @@ public class SplitText extends AbstractProcessor {
             for (final SplitInfo info : splitInfos) {
                 FlowFile split = session.clone(flowFile, info.offsetBytes, 
info.lengthBytes);
                 split = session.putAttribute(split, SPLIT_LINE_COUNT, 
String.valueOf(info.lengthLines));
+                split = session.putAttribute(split, FRAGMENT_SIZE, 
String.valueOf(info.lengthBytes));
                 splits.add(split);
             }
         }
@@ -443,26 +603,20 @@ public class SplitText extends AbstractProcessor {
     }
 
     private static class SplitInfo {
+
         public long offsetBytes;
         public long lengthBytes;
         public long lengthLines;
+        public long bufferedBytes;
         public boolean endOfStream;
 
         public SplitInfo() {
-            super();
             this.offsetBytes = 0L;
             this.lengthBytes = 0L;
             this.lengthLines = 0L;
+            this.bufferedBytes = 0L;
             this.endOfStream = false;
         }
-
-        @SuppressWarnings("unused")
-        public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) 
{
-            super();
-            this.offsetBytes = offsetBytes;
-            this.lengthBytes = lengthBytes;
-            this.lengthLines = lengthLines;
-        }
     }
 
     public static class EndOfLineBuffer {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac44a0a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
index f8b75e1..c7ac4f6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
@@ -39,6 +39,303 @@ public class TestSplitText {
             + 
"\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLastLine\n";
 
     @Test
+    public void testLastLineExceedsSizeLimit() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "20 B");
+
+        runner.enqueue("Line #1\nLine #2\nLine #3\nLong line exceeding limit");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 3);
+    }
+
+    @Test
+    public void testIncompleteHeader() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
+
+        runner.enqueue("Header Line #1");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 1);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 0);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+    }
+
+    @Test
+    public void testSingleCharacterHeaderMarker() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "7");
+        runner.setProperty(SplitText.HEADER_MARKER, "H");
+        runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
+
+        runner.enqueue(file);
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 2);
+
+        final List<MockFlowFile> splits = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7");
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "86");
+        splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "3");
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "54");
+        final String fragmentUUID = 
splits.get(0).getAttribute("fragment.identifier");
+        for (int i = 0; i < splits.size(); i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, 
String.valueOf(i+1));
+            split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
+            split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, 
String.valueOf(splits.size()));
+            split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, 
file.getFileName().toString());
+        }
+    }
+
+    @Test
+    public void testMultipleHeaderIndicators() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
+        runner.setProperty(SplitText.HEADER_MARKER, "Head");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
+        runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
+
+        runner.enqueue(file);
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 3);
+
+        final List<MockFlowFile> splits = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "62");
+        splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "55");
+        splits.get(2).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
+        splits.get(2).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "23");
+        final String fragmentUUID = 
splits.get(0).getAttribute("fragment.identifier");
+        for (int i = 0; i < splits.size(); i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, 
String.valueOf(i + 1));
+            split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
+            split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, 
String.valueOf(splits.size()));
+            split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, 
file.getFileName().toString());
+        }
+    }
+
+    @Test
+    public void testZeroLinesNoMaxSize() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testMultipleSplitDirectives() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
+        runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
+
+        runner.enqueue("Header Line #1\nHeader Line #2\nLine #1\nLine #2\n"
+                + "Line #3 This line has additional text added so that it 
exceeds the maximum fragment size\n"
+                + "Line #4\nLine #5\nLine #6\nLine #7\nLine #8\nLine #9\nLine 
#10\n");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 6);
+
+        final List<MockFlowFile> splits = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        assertEquals(46, splits.get(0).getSize());
+        assertEquals(119, splits.get(1).getSize());
+        assertEquals(46, splits.get(2).getSize());
+        assertEquals(46, splits.get(3).getSize());
+        assertEquals(46, splits.get(4).getSize());
+        assertEquals(39, splits.get(5).getSize());
+    }
+
+    @Test
+    public void testFlowFileIsOnlyHeader() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
+        runner.setProperty(SplitText.HEADER_MARKER, "Head");
+
+        runner.enqueue("Header Line #1\nHeaderLine#2\n");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+
+        // repeat with header count versus header marker
+        runner.clearTransferState();
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
+
+        runner.enqueue("Header Line #1\nHeaderLine #2\n");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+
+        // repeat single header line with no newline characters
+        runner.clearTransferState();
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
+
+        runner.enqueue("Header Line #1");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+    }
+
+    @Test
+    public void testMaxSizeExceeded() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "71 B");
+        runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
+
+        runner.enqueue(file);
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 2);
+
+        List<MockFlowFile> splits = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        String fragmentUUID = 
splits.get(0).getAttribute("fragment.identifier");
+        for (int i = 0; i < splits.size(); i++) {
+            final MockFlowFile split = splits.get(i);
+            split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
+            split.assertAttributeEquals(SplitText.FRAGMENT_SIZE, "70");
+            split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, 
String.valueOf(i + 1));
+            split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
+            split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, 
String.valueOf(splits.size()));
+            split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, 
file.getFileName().toString());
+        }
+
+        // Repeat test without header
+        runner.clearTransferState();
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
+        runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "71 B");
+        runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
+
+        runner.enqueue(file);
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 2);
+
+        splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
+        splits.get(0).assertContentEquals("Header Line #1\nHeader Line 
#2\nLine #1\nLine #2\nLine #3\nLine #4\nLine #5\n");
+        splits.get(1).assertContentEquals("Line #6\nLine #7\nLine #8\nLine 
#9\nLine #10");
+        splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7");
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "70");
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1");
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_ID, 
fragmentUUID);
+        splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
+        
splits.get(0).assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, 
file.getFileName().toString());
+        splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "40");
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_INDEX, "2");
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_ID, 
fragmentUUID);
+        splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
+        
splits.get(1).assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, 
file.getFileName().toString());
+    }
+
+    @Test
+    public void testSplitWithOnlyCarriageReturn() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
+
+        runner.enqueue("H1\rH2\r1\r2\r3\r\r\r\r\r\r\r10\r11\r12\r");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 4);
+
+        final List<MockFlowFile> splits = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splits.get(0).assertContentEquals("H1\rH2\r1\r2\r3");
+        splits.get(1).assertContentEquals("H1\rH2");
+        splits.get(2).assertContentEquals("H1\rH2");
+        splits.get(3).assertContentEquals("H1\rH2\r10\r11\r12");
+
+        runner.clearTransferState();
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
+
+        runner.enqueue("1\r2\r3\r\r\r\r\r\r\r10\r11\r12\r");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 2);
+
+        final List<MockFlowFile> splitsWithNoHeader = 
runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splitsWithNoHeader.get(0).assertContentEquals("1\r2\r3");
+        splitsWithNoHeader.get(1).assertContentEquals("10\r11\r12");
+
+    }
+
+    @Test
+    public void testSplitWithCarriageReturnAndNewLines() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitText());
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
+
+        
runner.enqueue("H1\r\nH2\r\n1\r\n2\r\n3\r\n\r\n\r\n\r\n\r\n\r\n\r\n10\r\n11\r\n12\r\n");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 4);
+
+        final List<MockFlowFile> splits 
=runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splits.get(0).assertContentEquals("H1\r\nH2\r\n1\r\n2\r\n3");
+        splits.get(1).assertContentEquals("H1\r\nH2");
+        splits.get(2).assertContentEquals("H1\r\nH2");
+        splits.get(3).assertContentEquals("H1\r\nH2\r\n10\r\n11\r\n12");
+
+        runner.clearTransferState();
+        runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        runner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
+
+        
runner.enqueue("1\r\n2\r\n3\r\n\r\n\r\n\r\n\r\n\r\n\r\n10\r\n11\r\n12\r\n");
+        runner.run();
+
+        runner.assertTransferCount(SplitText.REL_FAILURE, 0);
+        runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 2);
+
+        final List<MockFlowFile> splitsWithNoHeader 
=runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        splitsWithNoHeader.get(0).assertContentEquals("1\r\n2\r\n3");
+        splitsWithNoHeader.get(1).assertContentEquals("10\r\n11\r\n12");
+    }
+
+    @Test
     public void testRoutesToFailureIfHeaderLinesNotAllPresent() throws 
IOException {
         final TestRunner runner = TestRunners.newTestRunner(new SplitText());
         runner.setProperty(SplitText.HEADER_LINE_COUNT, "100");

Reply via email to