[ 
https://issues.apache.org/jira/browse/NIFI-1118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197273#comment-15197273
 ] 

ASF GitHub Bot commented on NIFI-1118:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/280#discussion_r56329206
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -143,72 +199,82 @@ protected void init(final 
ProcessorInitializationContext context) {
             return properties;
         }
     
    -    private int readLines(final InputStream in, final int maxNumLines, 
final OutputStream out, final boolean keepAllNewLines) throws IOException {
    +    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out) throws IOException {
             int numLines = 0;
    +        long totalBytes = 0L;
             for (int i = 0; i < maxNumLines; i++) {
    -            final long bytes = countBytesToSplitPoint(in, out, 
keepAllNewLines || (i != maxNumLines - 1));
    +            final long bytes = countBytesToSplitPoint(in, out, totalBytes, 
maxByteCount);
    +            totalBytes += bytes;
                 if (bytes <= 0) {
                     return numLines;
                 }
    -
                 numLines++;
    +            if (totalBytes >= maxByteCount && numLines > maxNumLines) {
    +                break;
    +            }
             }
    -
             return numLines;
         }
     
    -    private long countBytesToSplitPoint(final InputStream in, final 
OutputStream out, final boolean includeLineDelimiter) throws IOException {
    -        int lastByte = -1;
    +    private long countBytesToSplitPoint(final InputStream in, final 
OutputStream out, final long bytesReadSoFar, final long maxSize) throws 
IOException {
             long bytesRead = 0L;
    +        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
     
    +        in.mark(Integer.MAX_VALUE);
             while (true) {
    -            in.mark(1);
                 final int nextByte = in.read();
     
    -            // if we hit end of stream or new line we're done
    +            // if we hit end of stream we're done
                 if (nextByte == -1) {
    -                if (lastByte == '\r') {
    -                    return includeLineDelimiter ? bytesRead : bytesRead - 
1;
    -                } else {
    -                    return bytesRead;
    +                if (out != null) {
    +                    buffer.writeTo(out);
                     }
    +                buffer.close();
    +                return bytesRead;
                 }
     
    -            // if there's an OutputStream to copy the data to, copy it, if 
appropriate.
    -            // "if appropriate" means that it's not a line delimiter or 
that we want to copy line delimiters
    +            // buffer the output
                 bytesRead++;
    -            if (out != null && (includeLineDelimiter || (nextByte != '\n' 
&& nextByte != '\r'))) {
    -                out.write(nextByte);
    +            buffer.write(nextByte);
    +
    +            // check the size limit
    +            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    +                in.reset();
    +                buffer.close();
    +                return -1;
                 }
     
                 // if we have a new line, then we're done
                 if (nextByte == '\n') {
    -                if (includeLineDelimiter) {
    -                    return bytesRead;
    -                } else {
    -                    return (lastByte == '\r') ? bytesRead - 2 : bytesRead 
- 1;
    +                if (out != null) {
    +                    buffer.writeTo(out);
                     }
    +                buffer.close();
    +                return bytesRead;
                 }
     
    -            // we didn't get a new line but if last byte was carriage 
return we've reached a new-line.
    -            // so we roll back the last byte that we read and return
    -            if (lastByte == '\r') {
    -                in.reset();
    -                bytesRead--;    // we reset the stream by 1 byte so 
decrement the number of bytes read by 1
    -                return includeLineDelimiter ? bytesRead : bytesRead - 1;
    +            // Determine if \n follows \r; for both cases, end of line has 
been reached
    +            if (nextByte == '\r') {
    +                buffer.writeTo(out);
    --- End diff --
    
    This will throw a NullPointerException if out == null


> Enable SplitText processor to limit line length and filter header lines
> -----------------------------------------------------------------------
>
>                 Key: NIFI-1118
>                 URL: https://issues.apache.org/jira/browse/NIFI-1118
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Bean
>            Assignee: Joe Skora
>             Fix For: 0.6.0
>
>
> Include the following functionality to the SplitText processor:
> 1) Maximum size limit of the split file(s)
> A new split file will be created if the next line to be added to the current 
> split file exceeds a user-defined maximum file size
> 2) Header line marker
> User-defined character(s) can be used to identify the header line(s) of the 
> data file rather than a predetermined number of lines
> These changes are additions, not a replacement of any property or behavior. 
> In the case of header line marker, the existing property "Header Line Count" 
> must be zero for the new property and behavior to be used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to