[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michał Ciesielczyk updated FLINK-10714:
---------------------------------------
     Attachment: FailingJob.scala
    Description: 
I'm sometimes getting an error while creating a checkpoint using a filesystem 
state backend. This ONLY happens when asynchronous snapshots are enabled with 
the FileSystem State Backend; when RocksDB is used instead, everything works fine.

 

I'm using a simple KeyedStream.mapWithState function with a ValueState holding 
a hashmap (scala.collection.immutable.Map). It's hard to reproduce the error 
with a simple code snippet, as it occurs randomly.
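
For illustration, here is a minimal sketch of a job with roughly this shape. This is NOT the attached FailingJob.scala; all names, paths, and values below are hypothetical, and it assumes Flink 1.6.x with the Scala 2.11 API:

{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend

object SketchJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // FsStateBackend with asynchronous snapshots enabled (second argument) --
    // the configuration under which the failure occurs.
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints", true))
    env.enableCheckpointing(1000)

    env
      .fromElements(("a", 1), ("b", 2), ("a", 3))
      .keyBy(_._1)
      // ValueState holds an immutable Map; the heap backend copies it via the
      // TraversableSerializer / Kryo path seen in the stack trace below.
      .mapWithState[(String, Int), Map[String, Int]] { (in, state) =>
        val counts  = state.getOrElse(Map.empty[String, Int])
        val updated = counts.updated(in._1, counts.getOrElse(in._1, 0) + in._2)
        (in, Some(updated))
      }
      .print()

    env.execute("sketch-job")
  }
}
{code}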

 

This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
but I'm still experiencing the same behavior.
  

Stacktrace:

 
{code:java}
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
    at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) ~[kryo-shaded-4.0.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) ~[kryo-shaded-4.0.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) ~[kryo-shaded-4.0.0.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) ~[flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

{code}
 

Edit:

I've attached a code sample for a Flink job that should reproduce the error 
(FailingJob.scala). Bear in mind that the failures are not deterministic; on 
my local environment, Flink usually creates between 10 and 2000 checkpoints before it fails.

 



> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10714
>                 URL: https://issues.apache.org/jira/browse/FLINK-10714
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.5.5, 1.6.2
>         Environment: Flink 1.6.2, FsStateBackend
>            Reporter: Michał Ciesielczyk
>            Priority: Blocker
>             Fix For: 1.7.0
>
>         Attachments: FailingJob.scala
>
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
