[
https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572529#comment-15572529
]
ASF GitHub Bot commented on NIFI-2851:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1116#discussion_r83256378
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
---
@@ -150,548 +145,320 @@
.description("If a file cannot be split for some reason, the
original file will be routed to this destination and nothing will be routed
elsewhere")
.build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> properties;
+ private static final Set<Relationship> relationships;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
+ static {
+ properties = new ArrayList<>();
properties.add(LINE_SPLIT_COUNT);
properties.add(FRAGMENT_MAX_SIZE);
properties.add(HEADER_LINE_COUNT);
properties.add(HEADER_MARKER);
properties.add(REMOVE_TRAILING_NEWLINES);
- this.properties = Collections.unmodifiableList(properties);
- final Set<Relationship> relationships = new HashSet<>();
+ relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_SPLITS);
relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
}
- @Override
- protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
- List<ValidationResult> results = new ArrayList<>();
-
- final boolean invalidState =
(validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
- &&
!validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
-
- results.add(new ValidationResult.Builder()
- .subject("Maximum Fragment Size")
- .valid(!invalidState)
- .explanation("Property must be specified when Line Split Count
is 0")
- .build()
- );
- return results;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- private int readLines(final InputStream in, final int maxNumLines,
final long maxByteCount, final OutputStream out,
- final boolean includeLineDelimiter, final byte[]
leadingNewLineBytes) throws IOException {
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
- byte[] leadingBytes = leadingNewLineBytes;
- int numLines = 0;
- long totalBytes = 0L;
- for (int i = 0; i < maxNumLines; i++) {
- final EndOfLineMarker eolMarker = countBytesToSplitPoint(in,
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
- final long bytes = eolMarker.getBytesConsumed();
- leadingBytes = eolMarker.getLeadingNewLineBytes();
-
- if (includeLineDelimiter && out != null) {
- if (leadingBytes != null) {
- out.write(leadingBytes);
- leadingBytes = null;
- }
- eolBuffer.drainTo(out);
- }
- totalBytes += bytes;
- if (bytes <= 0) {
- return numLines;
- }
- numLines++;
- if (totalBytes >= maxByteCount) {
- break;
- }
- }
- return numLines;
- }
-
- private EndOfLineMarker countBytesToSplitPoint(final InputStream in,
final OutputStream out, final long bytesReadSoFar, final long maxSize,
- final boolean
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[]
leadingNewLineBytes) throws IOException {
- long bytesRead = 0L;
- final ByteArrayOutputStream buffer;
- if (out != null) {
- buffer = new ByteArrayOutputStream();
- } else {
- buffer = null;
- }
- byte[] bytesToWriteFirst = leadingNewLineBytes;
-
- in.mark(Integer.MAX_VALUE);
- while (true) {
- final int nextByte = in.read();
-
- // if we hit end of stream we're done
- if (nextByte == -1) {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, true,
bytesToWriteFirst); // bytesToWriteFirst should be "null"?
- }
+ private volatile boolean removeTrailingNewLines;
- // Verify leading bytes do not violate size limitation
- if (bytesToWriteFirst != null && (bytesToWriteFirst.length +
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
- return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
- }
- // Write leadingNewLines, if appropriate
- if ( buffer != null && includeLineDelimiter &&
bytesToWriteFirst != null) {
- bytesRead += bytesToWriteFirst.length;
- buffer.write(bytesToWriteFirst);
- bytesToWriteFirst = null;
- }
- // buffer the output
- bytesRead++;
- if (buffer != null && nextByte != '\n' && nextByte != '\r') {
- if (bytesToWriteFirst != null) {
- buffer.write(bytesToWriteFirst);
- }
- bytesToWriteFirst = null;
- eolBuffer.drainTo(buffer);
- eolBuffer.clear();
- buffer.write(nextByte);
- }
+ private volatile long maxSplitSize;
- // check the size limit
- if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar >
0) {
- in.reset();
- if (buffer != null) {
- buffer.close();
- }
- return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
- }
+ private volatile int lineCount;
- // if we have a new line, then we're done
- if (nextByte == '\n') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- eolBuffer.addEndOfLine(false, true);
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, false,
bytesToWriteFirst);
- }
+ private volatile int headerLineCount;
- // Determine if \n follows \r; in either case, end of line has
been reached
- if (nextByte == '\r') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- in.mark(1);
- final int lookAheadByte = in.read();
- if (lookAheadByte == '\n') {
- eolBuffer.addEndOfLine(true, true);
- return new EndOfLineMarker(bytesRead + 1, eolBuffer,
false, bytesToWriteFirst);
- } else {
- in.reset();
- eolBuffer.addEndOfLine(true, false);
- return new EndOfLineMarker(bytesRead, eolBuffer,
false, bytesToWriteFirst);
- }
- }
- }
- }
+ private volatile String headerMarker;
- private SplitInfo locateSplitPoint(final InputStream in, final int
numLines, final boolean keepAllNewLines, final long maxSize,
- final long bufferedBytes) throws
IOException {
- final SplitInfo info = new SplitInfo();
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
- int lastByte = -1;
- info.lengthBytes = bufferedBytes;
- long lastEolBufferLength = 0L;
-
- while ((info.lengthLines < numLines || (info.lengthLines ==
numLines && lastByte == '\r'))
- && (((info.lengthBytes + eolBuffer.length()) < maxSize) ||
info.lengthLines == 0)
- && eolBuffer.length() < maxSize) {
- in.mark(1);
- final int nextByte = in.read();
- // Check for \n following \r on last line
- if (info.lengthLines == numLines && lastByte == '\r' &&
nextByte != '\n') {
- in.reset();
- break;
- }
- switch (nextByte) {
- case -1:
- info.endOfStream = true;
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
- }
- if (lastByte != '\r') {
- info.lengthLines++;
- }
- info.bufferedBytes = 0;
- return info;
- case '\r':
- eolBuffer.addEndOfLine(true, false);
- info.lengthLines++;
- info.bufferedBytes = 0;
- break;
- case '\n':
- eolBuffer.addEndOfLine(false, true);
- if (lastByte != '\r') {
- info.lengthLines++;
- }
- info.bufferedBytes = 0;
- break;
- default:
- if (eolBuffer.length() > 0) {
- info.lengthBytes += eolBuffer.length();
- lastEolBufferLength = eolBuffer.length();
- eolBuffer.clear();
- }
- info.lengthBytes++;
- info.bufferedBytes++;
- break;
- }
- lastByte = nextByte;
- }
- // if current line exceeds size and not keeping eol characters,
remove previously applied eol characters
- if ((info.lengthBytes + eolBuffer.length()) >= maxSize &&
!keepAllNewLines) {
- info.lengthBytes -= lastEolBufferLength;
- }
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
- }
- return info;
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.unmodifiableSet(relationships);
}
- private int countHeaderLines(final ByteCountingInputStream in,
- final String headerMarker) throws
IOException {
- int headerInfo = 0;
-
- final BufferedReader br = new BufferedReader(new
InputStreamReader(in));
- in.mark(Integer.MAX_VALUE);
- String line = br.readLine();
- while (line != null) {
- // if line is not a header line, reset stream and return
header counts
- if (!line.startsWith(headerMarker)) {
- in.reset();
- return headerInfo;
- } else {
- headerInfo++;
- }
- line = br.readLine();
- }
- in.reset();
- return headerInfo;
+ /**
+ *
+ */
+ @OnScheduled
+ public void onSchedule(ProcessContext context) {
+ this.removeTrailingNewLines =
context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
+ ?
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
+ this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+ ?
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() :
Long.MAX_VALUE;
+ this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+ this.headerLineCount =
context.getProperty(HEADER_LINE_COUNT).asInteger();
+ this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
}
+ /**
+ * Will split the incoming stream releasing all splits as FlowFile at
once.
+ */
@Override
- public void onTrigger(final ProcessContext context, final
ProcessSession session) {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final ComponentLog logger = getLogger();
- final int headerCount =
context.getProperty(HEADER_LINE_COUNT).asInteger();
- final int maxLineCount =
(context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
- ? Integer.MAX_VALUE :
context.getProperty(LINE_SPLIT_COUNT).asInteger();
- final long maxFragmentSize =
context.getProperty(FRAGMENT_MAX_SIZE).isSet()
- ?
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() :
Long.MAX_VALUE;
- final String headerMarker =
context.getProperty(HEADER_MARKER).getValue();
- final boolean includeLineDelimiter =
!context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
- final AtomicReference<String> errorMessage = new
AtomicReference<>(null);
- final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
- final long startNanos = System.nanoTime();
- final List<FlowFile> splits = new ArrayList<>();
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws
IOException {
- try (final BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn);
- final ByteCountingInputStream in = new
ByteCountingInputStream(bufferedIn)) {
-
- long bufferedPartialLine = 0;
-
- // if we have header lines, copy them into a
ByteArrayOutputStream
- final ByteArrayOutputStream headerStream = new
ByteArrayOutputStream();
- // Determine the number of lines of header, priority
given to HEADER_LINE_COUNT property
- int headerInfoLineCount = 0;
- if (headerCount > 0) {
- headerInfoLineCount = headerCount;
- } else {
- if (headerMarker != null) {
- headerInfoLineCount = countHeaderLines(in,
headerMarker);
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile != null) {
+ AtomicBoolean error = new AtomicBoolean();
+ List<FlowFile> splitFlowFiles = new ArrayList<>();
+ List<SplitInfo> computedSplitsInfo = new ArrayList<>();
+ AtomicReference<SplitInfo> headerSplitInfoRef = new
AtomicReference<>();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ TextLineDemarcator demarcator = new
TextLineDemarcator(in);
+ SplitInfo splitInfo = null;
+ long startOffset = 0;
+
+ // Compute fragment representing the header (if
available)
+ long start = System.nanoTime();
+ try {
+ if (SplitText.this.headerLineCount > 0) {
+ splitInfo =
SplitText.this.computeHeader(demarcator, startOffset,
SplitText.this.headerLineCount, null, null);
+ if (splitInfo.lineCount <
SplitText.this.headerLineCount) {
+ error.set(true);
+ getLogger().error("Unable to split " +
flowFile + " due to insufficient amount of header lines. Required "
+ + SplitText.this.headerLineCount +
" but was " + splitInfo.lineCount + ". Routing to failure.");
+ }
+ } else if (SplitText.this.headerMarker != null) {
+ splitInfo =
SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE,
SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
}
+ headerSplitInfoRef.set(splitInfo);
+ } catch (IllegalStateException e) {
+ error.set(true);
+ getLogger().error(e.getMessage() + " Routing to
failure.");
--- End diff --
We should probably be logging e.toString() instead of e.getMessage(), since
toString() also includes the exception class name and therefore provides more
detail about what went wrong. We should also log the toString() of the
FlowFile that is being routed to failure.
> Improve performance of SplitText
> --------------------------------
>
> Key: NIFI-2851
> URL: https://issues.apache.org/jira/browse/NIFI-2851
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
> Fix For: 1.1.0
>
>
> SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a
> 1.4 million line text file into 5k line chunks and then splits those 5k line
> chunks into 1 line chunks is only capable of pushing through about 10k lines
> per second. This equates to about 10 MB/sec. JVisualVM shows that the
> majority of the time is spent in the locateSplitPoint() method. Isolating
> this code and inspecting how it works, and using some micro-benchmarking, it
> appears that if we refactor the calls to InputStream.read() to instead read
> into a byte array, we can improve performance.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)