[ 
https://issues.apache.org/jira/browse/FLUME-2182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13780110#comment-13780110
 ] 

Sven Meys commented on FLUME-2182:
----------------------------------

It is related, but not the same.

In the case of FLUME-2052, they are dealing with corrupt characters that cause 
the readChar method to throw a MalformedInputException.

In this specific case no such exception is thrown at all, even when error 
reporting is explicitly turned on in the decoder. Instead, the readChar method 
returns -1, which denotes end-of-file, and keeps doing so on every further 
attempt to read a character.

I did some additional investigation. The workaround works as a temporary patch. 
The fix proposed here, checking for a minimum character width, is also 
something of a hack. But I found the real culprit and a fix that makes the code 
more robust.

The culprit: delta. When decoder.decode is called on an incomplete character 
(e.g. two bytes of a three-byte character), the decoder neither consumes the 
bytes nor advances the position of the byte buffer.

This results in an empty char buffer, so readChar returns -1 as expected, but 
also in a delta (number of bytes processed) of zero. At the end of the method, 
the file pointer (position) is increased by delta (0 in this case).

Any following call to readChar will also return -1: the byte buffer did not 
advance its position, so it still has bytes remaining and is never refilled.
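
The zero delta can be reproduced outside Flume. The following is a minimal 
sketch, not Flume code: the class IncompleteCharDemo and the helper decodeOnce 
are hypothetical names, and it assumes a UTF-8 decoder with error reporting 
turned on. It decodes once and reports how far the byte buffer advanced.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Flume.
class IncompleteCharDemo {

    /**
     * Runs a single decode over the given bytes and returns
     * { delta, chars produced, 1 if the result is UNDERFLOW else 0 }.
     */
    static int[] decodeOnce(byte[] bytes, boolean endOfInput) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        CharBuffer charBuf = CharBuffer.allocate(1); // one char at a time, as in readChar

        int start = buf.position();
        CoderResult res = decoder.decode(buf, charBuf, endOfInput);
        int delta = buf.position() - start;

        charBuf.flip();
        return new int[] { delta, charBuf.remaining(), res.isUnderflow() ? 1 : 0 };
    }

    public static void main(String[] args) {
        // U+4E2D encoded in UTF-8 is the three bytes E4 B8 AD.
        byte[] full = { (byte) 0xE4, (byte) 0xB8, (byte) 0xAD };
        byte[] partial = { (byte) 0xE4, (byte) 0xB8 };

        int[] p = decodeOnce(partial, false);
        System.out.println("partial: delta=" + p[0] + " chars=" + p[1] + " underflow=" + (p[2] == 1));

        int[] f = decodeOnce(full, false);
        System.out.println("full:    delta=" + f[0] + " chars=" + f[1]);
    }
}
```

For the two-byte prefix the decoder reports an underflow rather than a 
malformed-input error, produces no char, and leaves the buffer position where 
it was, so delta is 0. For the complete character delta is 3.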

Forcing a call to refillBuf at this point will result in strange behavior, 
as the byte buffer still has the bytes of an incomplete character in it and 
the file pointer is positioned before that exact same set of bytes.

The fix: Check if delta is 0 while the byte buffer still has bytes remaining. 
If so, clear the byte buffer, refill it, and decode again.

Here's the fixed function (note, this might not fix FLUME-2052):
{code:java}
    @Override
    public synchronized int readChar() throws IOException {
        if (!buf.hasRemaining()) {
            refillBuf();
        }

        int start = buf.position();
        charBuf.clear();

        boolean isEndOfInput = false;
        if (position >= fileSize) {
            isEndOfInput = true;
        }

        CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
        int delta = buf.position() - start;

        if(delta == 0 && buf.hasRemaining()) {
            logger.debug("Incomplete character detected! Reloading buffer");
            buf.clear();
            buf.flip();
            refillBuf();

            start = buf.position();
            res = decoder.decode(buf, charBuf, isEndOfInput);
            delta = buf.position() - start;
        }

        if (res.isMalformed() || res.isUnmappable()) {
            res.throwException();
        }

        charBuf.flip();
        if (charBuf.hasRemaining()) {
            char c = charBuf.get();
            // don't increment the persisted location if we are in between a
            // surrogate pair, otherwise we may never recover if we seek() to
            // this location!
            incrPosition(delta, !Character.isHighSurrogate(c));
            return c;

            // there may be a partial character in the decoder buffer
        } else {
            incrPosition(delta, false);
            return -1;
        }
    }
{code}
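
To see the effect of the delta check in isolation, here is a self-contained 
sketch that imitates the situation with a byte array standing in for the file. 
Everything in it is hypothetical (WindowedCharReader, readAll, the 
refillOnZeroDelta switch); it is not the Flume class, skips the surrogate-pair 
and error handling above, and assumes the window is at least as wide as the 
largest encoded character.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a buffered, position-tracking reader.
class WindowedCharReader {
    private final byte[] data;                  // plays the role of the file
    private final ByteBuffer buf;               // small read window
    private final CharBuffer charBuf = CharBuffer.allocate(1);
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final boolean refillOnZeroDelta;    // true enables the delta check
    private int position = 0;                   // offset of the next undecoded byte

    WindowedCharReader(byte[] data, int windowSize, boolean refillOnZeroDelta) {
        this.data = data;
        this.buf = ByteBuffer.allocate(windowSize);
        this.buf.flip();                        // start with an empty window
        this.refillOnZeroDelta = refillOnZeroDelta;
    }

    // Reloads the window starting at the current position.
    private void refillBuf() {
        buf.clear();
        int n = Math.min(buf.capacity(), data.length - position);
        buf.put(data, position, n);
        buf.flip();
    }

    int readChar() {
        if (!buf.hasRemaining()) {
            refillBuf();
        }
        boolean isEndOfInput = position >= data.length;
        charBuf.clear();
        int start = buf.position();
        decoder.decode(buf, charBuf, isEndOfInput);
        int delta = buf.position() - start;

        // Nothing consumed although bytes remain: the window ends in the
        // middle of a character, so reload from the current position.
        if (refillOnZeroDelta && delta == 0 && buf.hasRemaining()) {
            refillBuf();
            start = buf.position();
            decoder.decode(buf, charBuf, isEndOfInput);
            delta = buf.position() - start;
        }

        position += delta;
        charBuf.flip();
        return charBuf.hasRemaining() ? charBuf.get() : -1;
    }

    // Reads until the first -1 and returns everything seen so far.
    static String readAll(byte[] data, int windowSize, boolean refillOnZeroDelta) {
        WindowedCharReader reader = new WindowedCharReader(data, windowSize, refillOnZeroDelta);
        StringBuilder sb = new StringBuilder();
        int c;
        while ((c = reader.readChar()) != -1) {
            sb.append((char) c);
        }
        return sb.toString();
    }
}
```

With the input "ab", U+4E2D, "c" and a four-byte window, the first window ends 
after two of the three bytes of U+4E2D. Without the check, readAll stops after 
"ab"; with it, the whole string is read.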
                
> Spooling Directory Source can't ingest data completely, when a file contain 
> some wide character, such as chinese character.
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2182
>                 URL: https://issues.apache.org/jira/browse/FLUME-2182
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: syntony liu
>            Priority: Critical
>              Labels: workaround
>         Attachments: ModifiedLineDeserializer.java
>
>
> The bug is in ResettableFileInputStream.java, in int readChar().
> If the last bytes of buf are only part of a wide character, readChar() 
> shouldn't return -1 (ResettableFileInputStream.java:186). Doing so 
> loses the remaining data in the file.
> I fixed it as follows: 
> public synchronized int readChar() throws IOException {
>    // if (!buf.hasRemaining()) {
>    if (buf.limit() - buf.position() < 10) {
>       refillBuf();
>     }
>     int start = buf.position();
>     charBuf.clear();
>     boolean isEndOfInput = false;
>     if (position >= fileSize) {
>       isEndOfInput = true;
>     }
>     CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
>     if (res.isMalformed() || res.isUnmappable()) {
>       res.throwException();
>     }
>     int delta = buf.position() - start;
>     charBuf.flip();
>     if (charBuf.hasRemaining()) {
>       char c = charBuf.get();
>       // don't increment the persisted location if we are in between a
>       // surrogate pair, otherwise we may never recover if we seek() to this
>       // location!
>       incrPosition(delta, !Character.isHighSurrogate(c));
>       return c;
>     // there may be a partial character in the decoder buffer
>     } else {
>       incrPosition(delta, false);
>       return -1;
>     }
>   }
> It avoids the partial character, but introduces a new issue: sometimes a line 
> of a log file contains a repeated character.
> e.g. 
>    original file: 123456
>    sink file:     1233456

