Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83255847
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count 
is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has 
been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, 
false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws 
IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == 
numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && 
nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws 
IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return 
header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at 
once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = 
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    -        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = 
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new 
AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws 
IOException {
    -                try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a 
ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority 
given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, 
headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    --- End diff --
    
    We should fail fast here and follow the existing convention of using
    `if (flowFile == null) { return; }` rather than indenting the entire method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to