[ 
https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572531#comment-15572531
 ] 

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83256735
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count 
is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has 
been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, 
false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws 
IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == 
numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && 
nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws 
IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return 
header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at 
once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = 
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    -        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = 
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new 
AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws 
IOException {
    -                try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a 
ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority 
given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, 
headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new 
AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new 
TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if 
available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, 
SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < 
SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + 
flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + 
" but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, 
SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to 
failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, 
headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to 
" + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the 
individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and 
remaining text
    -                        final byte[] headerBytes = 
headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = 
SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, 
splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = 
headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new 
byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 
headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, 
headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new 
byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, 
headerBytesWithoutTrailingNewLines, 0, headerBytes.length - 
headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + 
(stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new 
FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new 
AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new 
AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new 
OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream 
rawOut) throws IOException {
    -                                        try (final BufferedOutputStream 
out = new BufferedOutputStream(rawOut);
    -                                                final 
ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            
countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of 
countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, 
maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, 
headerNewLineBytes));
    -                                            
bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = 
session.putAttribute(splitFile, SPLIT_LINE_COUNT, 
String.valueOf(linesCopied.get()));
    -                                splitFile = 
session.putAttribute(splitFile, FRAGMENT_SIZE, 
String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with 
{} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), 
bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    --- End diff --
    
    We should fail-fast here - if error.get() then transfer to failure and 
return, rather than indenting the next 50 lines or so within a new block


> Improve performance of SplitText
> --------------------------------
>
>                 Key: NIFI-2851
>                 URL: https://issues.apache.org/jira/browse/NIFI-2851
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a 
> 1.4 million line text file into 5k line chunks and then splits those 5k line 
> chunks into 1 line chunks is only capable of pushing through about 10k lines 
> per second. This equates to about 10 MB/sec. JVisualVM shows that the 
> majority of the time is spent in the locateSplitPoint() method. Isolating 
> this code and inspecting how it works, and using some micro-benchmarking, it 
> appears that if we refactor the calls to InputStream.read() to instead read 
> into a byte array, we can improve performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to