GitHub user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1116#discussion_r83258027
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
---
@@ -150,548 +145,320 @@
.description("If a file cannot be split for some reason, the
original file will be routed to this destination and nothing will be routed
elsewhere")
.build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> properties;
+ private static final Set<Relationship> relationships;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
+ static {
+ properties = new ArrayList<>();
properties.add(LINE_SPLIT_COUNT);
properties.add(FRAGMENT_MAX_SIZE);
properties.add(HEADER_LINE_COUNT);
properties.add(HEADER_MARKER);
properties.add(REMOVE_TRAILING_NEWLINES);
- this.properties = Collections.unmodifiableList(properties);
- final Set<Relationship> relationships = new HashSet<>();
+ relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_SPLITS);
relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
}
- @Override
- protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
- List<ValidationResult> results = new ArrayList<>();
-
- final boolean invalidState =
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
- &&
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
-
- results.add(new ValidationResult.Builder()
- .subject("Maximum Fragment Size")
- .valid(!invalidState)
- .explanation("Property must be specified when Line Split Count
is 0")
- .build()
- );
- return results;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- private int readLines(final InputStream in, final int maxNumLines,
final long maxByteCount, final OutputStream out,
- final boolean includeLineDelimiter, final byte[]
leadingNewLineBytes) throws IOException {
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
- byte[] leadingBytes = leadingNewLineBytes;
- int numLines = 0;
- long totalBytes = 0L;
- for (int i = 0; i < maxNumLines; i++) {
- final EndOfLineMarker eolMarker = countBytesToSplitPoint(in,
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
- final long bytes = eolMarker.getBytesConsumed();
- leadingBytes = eolMarker.getLeadingNewLineBytes();
-
- if (includeLineDelimiter && out != null) {
- if (leadingBytes != null) {
- out.write(leadingBytes);
- leadingBytes = null;
- }
- eolBuffer.drainTo(out);
- }
- totalBytes += bytes;
- if (bytes <= 0) {
- return numLines;
- }
- numLines++;
- if (totalBytes >= maxByteCount) {
- break;
- }
- }
- return numLines;
- }
-
- private EndOfLineMarker countBytesToSplitPoint(final InputStream in,
final OutputStream out, final long bytesReadSoFar, final long maxSize,
- final boolean
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[]
leadingNewLineBytes) throws IOException {
- long bytesRead = 0L;
- final ByteArrayOutputStream buffer;
- if (out != null) {
- buffer = new ByteArrayOutputStream();
- } else {
- buffer = null;
- }
- byte[] bytesToWriteFirst = leadingNewLineBytes;
-
- in.mark(Integer.MAX_VALUE);
- while (true) {
- final int nextByte = in.read();
-
- // if we hit end of stream we're done
- if (nextByte == -1) {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, true,
bytesToWriteFirst); // bytesToWriteFirst should be "null"?
- }
+ private volatile boolean removeTrailingNewLines;
- // Verify leading bytes do not violate size limitation
- if (bytesToWriteFirst != null && (bytesToWriteFirst.length +
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
- return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
- }
- // Write leadingNewLines, if appropriate
- if ( buffer != null && includeLineDelimiter &&
bytesToWriteFirst != null) {
- bytesRead += bytesToWriteFirst.length;
- buffer.write(bytesToWriteFirst);
- bytesToWriteFirst = null;
- }
- // buffer the output
- bytesRead++;
- if (buffer != null && nextByte != '\n' && nextByte != '\r') {
- if (bytesToWriteFirst != null) {
- buffer.write(bytesToWriteFirst);
- }
- bytesToWriteFirst = null;
- eolBuffer.drainTo(buffer);
- eolBuffer.clear();
- buffer.write(nextByte);
- }
+ private volatile long maxSplitSize;
- // check the size limit
- if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar >
0) {
- in.reset();
- if (buffer != null) {
- buffer.close();
- }
- return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
- }
+ private volatile int lineCount;
- // if we have a new line, then we're done
- if (nextByte == '\n') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- eolBuffer.addEndOfLine(false, true);
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, false,
bytesToWriteFirst);
- }
+ private volatile int headerLineCount;
- // Determine if \n follows \r; in either case, end of line has
been reached
- if (nextByte == '\r') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- in.mark(1);
- final int lookAheadByte = in.read();
- if (lookAheadByte == '\n') {
- eolBuffer.addEndOfLine(true, true);
- return new EndOfLineMarker(bytesRead + 1, eolBuffer,
false, bytesToWriteFirst);
- } else {
- in.reset();
- eolBuffer.addEndOfLine(true, false);
- return new EndOfLineMarker(bytesRead, eolBuffer,
false, bytesToWriteFirst);
- }
- }
- }
- }
+ private volatile String headerMarker;
- private SplitInfo locateSplitPoint(final InputStream in, final int
numLines, final boolean keepAllNewLines, final long maxSize,
- final long bufferedBytes) throws
IOException {
- final SplitInfo info = new SplitInfo();
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
- int lastByte = -1;
- info.lengthBytes = bufferedBytes;
- long lastEolBufferLength = 0L;
-
- while ((info.lengthLines < numLines || (info.lengthLines ==
numLines && lastByte == '\r'))
- && (((info.lengthBytes + eolBuffer.length()) < maxSize) ||
info.lengthLines == 0)
- && eolBuffer.length() < maxSize) {
- in.mark(1);
- final int nextByte = in.read();
- // Check for \n following \r on last line
- if (info.lengthLines == numLines && lastByte == '\r' &&
nextByte != '\n') {
- in.reset();
- break;
- }
- switch (nextByte) {
- case -1:
- info.endOfStream = true;
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
- }
- if (lastByte != '\r') {
- info.lengthLines++;
- }
- info.bufferedBytes = 0;
- return info;
- case '\r':
- eolBuffer.addEndOfLine(true, false);
- info.lengthLines++;
- info.bufferedBytes = 0;
- break;
- case '\n':
- eolBuffer.addEndOfLine(false, true);
- if (lastByte != '\r') {
- info.lengthLines++;
- }
- info.bufferedBytes = 0;
- break;
- default:
- if (eolBuffer.length() > 0) {
- info.lengthBytes += eolBuffer.length();
- lastEolBufferLength = eolBuffer.length();
- eolBuffer.clear();
- }
- info.lengthBytes++;
- info.bufferedBytes++;
- break;
- }
- lastByte = nextByte;
- }
- // if current line exceeds size and not keeping eol characters,
remove previously applied eol characters
- if ((info.lengthBytes + eolBuffer.length()) >= maxSize &&
!keepAllNewLines) {
- info.lengthBytes -= lastEolBufferLength;
- }
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
- }
- return info;
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.unmodifiableSet(relationships);
}
- private int countHeaderLines(final ByteCountingInputStream in,
- final String headerMarker) throws
IOException {
- int headerInfo = 0;
-
- final BufferedReader br = new BufferedReader(new
InputStreamReader(in));
- in.mark(Integer.MAX_VALUE);
- String line = br.readLine();
- while (line != null) {
- // if line is not a header line, reset stream and return
header counts
- if (!line.startsWith(headerMarker)) {
- in.reset();
- return headerInfo;
- } else {
- headerInfo++;
- }
- line = br.readLine();
- }
- in.reset();
- return headerInfo;
+ /**
+ *
+ */
+ @OnScheduled
+ public void onSchedule(ProcessContext context) {
+ this.removeTrailingNewLines =
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
+ ?
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
+ this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+ ?
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() :
Long.MAX_VALUE;
+ this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+ this.headerLineCount =
context.getProperty(HEADER_LINE_COUNT).asInteger();
+ this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
}
+ /**
+ * Will split the incoming stream releasing all splits as FlowFile at
once.
+ */
@Override
- public void onTrigger(final ProcessContext context, final
ProcessSession session) {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final ComponentLog logger = getLogger();
- final int headerCount =
context.getProperty(HEADER_LINE_COUNT).asInteger();
- final int maxLineCount =
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
- ? Integer.MAX_VALUE :
context.getProperty(LINE_SPLIT_COUNT).asInteger();
- final long maxFragmentSize =
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
- ?
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() :
Long.MAX_VALUE;
- final String headerMarker =
context.getProperty(HEADER_MARKER).getValue();
- final boolean includeLineDelimiter =
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
- final AtomicReference<String> errorMessage = new
AtomicReference<>(null);
- final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
- final long startNanos = System.nanoTime();
- final List<FlowFile> splits = new ArrayList<>();
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws
IOException {
- try (final BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn);
- final ByteCountingInputStream in = new
ByteCountingInputStream(bufferedIn)) {
-
- long bufferedPartialLine = 0;
-
- // if we have header lines, copy them into a
ByteArrayOutputStream
- final ByteArrayOutputStream headerStream = new
ByteArrayOutputStream();
- // Determine the number of lines of header, priority
given to HEADER_LINE_COUNT property
- int headerInfoLineCount = 0;
- if (headerCount > 0) {
- headerInfoLineCount = headerCount;
- } else {
- if (headerMarker != null) {
- headerInfoLineCount = countHeaderLines(in,
headerMarker);
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile != null) {
+ AtomicBoolean error = new AtomicBoolean();
+ List<FlowFile> splitFlowFiles = new ArrayList<>();
+ List<SplitInfo> computedSplitsInfo = new ArrayList<>();
+ AtomicReference<SplitInfo> headerSplitInfoRef = new
AtomicReference<>();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ TextLineDemarcator demarcator = new
TextLineDemarcator(in);
+ SplitInfo splitInfo = null;
+ long startOffset = 0;
+
+ // Compute fragment representing the header (if
available)
+ long start = System.nanoTime();
+ try {
+ if (SplitText.this.headerLineCount > 0) {
+ splitInfo =
SplitText.this.computeHeader(demarcator, startOffset,
SplitText.this.headerLineCount, null, null);
+ if (splitInfo.lineCount <
SplitText.this.headerLineCount) {
+ error.set(true);
+ getLogger().error("Unable to split " +
flowFile + " due to insufficient amount of header lines. Required "
+ + SplitText.this.headerLineCount +
" but was " + splitInfo.lineCount + ". Routing to failure.");
+ }
+ } else if (SplitText.this.headerMarker != null) {
+ splitInfo =
SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE,
SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
}
+ headerSplitInfoRef.set(splitInfo);
+ } catch (IllegalStateException e) {
+ error.set(true);
+ getLogger().error(e.getMessage() + " Routing to
failure.");
}
- final byte[] headerNewLineBytes;
- final byte[] headerBytesWithoutTrailingNewLines;
- if (headerInfoLineCount > 0) {
- final int headerLinesCopied = readLines(in,
headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
-
- if (headerLinesCopied < headerInfoLineCount) {
- errorMessage.set("Header Line Count is set to
" + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
- return;
+ // Compute and collect fragments representing the
individual splits
+ if (!error.get()) {
+ if (headerSplitInfoRef.get() != null) {
+ startOffset = headerSplitInfoRef.get().length;
}
-
- // Break header apart into trailing newlines and
remaining text
- final byte[] headerBytes =
headerStream.toByteArray();
- int headerNewLineByteCount = 0;
- for (int i = headerBytes.length - 1; i >= 0; i--) {
- final byte headerByte = headerBytes[i];
-
- if (headerByte == '\r' || headerByte == '\n') {
- headerNewLineByteCount++;
- } else {
- break;
- }
+ long preAccumulatedLength = startOffset;
+ while ((splitInfo =
SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount,
splitInfo, preAccumulatedLength)) != null) {
+ computedSplitsInfo.add(splitInfo);
+ startOffset += splitInfo.length;
}
-
- if (headerNewLineByteCount == 0) {
- headerNewLineBytes = null;
- headerBytesWithoutTrailingNewLines =
headerBytes;
- } else {
- headerNewLineBytes = new
byte[headerNewLineByteCount];
- System.arraycopy(headerBytes,
headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0,
headerNewLineByteCount);
-
- headerBytesWithoutTrailingNewLines = new
byte[headerBytes.length - headerNewLineByteCount];
- System.arraycopy(headerBytes, 0,
headerBytesWithoutTrailingNewLines, 0, headerBytes.length -
headerNewLineByteCount);
+ long stop = System.nanoTime();
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Computed splits in " +
(stop - start) + " milliseconds.");
}
- } else {
- headerBytesWithoutTrailingNewLines = null;
- headerNewLineBytes = null;
}
-
- while (true) {
- if (headerInfoLineCount > 0) {
- // if we have header lines, create a new
FlowFile, copy the header lines to that file,
- // and then start copying lines
- final AtomicInteger linesCopied = new
AtomicInteger(0);
- final AtomicLong bytesCopied = new
AtomicLong(0L);
- FlowFile splitFile = session.create(flowFile);
- try {
- splitFile = session.write(splitFile, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream
rawOut) throws IOException {
- try (final BufferedOutputStream
out = new BufferedOutputStream(rawOut);
- final
ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
-
countingOut.write(headerBytesWithoutTrailingNewLines);
- //readLines has an offset of
countingOut.getBytesWritten() to allow for header bytes written already
- linesCopied.set(readLines(in,
maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
- includeLineDelimiter,
headerNewLineBytes));
-
bytesCopied.set(countingOut.getBytesWritten());
- }
- }
- });
- splitFile =
session.putAttribute(splitFile, SPLIT_LINE_COUNT,
String.valueOf(linesCopied.get()));
- splitFile =
session.putAttribute(splitFile, FRAGMENT_SIZE,
String.valueOf(bytesCopied.get()));
- logger.debug("Created Split File {} with
{} lines, {} bytes", new Object[]{splitFile, linesCopied.get(),
bytesCopied.get()});
- } finally {
- if (linesCopied.get() > 0) {
- splits.add(splitFile);
+ }
+ });
+ if (!error.get()) {
+ FlowFile headerFlowFile = null;
+ long headerCrlfLength = 0;
+ if (headerSplitInfoRef.get() != null) {
+ headerFlowFile = session.clone(flowFile,
headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
+ headerCrlfLength =
headerSplitInfoRef.get().trimmedLength;
+ }
+ int fragmentIndex = 1; // set to 1 to preserve the
existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2...
scheme
--- End diff --
I believe that leaving this as 1 is correct, and that it should not be
changed to 0. Changing it would break backward compatibility and would provide
no real benefit. Although 0 is the more common starting point for developers, 1 is
also used commonly enough, and 1 was chosen as the starting point here
specifically because when you look at the FlowFile attributes and see, for example, a
fragment index of 3, it is more intuitive to read it as the 3rd split in the series
rather than the 4th.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---