GitHub user markobean commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/135#discussion_r50989494
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -230,100 +270,112 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final ProcessorLog logger = getLogger();
             final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
             final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        final double maxFragmentSize;
    +        if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    +            maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B);
    +        } else {
    +            maxFragmentSize = Integer.MAX_VALUE;
    +        }
    +        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
             final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
             final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
             final long startNanos = System.nanoTime();
             final List<FlowFile> splits = new ArrayList<>();
    +
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(final InputStream rawIn) throws IOException {
                     try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
                             final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
     
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    +                    // Identify header, if any
                         final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    final int headerLinesCopied = readLines(in, headerCount, headerStream, true);
    -                    if (headerLinesCopied < headerCount) {
    -                        errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
    +                    final SplitInfo headerInfo = readHeader(headerCount, headerMarker, in, headerStream, true);
    +                    if (headerInfo.lengthLines < headerCount) {
    +                        errorMessage.set("Header Line Count is set to " + headerCount + " but file had only "
    +                                + headerInfo.lengthLines + " lines");
                             return;
                         }
     
                         while (true) {
    -                        if (headerCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final IntegerHolder linesCopied = new IntegerHolder(0);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
    +                        FlowFile splitFile = session.create(flowFile);
    +                        final SplitInfo flowFileInfo = new SplitInfo();
    +
    +                        // if we have header lines, write them out
    +                        // and then start copying lines
    +                        try {
    +                            splitFile = session.write(splitFile, new OutputStreamCallback() {
    +                                @Override
    +                                public void process(final OutputStream rawOut) throws IOException {
    +                                    try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
    +                                        long lineCount = 0;
    +                                        long byteCount = 0;
    +                                        // Process header
    +                                        if (headerInfo.lengthLines > 0) {
    +                                            flowFileInfo.lengthBytes = headerInfo.lengthBytes;
    +                                            byteCount = headerInfo.lengthBytes;
                                                 headerStream.writeTo(out);
    -                                            linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines));
    +                                        }
    +
    +                                        // Process body
    +                                        while (true) {
    +                                            final ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
    +                                            in.mark(1);
    --- End diff --
    
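    This "in.mark()" call is used to roll back the entire line if including it in
    the current split would exceed maxFragmentSize (the matching reset() is on
    line 336). However, its readlimit should not be 1; it should be at least the
    maximum possible length of a line. Per the InputStream.mark() contract, once
    more than readlimit bytes have been read the stream may invalidate the mark,
    and reset() can then fail (BufferedInputStream throws an IOException). Since
    the length of a given line is not known in advance, I recommend using
    "Integer.MAX_VALUE".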
