GitHub user markobean commented on a diff in the pull request:
https://github.com/apache/nifi/pull/135#discussion_r50989494
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
---
@@ -230,100 +270,112 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
final ProcessorLog logger = getLogger();
final int headerCount =
context.getProperty(HEADER_LINE_COUNT).asInteger();
final int splitCount =
context.getProperty(LINE_SPLIT_COUNT).asInteger();
+ final double maxFragmentSize;
+ if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
+ maxFragmentSize =
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B);
+ } else {
+ maxFragmentSize = Integer.MAX_VALUE;
+ }
+ final String headerMarker =
context.getProperty(HEADER_MARKER).getValue();
final boolean removeTrailingNewlines =
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
- final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
final long startNanos = System.nanoTime();
final List<FlowFile> splits = new ArrayList<>();
+
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws
IOException {
try (final BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn);
final ByteCountingInputStream in = new
ByteCountingInputStream(bufferedIn)) {
- // if we have header lines, copy them into a
ByteArrayOutputStream
+ // Identify header, if any
final ByteArrayOutputStream headerStream = new
ByteArrayOutputStream();
- final int headerLinesCopied = readLines(in,
headerCount, headerStream, true);
- if (headerLinesCopied < headerCount) {
- errorMessage.set("Header Line Count is set to " +
headerCount + " but file had only " + headerLinesCopied + " lines");
+ final SplitInfo headerInfo = readHeader(headerCount,
headerMarker, in, headerStream, true);
+ if (headerInfo.lengthLines < headerCount) {
+ errorMessage.set("Header Line Count is set to " +
headerCount + " but file had only "
+ + headerInfo.lengthLines + " lines");
return;
}
while (true) {
- if (headerCount > 0) {
- // if we have header lines, create a new
FlowFile, copy the header lines to that file,
- // and then start copying lines
- final IntegerHolder linesCopied = new
IntegerHolder(0);
- FlowFile splitFile = session.create(flowFile);
- try {
- splitFile = session.write(splitFile, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream
rawOut) throws IOException {
- try (final BufferedOutputStream
out = new BufferedOutputStream(rawOut)) {
+ FlowFile splitFile = session.create(flowFile);
+ final SplitInfo flowFileInfo = new SplitInfo();
+
+ // if we have header lines, write them out
+ // and then start copying lines
+ try {
+ splitFile = session.write(splitFile, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream
rawOut) throws IOException {
+ try (final BufferedOutputStream out =
new BufferedOutputStream(rawOut)) {
+ long lineCount = 0;
+ long byteCount = 0;
+ // Process header
+ if (headerInfo.lengthLines > 0) {
+ flowFileInfo.lengthBytes =
headerInfo.lengthBytes;
+ byteCount =
headerInfo.lengthBytes;
headerStream.writeTo(out);
- linesCopied.set(readLines(in,
splitCount, out, !removeTrailingNewlines));
+ }
+
+ // Process body
+ while (true) {
+ final ByteArrayOutputStream
dataStream = new ByteArrayOutputStream();
+ in.mark(1);
--- End diff --
This "in.mark(1)" call exists so that an entire line can be rolled back if
including it in the current split would exceed maxFragmentSize. (The matching
reset is on line 336.) However, the readlimit should not be 1; it needs to be
at least the maximum possible length of a line, otherwise the mark may be
invalidated before the reset is reached. Since the length of a given line is
not known in advance, I recommend using "Integer.MAX_VALUE".
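
To illustrate the concern, here is a minimal standalone sketch. It is not the
SplitText code: the class name MarkResetDemo, the helper canRollBackLine, the
sample text, and the deliberately tiny 4-byte buffer are all assumptions made
for the demonstration, and a plain BufferedInputStream stands in for the
wrapped stream in the diff. It marks the stream, reads one full line, and then
checks whether reset() still works.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class MarkResetDemo {

    // Hypothetical helper: marks the stream with the given readlimit, reads one
    // whole line, then tries to roll back to the mark. Returns true only if
    // reset() succeeded and the first byte of the line can be read again.
    static boolean canRollBackLine(final int readLimit) {
        final byte[] data = "a fairly long first line\nsecond\n"
                .getBytes(StandardCharsets.US_ASCII);
        // Assumption: a 4-byte buffer, chosen so the line outgrows the buffer.
        try (InputStream in = new BufferedInputStream(new ByteArrayInputStream(data), 4)) {
            in.mark(readLimit);
            int b;
            do {
                b = in.read();              // consume the line byte by byte
            } while (b != -1 && b != '\n');
            try {
                in.reset();                 // roll back to the start of the line
            } catch (IOException invalidatedMark) {
                return false;               // the mark did not survive the read
            }
            return in.read() == 'a';        // first byte of the line again
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println("mark(1): " + canRollBackLine(1));
        System.out.println("mark(Integer.MAX_VALUE): " + canRollBackLine(Integer.MAX_VALUE));
    }
}
```

With the larger readlimit the buffer grows as needed to hold the line, so the
rollback succeeds; the trade-off is that a very long line is held in memory
until the mark is released or replaced.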
---