[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788315#comment-16788315
 ] 

Jürgen Kreileder commented on FLINK-11420:
------------------------------------------

Yep, concurrent access:
{code:java}
2019-03-08 21:11:55,674 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Cleanup AsyncCheckpointRunnable for checkpoint 9 of Generate Finding Commands 377823 (1/1).
2019-03-08 21:11:55,680 INFO  org.apache.flink.runtime.taskmanager.Task                     - Generate Finding Commands 377823 (1/1) (8bed70dd3de95f9c0c82daa19679a4be) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 1: Generate Finding Commands 377823 (1/1) , Thread 2: pool-138-thread-1
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:630)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:242)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:537)
        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:506)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:297)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:321)
        at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
{code}
The slow-down in job submission is unrelated. I think it's caused by 
FLINK-11539.
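
For anyone unfamiliar with this failure mode: the exception in the log comes from a sanity check that notices a second thread entering a serializer instance while another thread is still inside it. Below is a minimal sketch of that kind of check. It is an illustration only, not Flink's actual source; `ExclusiveThreadGuard` and `GuardDemo` are hypothetical names, and the message text merely imitates the one in the log.

```java
// Illustrative sketch only; hypothetical classes, not Flink's KryoSerializer code.
final class ExclusiveThreadGuard {
    // Thread currently inside a guarded section, or null if none.
    private volatile Thread owner;

    // Plain read-check-write instead of a lock: a best-effort sanity check
    // that can miss some races but is cheap on the hot path.
    void enter() {
        Thread previous = owner;
        Thread current = Thread.currentThread();
        if (previous == null) {
            owner = current;
        } else if (previous != current) {
            throw new IllegalStateException(
                "Concurrent access. Thread 1: " + current.getName()
                    + " , Thread 2: " + previous.getName());
        }
    }

    // Clears the owner; nested enter() calls on the same thread are not counted.
    void exit() {
        owner = null;
    }
}

final class GuardDemo {
    // The calling thread enters the guard, then a second thread tries to enter.
    // Returns the second thread's exception message, or null if it got in.
    static String secondThreadFailure(ExclusiveThreadGuard guard) {
        final String[] failure = new String[1];
        guard.enter();
        Thread other = new Thread(() -> {
            try {
                guard.enter();
            } catch (IllegalStateException e) {
                failure[0] = e.getMessage();
            }
        }, "pool-thread-demo");
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            guard.exit();
        }
        return failure[0];
    }

    public static void main(String[] args) {
        System.out.println(secondThreadFailure(new ExclusiveThreadGuard()));
    }
}
```

A check like this only reports the race; it does not prevent it. In the log, the two threads named are the task thread and a pool thread.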

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11420
>                 URL: https://issues.apache.org/jira/browse/FLINK-11420
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.3, 1.8.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBoundsExceptions when Flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type that requires Kryo; see this 
> thread for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any])
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  
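
The description above suspects that something is not thread-safe once asynchronous snapshots are enabled. As a rough illustration of that class of problem and the usual remedy, here is a minimal sketch in which a copier keeps mutable scratch state and each thread works on its own duplicate instead of a shared instance. `ScratchCopier` and `DuplicateDemo` are hypothetical names invented for this example; they are not Flink or Kryo types, and the sketch makes no claim about where the actual bug lies.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only; hypothetical classes, not Flink or Kryo code.
final class ScratchCopier {
    // Mutable scratch state reused across calls; this is what makes a single
    // instance unsafe to share between threads.
    private final ArrayDeque<Object> inProgress = new ArrayDeque<>();

    // Copies a flat map of immutable values. push/pop must stay balanced;
    // interleaved callers on a shared instance could break that.
    Map<String, Object> copy(Map<String, Object> from) {
        inProgress.push(from);
        Map<String, Object> to = new HashMap<>(from);
        inProgress.pop();
        return to;
    }

    // Returns a fresh instance with its own scratch state.
    ScratchCopier duplicate() {
        return new ScratchCopier();
    }

    int pendingDepth() {
        return inProgress.size();
    }
}

final class DuplicateDemo {
    // Runs copies on two threads, each using its own duplicate of the prototype.
    // Returns true if every copy equalled its input and no thread failed.
    static boolean runWithDuplicates(ScratchCopier prototype, int iterations) {
        Map<String, Object> event = new HashMap<>();
        event.put("foo", 13);
        event.put("bar", false);
        AtomicBoolean ok = new AtomicBoolean(true);
        Runnable work = () -> {
            ScratchCopier own = prototype.duplicate(); // never touch the shared prototype
            try {
                for (int i = 0; i < iterations; i++) {
                    if (!own.copy(event).equals(event)) {
                        ok.set(false);
                    }
                }
            } catch (RuntimeException e) {
                ok.set(false);
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(runWithDuplicates(new ScratchCopier(), 10_000));
    }
}
```

The point of the design is that `duplicate()` must hand out truly independent state; if a wrapper's duplicate returned itself or reused a stateful inner copier, two threads would end up inside the same instance again.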



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
