[ 
https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572536#comment-15572536
 ] 

ASF GitHub Bot commented on NIFI-2851:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260605
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count 
is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has 
been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, 
false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws 
IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == 
numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && 
nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws 
IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return 
header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    --- End diff --
    
    Should remove blank javadoc lines


> Improve performance of SplitText
> --------------------------------
>
>                 Key: NIFI-2851
>                 URL: https://issues.apache.org/jira/browse/NIFI-2851
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a 
> 1.4 million line text file into 5k line chunks and then splits those 5k line 
> chunks into 1 line chunks is only capable of pushing through about 10k lines 
> per second. This equates to about 10 MB/sec. JVisualVM shows that the 
> majority of the time is spent in the locateSplitPoint() method. Isolating 
> this code and inspecting how it works, and using some micro-benchmarking, it 
> appears that if we refactor the calls to InputStream.read() to instead read 
> into a byte array, we can improve performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to