Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83261038
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the 
original file will be routed to this destination and nothing will be routed 
elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count 
is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, 
bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has 
been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, 
false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, 
false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int 
numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws 
IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == 
numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || 
info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && 
nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && 
!keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws 
IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new 
InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return 
header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at 
once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = 
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : 
Long.MAX_VALUE;
    -        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = 
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new 
AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws 
IOException {
    -                try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a 
ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority 
given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, 
headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new 
AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new 
TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if 
available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, 
SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < 
SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + 
flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + 
" but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = 
SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, 
SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to 
failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, 
headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to 
" + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the 
individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and 
remaining text
    -                        final byte[] headerBytes = 
headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = 
SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, 
splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = 
headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new 
byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 
headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, 
headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new 
byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, 
headerBytesWithoutTrailingNewLines, 0, headerBytes.length - 
headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + 
(stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new 
FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new 
AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new 
AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new 
OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream 
rawOut) throws IOException {
    -                                        try (final BufferedOutputStream 
out = new BufferedOutputStream(rawOut);
    -                                                final 
ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            
countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of 
countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, 
maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, 
headerNewLineBytes));
    -                                            
bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = 
session.putAttribute(splitFile, SPLIT_LINE_COUNT, 
String.valueOf(linesCopied.get()));
    -                                splitFile = 
session.putAttribute(splitFile, FRAGMENT_SIZE, 
String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with 
{} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), 
bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, 
headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = 
headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the 
existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... 
scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, 
headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = 
SplitText.this.updateAttributes(session, splitFlowFile, 0, 
splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) 
{
    +                        long length = 
SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : 
computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null 
|| length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = 
session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( 
Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a 
multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain 
just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = 
session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // 
trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply 
demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = 
in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, 
maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if 
(context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = 
System.nanoTime() - startNanos;
    -                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split 
File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; 
total processing time = {} ms",
    -                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= 
bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = 
System.nanoTime() - startNanos;
    -                                    final long procMillis = 
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split 
File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; 
total processing time = {} ms",
    -                                            new Object[]{flowFile, 
beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, 
computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = 
SplitText.this.updateAttributes(session, splitFlowFile, 
computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, 
fragmentIndex++,
    +                                    computedSplitsInfo.size(), 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to 
failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + 
splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing 
headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, 
info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, 
String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, 
String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, 
splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new 
Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, 
final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = 
source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = 
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && 
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum 
Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split 
Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return Collections.unmodifiableList(properties);
         }
     
    -    public static class EndOfLineBuffer {
    -        private static final byte CARRIAGE_RETURN = (byte) '\r';
    -        private static final byte NEWLINE = (byte) '\n';
    -
    -        private final BitSet buffer = new BitSet();
    -        private int index = 0;
    -
    -        public void clear() {
    -            index = 0;
    -        }
    -
    -        public void addEndOfLine(final boolean carriageReturn, final 
boolean newLine) {
    -            buffer.set(index++, carriageReturn);
    -            buffer.set(index++, newLine);
    -        }
    -
    -        private void drainTo(final OutputStream out) throws IOException {
    -            for (int i = 0; i < index; i += 2) {
    -                final boolean cr = buffer.get(i);
    -                final boolean nl = buffer.get(i + 1);
    -
    -                // we've consumed all data in the buffer
    -                if (!cr && !nl) {
    -                    return;
    -                }
    -
    -                if (cr) {
    -                    out.write(CARRIAGE_RETURN);
    +    /**
    +     *
    +     */
    +   private FlowFile updateAttributes(ProcessSession session, FlowFile 
splitFlowFile, long lCount, long fSize, String fId, int fIdx, int fCount, 
String origFname) {
    --- End diff --
    
    should avoid abbreviations of variable names - nifi tends to spell 
everything out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to