[ 
https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572528#comment-15572528
 ] 

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260752
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count 
is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has 
been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, 
false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws 
IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == 
numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && 
nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws 
IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return 
header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at 
once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = 
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    -        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = 
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new 
AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws 
IOException {
    -                try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a 
ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority 
given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, 
headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new 
AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new 
TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if 
available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, 
SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < 
SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + 
flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + 
" but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, 
SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to 
failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, 
headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to 
" + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the 
individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and 
remaining text
    -                        final byte[] headerBytes = 
headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = 
SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, 
splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = 
headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new 
byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 
headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, 
headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new 
byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, 
headerBytesWithoutTrailingNewLines, 0, headerBytes.length - 
headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + 
(stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new 
FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new 
AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new 
AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new 
OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream 
rawOut) throws IOException {
    -                                        try (final BufferedOutputStream 
out = new BufferedOutputStream(rawOut);
    -                                                final 
ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            
countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of 
countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, 
maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, 
headerNewLineBytes));
    -                                            
bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = 
session.putAttribute(splitFile, SPLIT_LINE_COUNT, 
String.valueOf(linesCopied.get()));
    -                                splitFile = 
session.putAttribute(splitFile, FRAGMENT_SIZE, 
String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with 
{} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), 
bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, 
headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = 
headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the 
existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... 
scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, 
headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = 
SplitText.this.updateAttributes(session, splitFlowFile, 0, 
splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) 
{
    +                        long length = 
SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : 
computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null 
|| length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = 
session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( 
Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a 
multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain 
just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = 
session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // 
trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply 
demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = 
in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, 
maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if 
(context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = 
System.nanoTime() - startNanos;
    -                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split 
File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; 
total processing time = {} ms",
    -                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= 
bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = 
System.nanoTime() - startNanos;
    -                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split 
File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; 
total processing time = {} ms",
    -                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, 
computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = 
SplitText.this.updateAttributes(session, splitFlowFile, 
computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, 
fragmentIndex++,
    +                                    computedSplitsInfo.size(), 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to 
failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + 
splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing 
headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, 
info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, 
String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, 
String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, 
splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new 
Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, 
final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = 
source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum 
Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split 
Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return Collections.unmodifiableList(properties);
    --- End diff --
    
    Since we already have a List<PropertyDescriptor> as a member variable, why 
not make that member variable itself an unmodifiable list (wrapping it once, 
when it is initialized) and return it directly, instead of creating a new 
unmodifiable wrapper on every call?


> Improve performance of SplitText
> --------------------------------
>
>                 Key: NIFI-2851
>                 URL: https://issues.apache.org/jira/browse/NIFI-2851
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a 
> 1.4 million line text file into 5k line chunks and then splits those 5k line 
> chunks into 1 line chunks is only capable of pushing through about 10k lines 
> per second. This equates to about 10 MB/sec. JVisualVM shows that the 
> majority of the time is spent in the locateSplitPoint() method. Isolating 
> this code and inspecting how it works, and using some micro-benchmarking, it 
> appears that if we refactor the calls to InputStream.read() to instead read 
> into a byte array, we can improve performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to