[ 
https://issues.apache.org/jira/browse/NIFI-1118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119295#comment-15119295
 ] 

ASF GitHub Bot commented on NIFI-1118:
--------------------------------------

Github user markobean commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/135#discussion_r50989494
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -230,100 +270,112 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
             final ProcessorLog logger = getLogger();
             final int headerCount = 
context.getProperty(HEADER_LINE_COUNT).asInteger();
             final int splitCount = 
context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        final double maxFragmentSize;
    +        if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    +            maxFragmentSize = 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B);
    +        } else {
    +            maxFragmentSize = Integer.MAX_VALUE;
    +        }
    +        final String headerMarker = 
context.getProperty(HEADER_MARKER).getValue();
             final boolean removeTrailingNewlines = 
context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
             final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
             final long startNanos = System.nanoTime();
             final List<FlowFile> splits = new ArrayList<>();
    +
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(final InputStream rawIn) throws 
IOException {
                     try (final BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn);
                             final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
     
    -                    // if we have header lines, copy them into a 
ByteArrayOutputStream
    +                    // Identify header, if any
                         final ByteArrayOutputStream headerStream = new 
ByteArrayOutputStream();
    -                    final int headerLinesCopied = readLines(in, 
headerCount, headerStream, true);
    -                    if (headerLinesCopied < headerCount) {
    -                        errorMessage.set("Header Line Count is set to " + 
headerCount + " but file had only " + headerLinesCopied + " lines");
    +                    final SplitInfo headerInfo = readHeader(headerCount, 
headerMarker, in, headerStream, true);
    +                    if (headerInfo.lengthLines < headerCount) {
    +                        errorMessage.set("Header Line Count is set to " + 
headerCount + " but file had only "
    +                                + headerInfo.lengthLines + " lines");
                             return;
                         }
     
                         while (true) {
    -                        if (headerCount > 0) {
    -                            // if we have header lines, create a new 
FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final IntegerHolder linesCopied = new 
IntegerHolder(0);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new 
OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream 
rawOut) throws IOException {
    -                                        try (final BufferedOutputStream 
out = new BufferedOutputStream(rawOut)) {
    +                        FlowFile splitFile = session.create(flowFile);
    +                        final SplitInfo flowFileInfo = new SplitInfo();
    +
    +                        // if we have header lines, write them out
    +                        // and then start copying lines
    +                        try {
    +                            splitFile = session.write(splitFile, new 
OutputStreamCallback() {
    +                                @Override
    +                                public void process(final OutputStream 
rawOut) throws IOException {
    +                                    try (final BufferedOutputStream out = 
new BufferedOutputStream(rawOut)) {
    +                                        long lineCount = 0;
    +                                        long byteCount = 0;
    +                                        // Process header
    +                                        if (headerInfo.lengthLines > 0) {
    +                                            flowFileInfo.lengthBytes = 
headerInfo.lengthBytes;
    +                                            byteCount = 
headerInfo.lengthBytes;
                                                 headerStream.writeTo(out);
    -                                            linesCopied.set(readLines(in, 
splitCount, out, !removeTrailingNewlines));
    +                                        }
    +
    +                                        // Process body
    +                                        while (true) {
    +                                            final ByteArrayOutputStream 
dataStream = new ByteArrayOutputStream();
    +                                            in.mark(1);
    --- End diff --
    
    This "in.mark()" is used to roll back the entire line if its inclusion in 
the current split would exceed maxFragmentSize. (Reset on line 336.) However, 
its readlimit should not be 1; it should be the maximum possible length of a 
line, because once more than readlimit bytes have been read the mark may be 
invalidated and the later reset() can fail. Since the length of a given line 
is unknown, I recommend using "Integer.MAX_VALUE".


> Enable SplitText processor to limit line length and filter header lines
> -----------------------------------------------------------------------
>
>                 Key: NIFI-1118
>                 URL: https://issues.apache.org/jira/browse/NIFI-1118
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Bean
>            Assignee: Joe Skora
>             Fix For: 0.6.0
>
>
> Include the following functionality to the SplitText processor:
> 1) Maximum size limit of the split file(s)
> A new split file will be created if the next line to be added to the current 
> split file exceeds a user-defined maximum file size
> 2) Header line marker
> User-defined character(s) can be used to identify the header line(s) of the 
> data file rather than a predetermined number of lines
> These changes are additions, not a replacement of any property or behavior. 
> In the case of header line marker, the existing property "Header Line Count" 
> must be zero for the new property and behavior to be used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to