[
https://issues.apache.org/jira/browse/NIFI-1118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310953#comment-15310953
]
ASF GitHub Bot commented on NIFI-1118:
--------------------------------------
Github user mosermw commented on a diff in the pull request:
https://github.com/apache/nifi/pull/444#discussion_r65425886
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
---
@@ -147,138 +193,200 @@ protected void init(final
ProcessorInitializationContext context) {
return properties;
}
- private int readLines(final InputStream in, final int maxNumLines,
final OutputStream out, final boolean keepAllNewLines, final byte[]
leadingNewLineBytes) throws IOException {
+ private int readLines(final InputStream in, final int maxNumLines,
final long maxByteCount, final OutputStream out,
+ final boolean includeLineDelimiter, final byte[]
leadingNewLineBytes) throws IOException {
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
- int numLines = 0;
byte[] leadingBytes = leadingNewLineBytes;
+ int numLines = 0;
+ long totalBytes = 0L;
for (int i = 0; i < maxNumLines; i++) {
- final EndOfLineMarker eolMarker = locateEndOfLine(in, out,
false, eolBuffer, leadingBytes);
+ final EndOfLineMarker eolMarker = countBytesToSplitPoint(in,
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
+ final long bytes = eolMarker.getBytesConsumed();
leadingBytes = eolMarker.getLeadingNewLineBytes();
- if (keepAllNewLines && out != null) {
+ if (includeLineDelimiter && out != null) {
if (leadingBytes != null) {
out.write(leadingBytes);
leadingBytes = null;
}
-
eolBuffer.drainTo(out);
}
-
- if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() >
0L) {
- numLines++;
+ totalBytes += bytes;
+ if (bytes <= 0) {
+ return numLines;
}
-
- if (eolMarker.isStreamEnded()) {
+ numLines++;
+ if (totalBytes >= maxByteCount) {
break;
}
}
-
return numLines;
}
- private EndOfLineMarker locateEndOfLine(final InputStream in, final
OutputStream out, final boolean includeLineDelimiter,
- final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes)
throws IOException {
-
+ private EndOfLineMarker countBytesToSplitPoint(final InputStream in,
final OutputStream out, final long bytesReadSoFar, final long maxSize,
+ final boolean
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[]
leadingNewLineBytes) throws IOException {
int lastByte = -1;
long bytesRead = 0L;
+ final ByteArrayOutputStream buffer;
+ if (out != null) {
+ buffer = new ByteArrayOutputStream();
+ } else {
+ buffer = null;
+ }
byte[] bytesToWriteFirst = leadingNewLineBytes;
+ in.mark(Integer.MAX_VALUE);
while (true) {
- in.mark(1);
final int nextByte = in.read();
- final boolean isNewLineChar = nextByte == '\r' || nextByte ==
'\n';
-
- // if we hit end of stream or new line we're done
+ // if we hit end of stream we're done
if (nextByte == -1) {
- if (lastByte == '\r') {
- eolBuffer.addEndOfLine(true, false);
+ if (buffer != null) {
+ buffer.writeTo(out);
+ buffer.close();
}
-
- return new EndOfLineMarker(bytesRead, eolBuffer, true,
bytesToWriteFirst);
+ return new EndOfLineMarker(bytesRead, eolBuffer, true,
bytesToWriteFirst); // bytesToWriteFirst should be "null"?
}
- // If we get a character that's not an end-of-line char, then
- // we need to write out the EOL's that we have buffered (if
out != null).
- // Then, we need to reset our EOL buffer because we no longer
have consecutive EOL's
- if (!isNewLineChar) {
+ // Verify leading bytes do not violate size limitation
+ if (bytesToWriteFirst != null && (bytesToWriteFirst.length +
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
+ return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
+ }
+ // Write leadingNewLines, if appropriate
+ if ( buffer != null && includeLineDelimiter &&
bytesToWriteFirst != null) {
+ bytesRead += bytesToWriteFirst.length;
+ buffer.write(bytesToWriteFirst);
+ bytesToWriteFirst = null;
+ }
+ // buffer the output
+ bytesRead++;
+ if (buffer != null && nextByte != '\n' && nextByte != '\r') {
if (bytesToWriteFirst != null) {
- if (out != null) {
- out.write(bytesToWriteFirst);
- }
-
- bytesToWriteFirst = null;
- }
-
- if (out != null) {
- eolBuffer.drainTo(out);
+ buffer.write(bytesToWriteFirst);
}
-
+ bytesToWriteFirst = null;
+ eolBuffer.drainTo(buffer);
eolBuffer.clear();
+ buffer.write(nextByte);
}
- // if there's an OutputStream to copy the data to, copy it, if
appropriate.
- // "if appropriate" means that it's not a line delimiter or
that we want to copy line delimiters
- bytesRead++;
- if (out != null && (includeLineDelimiter || !isNewLineChar)) {
- if (bytesToWriteFirst != null) {
- out.write(bytesToWriteFirst);
- bytesToWriteFirst = null;
+ // check the size limit
+ if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar >
0) {
+ in.reset();
+ if (buffer != null) {
+ buffer.close();
}
-
- out.write(nextByte);
+ return new EndOfLineMarker(-1, eolBuffer, false,
leadingNewLineBytes);
}
// if we have a new line, then we're done
if (nextByte == '\n') {
- eolBuffer.addEndOfLine(lastByte == '\r', true);
+ if (buffer != null) {
+ buffer.writeTo(out);
+ buffer.close();
+ eolBuffer.addEndOfLine(false, true); //TODO: verify
"false" is equivalent to "lastByte == '\r'"
--- End diff --
Is this TODO a reminder to future @markobean? Does a current unit test
verify it?
> Enable SplitText processor to limit line length and filter header lines
> -----------------------------------------------------------------------
>
> Key: NIFI-1118
> URL: https://issues.apache.org/jira/browse/NIFI-1118
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Mark Bean
> Assignee: Mark Bean
> Fix For: 0.7.0
>
>
> Add the following functionality to the SplitText processor:
> 1) Maximum size limit of the split file(s)
> A new split file will be created if adding the next line to the current
> split file would exceed a user-defined maximum file size
> 2) Header line marker
> User-defined character(s) can be used to identify the header line(s) of the
> data file rather than a predetermined number of lines
> These changes are additions, not replacements of any existing property or behavior.
> In the case of the header line marker, the existing property "Header Line Count"
> must be zero for the new property and behavior to be used.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)