[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572526#comment-15572526 ]
ASF GitHub Bot commented on NIFI-2851:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1116#discussion_r83260507
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
@@ -150,548 +145,320 @@
             .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
             .build();
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
+    static {
+        properties = new ArrayList<>();
         properties.add(LINE_SPLIT_COUNT);
         properties.add(FRAGMENT_MAX_SIZE);
         properties.add(HEADER_LINE_COUNT);
         properties.add(HEADER_MARKER);
         properties.add(REMOVE_TRAILING_NEWLINES);
-        this.properties = Collections.unmodifiableList(properties);
-        final Set<Relationship> relationships = new HashSet<>();
+        relationships = new HashSet<>();
         relationships.add(REL_ORIGINAL);
         relationships.add(REL_SPLITS);
         relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
     }
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
-
-        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
-                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
-
-        results.add(new ValidationResult.Builder()
-            .subject("Maximum Fragment Size")
-            .valid(!invalidState)
-            .explanation("Property must be specified when Line Split Count is 0")
-            .build()
-        );
-        return results;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
-            final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
-        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
-        byte[] leadingBytes = leadingNewLineBytes;
-        int numLines = 0;
-        long totalBytes = 0L;
-        for (int i = 0; i < maxNumLines; i++) {
-            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
-            final long bytes = eolMarker.getBytesConsumed();
-            leadingBytes = eolMarker.getLeadingNewLineBytes();
-
-            if (includeLineDelimiter && out != null) {
-                if (leadingBytes != null) {
-                    out.write(leadingBytes);
-                    leadingBytes = null;
-                }
-                eolBuffer.drainTo(out);
-            }
-            totalBytes += bytes;
-            if (bytes <= 0) {
-                return numLines;
-            }
-            numLines++;
-            if (totalBytes >= maxByteCount) {
-                break;
-            }
-        }
-        return numLines;
-    }
-
-    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
-            final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
-        long bytesRead = 0L;
-        final ByteArrayOutputStream buffer;
-        if (out != null) {
-            buffer = new ByteArrayOutputStream();
-        } else {
-            buffer = null;
-        }
-        byte[] bytesToWriteFirst = leadingNewLineBytes;
-
-        in.mark(Integer.MAX_VALUE);
-        while (true) {
-            final int nextByte = in.read();
-
-            // if we hit end of stream we're done
-            if (nextByte == -1) {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                }
-                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"?
-            }
+ private volatile boolean removeTrailingNewLines;
-            // Verify leading bytes do not violate size limitation
-            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
-                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
-            }
-            // Write leadingNewLines, if appropriate
-            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
-                bytesRead += bytesToWriteFirst.length;
-                buffer.write(bytesToWriteFirst);
-                bytesToWriteFirst = null;
-            }
-            // buffer the output
-            bytesRead++;
-            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
-                if (bytesToWriteFirst != null) {
-                    buffer.write(bytesToWriteFirst);
-                }
-                bytesToWriteFirst = null;
-                eolBuffer.drainTo(buffer);
-                eolBuffer.clear();
-                buffer.write(nextByte);
-            }
+ private volatile long maxSplitSize;
-            // check the size limit
-            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
-                in.reset();
-                if (buffer != null) {
-                    buffer.close();
-                }
-                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
-            }
+ private volatile int lineCount;
-            // if we have a new line, then we're done
-            if (nextByte == '\n') {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                    eolBuffer.addEndOfLine(false, true);
-                }
-                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
-            }
+ private volatile int headerLineCount;
-            // Determine if \n follows \r; in either case, end of line has been reached
-            if (nextByte == '\r') {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                }
-                in.mark(1);
-                final int lookAheadByte = in.read();
-                if (lookAheadByte == '\n') {
-                    eolBuffer.addEndOfLine(true, true);
-                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
-                } else {
-                    in.reset();
-                    eolBuffer.addEndOfLine(true, false);
-                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
-                }
-            }
-        }
-    }
+ private volatile String headerMarker;
-    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
-            final long bufferedBytes) throws IOException {
-        final SplitInfo info = new SplitInfo();
-        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-        int lastByte = -1;
-        info.lengthBytes = bufferedBytes;
-        long lastEolBufferLength = 0L;
-
-        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
-                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
-                && eolBuffer.length() < maxSize) {
-            in.mark(1);
-            final int nextByte = in.read();
-            // Check for \n following \r on last line
-            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
-                in.reset();
-                break;
-            }
-            switch (nextByte) {
-                case -1:
-                    info.endOfStream = true;
-                    if (keepAllNewLines) {
-                        info.lengthBytes += eolBuffer.length();
-                    }
-                    if (lastByte != '\r') {
-                        info.lengthLines++;
-                    }
-                    info.bufferedBytes = 0;
-                    return info;
-                case '\r':
-                    eolBuffer.addEndOfLine(true, false);
-                    info.lengthLines++;
-                    info.bufferedBytes = 0;
-                    break;
-                case '\n':
-                    eolBuffer.addEndOfLine(false, true);
-                    if (lastByte != '\r') {
-                        info.lengthLines++;
-                    }
-                    info.bufferedBytes = 0;
-                    break;
-                default:
-                    if (eolBuffer.length() > 0) {
-                        info.lengthBytes += eolBuffer.length();
-                        lastEolBufferLength = eolBuffer.length();
-                        eolBuffer.clear();
-                    }
-                    info.lengthBytes++;
-                    info.bufferedBytes++;
-                    break;
-            }
-            lastByte = nextByte;
-        }
-        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
-        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
-            info.lengthBytes -= lastEolBufferLength;
-        }
-        if (keepAllNewLines) {
-            info.lengthBytes += eolBuffer.length();
-        }
-        return info;
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.unmodifiableSet(relationships);
     }
-    private int countHeaderLines(final ByteCountingInputStream in, final String headerMarker) throws IOException {
-        int headerInfo = 0;
-
-        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
-        in.mark(Integer.MAX_VALUE);
-        String line = br.readLine();
-        while (line != null) {
-            // if line is not a header line, reset stream and return header counts
-            if (!line.startsWith(headerMarker)) {
-                in.reset();
-                return headerInfo;
-            } else {
-                headerInfo++;
-            }
-            line = br.readLine();
-        }
-        in.reset();
-        return headerInfo;
+    /**
+     *
+     */
+    @OnScheduled
+    public void onSchedule(ProcessContext context) {
+        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
+                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
+        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
+        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
     }
+    /**
+     * Will split the incoming stream releasing all splits as FlowFile at once.
+     */
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final ComponentLog logger = getLogger();
-        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
-        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
-                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
-        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
-                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
-        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
-        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
-        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
-        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> splits = new ArrayList<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream rawIn) throws IOException {
-                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
-                    final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                    long bufferedPartialLine = 0;
-
-                    // if we have header lines, copy them into a ByteArrayOutputStream
-                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
-                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
-                    int headerInfoLineCount = 0;
-                    if (headerCount > 0) {
-                        headerInfoLineCount = headerCount;
-                    } else {
-                        if (headerMarker != null) {
-                            headerInfoLineCount = countHeaderLines(in, headerMarker);
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            AtomicBoolean error = new AtomicBoolean();
+            List<FlowFile> splitFlowFiles = new ArrayList<>();
+            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
+            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
+                    SplitInfo splitInfo = null;
+                    long startOffset = 0;
+
+                    // Compute fragment representing the header (if available)
+                    long start = System.nanoTime();
+                    try {
+                        if (SplitText.this.headerLineCount > 0) {
+                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
+                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
+                                error.set(true);
+                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
+                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
+                            }
+                        } else if (SplitText.this.headerMarker != null) {
+                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                        }
+                        headerSplitInfoRef.set(splitInfo);
+                    } catch (IllegalStateException e) {
+                        error.set(true);
+                        getLogger().error(e.getMessage() + " Routing to failure.");
                    }
-                    final byte[] headerNewLineBytes;
-                    final byte[] headerBytesWithoutTrailingNewLines;
-                    if (headerInfoLineCount > 0) {
-                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
-
-                        if (headerLinesCopied < headerInfoLineCount) {
-                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
-                            return;
+                    // Compute and collect fragments representing the individual splits
+                    if (!error.get()) {
+                        if (headerSplitInfoRef.get() != null) {
+                            startOffset = headerSplitInfoRef.get().length;
                        }
-
-                        // Break header apart into trailing newlines and remaining text
-                        final byte[] headerBytes = headerStream.toByteArray();
-                        int headerNewLineByteCount = 0;
-                        for (int i = headerBytes.length - 1; i >= 0; i--) {
-                            final byte headerByte = headerBytes[i];
-
-                            if (headerByte == '\r' || headerByte == '\n') {
-                                headerNewLineByteCount++;
-                            } else {
-                                break;
-                            }
+                        long preAccumulatedLength = startOffset;
+                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
+                            computedSplitsInfo.add(splitInfo);
+                            startOffset += splitInfo.length;
                        }
-
-                        if (headerNewLineByteCount == 0) {
-                            headerNewLineBytes = null;
-                            headerBytesWithoutTrailingNewLines = headerBytes;
-                        } else {
-                            headerNewLineBytes = new byte[headerNewLineByteCount];
-                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
-
-                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
-                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
+                        long stop = System.nanoTime();
+                        if (getLogger().isDebugEnabled()) {
+                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                        }
-                    } else {
-                        headerBytesWithoutTrailingNewLines = null;
-                        headerNewLineBytes = null;
                    }
-
-                    while (true) {
-                        if (headerInfoLineCount > 0) {
-                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
-                            // and then start copying lines
-                            final AtomicInteger linesCopied = new AtomicInteger(0);
-                            final AtomicLong bytesCopied = new AtomicLong(0L);
-                            FlowFile splitFile = session.create(flowFile);
-                            try {
-                                splitFile = session.write(splitFile, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream rawOut) throws IOException {
-                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
-                                            final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
-                                            countingOut.write(headerBytesWithoutTrailingNewLines);
-                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
-                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
-                                                includeLineDelimiter, headerNewLineBytes));
-                                            bytesCopied.set(countingOut.getBytesWritten());
-                                        }
-                                    }
-                                });
-                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
-                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
-                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
-                            } finally {
-                                if (linesCopied.get() > 0) {
-                                    splits.add(splitFile);
+                }
+            });
+            if (!error.get()) {
+                FlowFile headerFlowFile = null;
+                long headerCrlfLength = 0;
+                if (headerSplitInfoRef.get() != null) {
+                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
+                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
+                }
+                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
+                String fragmentId = UUID.randomUUID().toString();
+
+                if (computedSplitsInfo.size() == 0) {
+                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
+                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
+                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                    splitFlowFiles.add(splitFlowFile);
+                } else {
+                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
+                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
+                        boolean proceedWithClone = headerFlowFile != null || length > 0;
+                        if (proceedWithClone) {
+                            FlowFile splitFlowFile = null;
+                            if (headerFlowFile != null) {
+                                if (length > 0) {
+                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
+                                    splitFlowFile = session.merge(Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                } else {
-                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
-                                    // the last flow file will contain just a header; don't forward that one
-                                    session.remove(splitFile);
+                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                }
-                            }
-
-                            // Check for EOF
-                            in.mark(1);
-                            if (in.read() == -1) {
-                                break;
-                            }
-                            in.reset();
-
-                        } else {
-                            // We have no header lines, so we can simply demarcate the original File via the
-                            // ProcessSession#clone method.
-                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
-                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
-                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
-                                bufferedPartialLine = info.bufferedBytes;
-                            }
-                            if (info.endOfStream) {
-                                // stream is out of data
-                                if (info.lengthBytes > 0) {
-                                    info.offsetBytes = beforeReadingLines;
-                                    splitInfos.add(info);
-                                    final long procNanos = System.nanoTime() - startNanos;
-                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
-                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
-                                            + "total splits = {}; total processing time = {} ms",
-                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
-                                }
-                                break;
                            } else {
-                                if (info.lengthBytes != 0) {
-                                    info.offsetBytes = beforeReadingLines;
-                                    info.lengthBytes -= bufferedPartialLine;
-                                    splitInfos.add(info);
-                                    final long procNanos = System.nanoTime() - startNanos;
-                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
-                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
-                                            + "total splits = {}; total processing time = {} ms",
-                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
-                                }
+                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                            }
+
+                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
+                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                            splitFlowFiles.add(splitFlowFile);
                        }
                    }
                }
-                }
-            });
-        if (errorMessage.get() != null) {
-            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
-            session.transfer(flowFile, REL_FAILURE);
-            if (!splits.isEmpty()) {
-                session.remove(splits);
+                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
+                if (headerFlowFile != null) {
+                    session.remove(headerFlowFile);
+                }
            }
-            return;
-        }
-        if (!splitInfos.isEmpty()) {
-            // Create the splits
-            for (final SplitInfo info : splitInfos) {
-                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
-                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
-                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
-                splits.add(split);
+            if (error.get()) {
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.transfer(flowFile, REL_ORIGINAL);
+                session.transfer(splitFlowFiles, REL_SPLITS);
            }
-        }
-        finishFragmentAttributes(session, flowFile, splits);
-
-        if (splits.size() > 10) {
-            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
        } else {
-            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
+            context.yield();
        }
-
-        session.transfer(flowFile, REL_ORIGINAL);
-        session.transfer(splits, REL_SPLITS);
    }
-    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
-        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
-
-        final String fragmentId = UUID.randomUUID().toString();
-        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
-        splits.clear();
-        for (int i = 1; i <= newList.size(); i++) {
-            FlowFile ff = newList.get(i - 1);
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put(FRAGMENT_ID, fragmentId);
-            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
-            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
-            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
-            FlowFile newFF = session.putAllAttributes(ff, attributes);
-            splits.add(newFF);
-        }
+    /**
+     *
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
+                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
+        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
+                .explanation("Property must be specified when Line Split Count is 0").build());
+        return results;
    }
-    private static class SplitInfo {
-
-        public long offsetBytes;
-        public long lengthBytes;
-        public long lengthLines;
-        public long bufferedBytes;
-        public boolean endOfStream;
-
-        public SplitInfo() {
-            this.offsetBytes = 0L;
-            this.lengthBytes = 0L;
-            this.lengthLines = 0L;
-            this.bufferedBytes = 0L;
-            this.endOfStream = false;
-        }
+    /**
+     *
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(properties);
    }
-    public static class EndOfLineBuffer {
-        private static final byte CARRIAGE_RETURN = (byte) '\r';
-        private static final byte NEWLINE = (byte) '\n';
-
-        private final BitSet buffer = new BitSet();
-        private int index = 0;
-
-        public void clear() {
-            index = 0;
-        }
-
-        public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
-            buffer.set(index++, carriageReturn);
-            buffer.set(index++, newLine);
-        }
-
-        private void drainTo(final OutputStream out) throws IOException {
-            for (int i = 0; i < index; i += 2) {
-                final boolean cr = buffer.get(i);
-                final boolean nl = buffer.get(i + 1);
-
-                // we've consumed all data in the buffer
-                if (!cr && !nl) {
-                    return;
-                }
-
-                if (cr) {
-                    out.write(CARRIAGE_RETURN);
+    /**
--- End diff --
Should remove blank javadoc lines
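For instance (a sketch of the suggested cleanup only, not a committed change), the customValidate() method added further up in this diff would simply lose its empty comment block:

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> results = new ArrayList<>();
        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
                .explanation("Property must be specified when Line Split Count is 0").build());
        return results;
    }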
> Improve performance of SplitText
> --------------------------------
>
> Key: NIFI-2851
> URL: https://issues.apache.org/jira/browse/NIFI-2851
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
> Fix For: 1.1.0
>
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a
> 1.4 million line text file into 5k line chunks and then splits those 5k line
> chunks into 1 line chunks is only capable of pushing through about 10k lines
> per second. This equates to about 10 MB/sec. JVisualVM shows that the
> majority of the time is spent in the locateSplitPoint() method. Isolating
> this code and inspecting how it works, and using some micro-benchmarking, it
> appears that if we refactor the calls to InputStream.read() to instead read
> into a byte array, we can improve performance.
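For context, the refactoring described above amounts to replacing per-byte InputStream.read() calls with reads into a reusable byte[] buffer that is then scanned in memory. A minimal sketch of the idea (illustrative only; the actual fix lives in TextLineDemarcator, and the class and method names below are made up):

    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical sketch, not NiFi code: contrasts the per-byte read pattern used by the
    // old locateSplitPoint() with filling a byte[] buffer and scanning it in memory.
    public class LineCountingSketch {

        // Old pattern: one InputStream.read() call per byte.
        static long countLinesPerByte(InputStream in) throws IOException {
            long lines = 0;
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    lines++;
                }
            }
            return lines;
        }

        // Refactored pattern: read a block at a time, then scan the buffer in memory.
        static long countLinesBuffered(InputStream in) throws IOException {
            final byte[] buffer = new byte[8192];
            long lines = 0;
            int read;
            while ((read = in.read(buffer)) != -1) {
                for (int i = 0; i < read; i++) {
                    if (buffer[i] == '\n') {
                        lines++;
                    }
                }
            }
            return lines;
        }
    }

The buffered variant pays the stream-read overhead once per buffer rather than once per byte, which is where the claimed speedup comes from.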
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)