[ https://issues.apache.org/jira/browse/NIFI-2876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880555#comment-15880555 ]

ASF GitHub Bot commented on NIFI-2876:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1214#discussion_r102713854
  
    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractDemarcator.java ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +import org.apache.nifi.stream.io.exception.TokenTooLargeException;
    +
    +/**
    + * Base class for implementing streaming demarcators.
    + * <p>
    + * NOTE: Not intended for multi-thread usage hence not Thread-safe.
    + * </p>
    + */
    +abstract class AbstractDemarcator implements Closeable {
    +
    +    final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private final int maxDataSize;
    +
    +    byte[] buffer;
    +
    +    int index;
    +
    +    int mark;
    +
    +    long offset;
    +
    +    int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and max buffer size. Each demarcated token must fit within max buffer
    +     * size, otherwise the exception will be raised.
    +     */
    +    AbstractDemarcator(InputStream is, int maxDataSize) {
    +        this(is, maxDataSize, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and max buffer size and initial buffer size. Each demarcated token must
    +     * fit within max buffer size, otherwise the exception will be raised.
    +     */
    +    AbstractDemarcator(InputStream is, int maxDataSize, int initialBufferSize) {
    +        this.validate(is, maxDataSize, initialBufferSize);
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +        this.maxDataSize = maxDataSize;
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        // noop
    +    }
    +
    +    /**
    +     * Will fill the current buffer from current 'index' position, expanding it
    +     * and or shuffling it if necessary. If buffer exceeds max buffer size a
    +     * {@link TokenTooLargeException} will be thrown.
    +     *
    +     * @throws IOException
    +     *             if unable to read from the stream
    +     */
    +    void fill() throws IOException {
    +        if (this.index >= this.buffer.length) {
    +            if (this.mark == 0) { // expand
    +                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
    +                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
    +                this.buffer = newBuff;
    +            } else { // shuffle
    +                int length = this.index - this.mark;
    +                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
    +                this.index = length;
    +                this.mark = 0;
    +            }
    +        }
    +
    +        int bytesRead;
    +        do {
    +            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
    +        } while (bytesRead == 0);
    +        this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
    +        if (this.bufferLength > this.maxDataSize) {
    +            throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of "
    +                    + this.maxDataSize + " bytes.");
    +        }
    +    }
    +
    +    /**
    +     * Will extract data token of the provided length from the current buffer.
    +     * The length of the data token is between the current 'mark' and 'index'.
    --- End diff --
    
    I don't understand this comment. The length of the data token is whatever value is passed in, not "between the current 'mark' and 'index'." Does this mean that the length passed in is expected to be that? If so, why take in a length at all? Why not just calculate it within the method?
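One way to read the reviewer's suggestion, as a minimal sketch rather than the code in the pull request: if 'mark' is the inclusive start and 'index' the exclusive end of the pending token, the extraction method can derive the length itself instead of taking it as a parameter. The class `MarkIndexBuffer` and the methods `extractDataToken()` and `nextToken(byte)` are hypothetical names chosen for illustration, and the sketch works on an in-memory array rather than a stream.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration, not NiFi API. Assumes 'mark' is the inclusive
 * start and 'index' the exclusive end of the pending token.
 */
class MarkIndexBuffer {

    private final byte[] buffer;
    private int mark;  // start of the pending token (inclusive)
    private int index; // end of the pending token (exclusive)

    MarkIndexBuffer(byte[] data) {
        this.buffer = data;
    }

    /** Copies buffer[mark, index); the length is derived here, not passed in. */
    byte[] extractDataToken() {
        int length = this.index - this.mark;
        return Arrays.copyOfRange(this.buffer, this.mark, this.mark + length);
    }

    /** Returns the next token ending at 'delimiter' (or at the end), or null when exhausted. */
    byte[] nextToken(byte delimiter) {
        if (this.mark >= this.buffer.length) {
            return null;
        }
        this.index = this.mark;
        while (this.index < this.buffer.length && this.buffer[this.index] != delimiter) {
            this.index++;
        }
        byte[] token = extractDataToken();
        this.mark = this.index + 1; // skip the delimiter; may pass buffer.length at the end
        return token;
    }
}
```

With this shape a caller cannot pass a length that disagrees with 'mark' and 'index', which is the inconsistency the comment points out.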


> Refactor TextLineDemarcator and StreamDemarcator into a common abstract class
> -----------------------------------------------------------------------------
>
>                 Key: NIFI-2876
>                 URL: https://issues.apache.org/jira/browse/NIFI-2876
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> Based on the work performed as part of NIFI-2851, we now have a new class,
> TextLineDemarcator, with significantly faster logic for demarcating an
> InputStream. This new class's initial starting point was the existing
> LineDemarcator. They both now share ~60-70% of their code, which should be
> extracted into a common abstract class; the new (faster) demarcation logic
> should also be incorporated into StreamDemarcator.
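A minimal sketch of the refactoring described above, assuming the shared part is the buffer management (fill, expand, shuffle, size limit) and each subclass supplies only its delimiter matching. `DemarcatorBase` and `ByteDelimiterDemarcator` are hypothetical names, not NiFi classes. Unlike the code under review, this sketch doubles the buffer when it grows (the diff grows it by the initial size) and throws a plain `IOException` where NiFi uses `TokenTooLargeException`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/** Hypothetical base class: owns the buffer and the fill/expand/shuffle logic. */
abstract class DemarcatorBase {
    private final InputStream is;
    private final int maxDataSize;
    byte[] buffer;
    int mark;         // start of the current token (inclusive)
    int index;        // next byte to examine
    int bufferLength; // number of valid bytes in 'buffer'

    DemarcatorBase(InputStream is, int maxDataSize, int initialBufferSize) {
        if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("initialBufferSize must be > 0");
        }
        this.is = is;
        this.maxDataSize = maxDataSize;
        this.buffer = new byte[initialBufferSize];
    }

    /** Called when index == bufferLength; reads more bytes, returns false at end of stream. */
    boolean fill() throws IOException {
        if (index >= buffer.length) {
            if (mark == 0) { // expand: the current token occupies the whole buffer
                buffer = Arrays.copyOf(buffer, buffer.length * 2);
            } else {         // shuffle: move the partial token [mark, index) to the front
                int length = index - mark;
                System.arraycopy(buffer, mark, buffer, 0, length);
                index = length;
                mark = 0;
            }
        }
        int bytesRead;
        do {
            bytesRead = is.read(buffer, index, buffer.length - index);
        } while (bytesRead == 0);
        bufferLength = bytesRead == -1 ? index : index + bytesRead;
        return bytesRead != -1;
    }

    /** Copies [mark, index) out of the buffer, enforcing the size limit. */
    byte[] extractToken() throws IOException {
        int length = index - mark;
        if (length > maxDataSize) {
            throw new IOException("Token of " + length + " bytes exceeds the limit of " + maxDataSize);
        }
        return Arrays.copyOfRange(buffer, mark, index);
    }
}

/** Hypothetical subclass: splits on a single-byte delimiter, which is dropped. */
class ByteDelimiterDemarcator extends DemarcatorBase {
    private final byte delimiter;

    ByteDelimiterDemarcator(InputStream is, byte delimiter, int maxDataSize, int initialBufferSize) {
        super(is, maxDataSize, initialBufferSize);
        this.delimiter = delimiter;
    }

    /** Returns the next token, or null once the stream is exhausted. */
    byte[] nextToken() throws IOException {
        while (true) {
            if (index >= bufferLength && !fill()) {
                if (index == mark) {
                    return null;                 // nothing left
                }
                byte[] token = extractToken();   // trailing bytes without a delimiter
                mark = index;
                return token;
            }
            if (buffer[index] == delimiter) {
                byte[] token = extractToken();
                index++;      // skip the delimiter
                mark = index; // next token starts after it
                return token;
            }
            index++;
        }
    }
}
```

Under this split, a line-oriented subclass would differ only in `nextToken()` (for example, matching both `\n` and `\r\n`), while the buffering and size checks are written once in the base class.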



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
