[ https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14978888#comment-14978888 ]

ASF GitHub Bot commented on FLINK-2800:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1308

    [FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data

    The Kryo serializer uses Kryo's Output class to buffer individual write
    operations before they are written to the underlying output stream.
    Flink's KryoSerializer flushes this Output when it finishes its serialize
    call. However, if an exception occurs while flushing the Output, the
    buffered data is kept in the buffer. Since Flink uses EOFExceptions to
    signal, for example, that an underlying buffer is full and has to be
    spilled, the record that triggered the spilling can be written twice when
    it is written again after the spill. The reason is that Kryo's Output
    buffer still contains the serialized data of the failed attempt, which is
    flushed to the emptied output stream along with the new data.
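
The stale-buffer effect described above can be reproduced without Kryo or Flink. The following is a minimal Java simulation under stated assumptions: `BufferedOutput` is a hypothetical stand-in for Kryo's Output that resets its position only after a successful flush, and `BoundedSink` is a hypothetical fixed-size target that throws an `EOFException` when a write does not fit, playing the role of a full Flink buffer. Neither class is part of Kryo's or Flink's API.

```java
import java.io.EOFException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StaleBufferDemo {

    /** Hypothetical fixed-size target; signals "full" with an EOFException. */
    static class BoundedSink {
        private final byte[] data;
        private int pos;

        BoundedSink(int capacity) { data = new byte[capacity]; }

        /** All-or-nothing write: throws if the bytes do not fit. */
        void write(byte[] b, int off, int len) throws EOFException {
            if (pos + len > data.length) {
                throw new EOFException("sink full");
            }
            System.arraycopy(b, off, data, pos, len);
            pos += len;
        }

        /** Simulates spilling: the contents are moved elsewhere and the sink is empty again. */
        void spill() { pos = 0; }

        String contents() { return new String(data, 0, pos, StandardCharsets.UTF_8); }
    }

    /** Hypothetical stand-in for Kryo's Output: buffers writes until flush(). */
    static class BufferedOutput {
        private final byte[] buffer = new byte[64];
        private int position;
        private final BoundedSink sink;

        BufferedOutput(BoundedSink sink) { this.sink = sink; }

        void write(byte[] b) {
            System.arraycopy(b, 0, buffer, position, b.length);
            position += b.length;
        }

        /** The position is reset only if the write succeeds, so a failed flush keeps the bytes. */
        void flush() throws EOFException {
            sink.write(buffer, 0, position);
            position = 0;
        }
    }

    /** Serialize step WITHOUT the fix: buffer the record, then flush. */
    static void serialize(BufferedOutput out, String record) throws EOFException {
        out.write(record.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    /** Writes the records into a 12-byte sink, spilling and retrying once when it is full. */
    static String run(String... records) {
        BoundedSink sink = new BoundedSink(12);
        BufferedOutput out = new BufferedOutput(sink);
        for (String record : records) {
            try {
                serialize(out, record);
            } catch (EOFException full) {
                sink.spill();                 // make room, as a spilling buffer would
                try {
                    serialize(out, record);   // retry the same record
                } catch (EOFException stillTooLarge) {
                    throw new UncheckedIOException(stillTooLarge);
                }
            }
        }
        return sink.contents();
    }

    public static void main(String[] args) {
        System.out.println(run("abcdefgh", "ijklm")); // prints ijklmijklm
    }
}
```

The second record does not fit after the first one, so the flush fails and its bytes stay buffered. After the simulated spill, the retry appends the same bytes again, and the flush writes both copies to the emptied sink.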
    
    This duplication of records can lead to corrupted data, which eventually
    causes the Flink program to crash. The problem is solved by clearing
    Kryo's Output when the flush operation was not successful.
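
A minimal sketch of the fix described above, again as a self-contained simulation rather than the actual patch: `BufferedOutput` and `BoundedSink` are hypothetical stand-ins, not Kryo or Flink classes. The serialize step catches the failed flush, clears the buffered bytes, and rethrows the `EOFException` so the caller can spill and retry. `UncheckedIOException` is assumed here to play the role of the unchecked exception (`KryoException`) in which Kryo's Output wraps I/O errors.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ClearOnFailedFlushDemo {

    /** Hypothetical fixed-size target; signals "full" with an EOFException. */
    static class BoundedSink {
        private final byte[] data;
        private int pos;

        BoundedSink(int capacity) { data = new byte[capacity]; }

        void write(byte[] b, int off, int len) throws EOFException {
            if (pos + len > data.length) {
                throw new EOFException("sink full");
            }
            System.arraycopy(b, off, data, pos, len);
            pos += len;
        }

        void spill() { pos = 0; }

        String contents() { return new String(data, 0, pos, StandardCharsets.UTF_8); }
    }

    /** Hypothetical stand-in for Kryo's Output; flush() wraps I/O errors in an unchecked exception. */
    static class BufferedOutput {
        private final byte[] buffer = new byte[64];
        private int position;
        private final BoundedSink sink;

        BufferedOutput(BoundedSink sink) { this.sink = sink; }

        void write(byte[] b) {
            System.arraycopy(b, 0, buffer, position, b.length);
            position += b.length;
        }

        void flush() {
            try {
                sink.write(buffer, 0, position);
            } catch (IOException e) {
                throw new UncheckedIOException(e); // buffered bytes are still kept
            }
            position = 0;
        }

        /** Discards any buffered bytes. */
        void clear() { position = 0; }
    }

    /** Serialize step WITH the fix: clear the buffer if the flush fails. */
    static void serialize(BufferedOutput out, String record) throws EOFException {
        out.write(record.getBytes(StandardCharsets.UTF_8));
        try {
            out.flush();
        } catch (UncheckedIOException e) {
            out.clear(); // drop the failed attempt so a retry starts from an empty buffer
            if (e.getCause() instanceof EOFException) {
                throw (EOFException) e.getCause(); // "buffer full": let the caller spill and retry
            }
            throw e;
        }
    }

    /** Writes the records into a 12-byte sink, spilling and retrying once when it is full. */
    static String run(String... records) {
        BoundedSink sink = new BoundedSink(12);
        BufferedOutput out = new BufferedOutput(sink);
        for (String record : records) {
            try {
                serialize(out, record);
            } catch (EOFException full) {
                sink.spill();
                try {
                    serialize(out, record);
                } catch (EOFException stillTooLarge) {
                    throw new UncheckedIOException(stillTooLarge);
                }
            }
        }
        return sink.contents();
    }

    public static void main(String[] args) {
        System.out.println(run("abcdefgh", "ijklm")); // prints ijklm
    }
}
```

For `run("abcdefgh", "ijklm")`, the second record does not fit, and after the spill it is written exactly once. Clearing only in the failure path leaves the normal write path unchanged and discards exactly the bytes that belong to the failed attempt.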

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixKryoSerialization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1308
    
----
commit 5618cf7ff65c972dc33c77ba953966224e8c2a1e
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2015-10-28T17:40:41Z

    [FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data

----


> kryo serialization problem
> --------------------------
>
>                 Key: FLINK-2800
>                 URL: https://issues.apache.org/jira/browse/FLINK-2800
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.10
>         Environment: linux ubuntu 12.04 LTS, Java 7
>            Reporter: Stefano Bortoli
>            Assignee: Till Rohrmann
>
> Performing a cross of two datasets of POJOs, I got the exception below.
> The first time I ran the process, there was no problem. When I ran it a
> second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO  JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
>       at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>       at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
>       at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
>       at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
>       at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
