[ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16135101#comment-16135101
 ] 

Shashank Agarwal commented on FLINK-7484:
-----------------------------------------

Hi [~yew1eb]
It's very big code base, With lot of business logics. But i can give you 
overview and scenario. We are using scala streams. But due to some cassandra 
sink issue we convert scala stream to Java stream. So we convert internal 
objects to java object also.

As per logs i guess this is the scenario.  We use a ItemPojo class. 


{code:java}
@SerialVersionUID(224567L)
@UDT(keyspace = "cstable", name = "item")
case class ItemPojo(
                          @BeanProperty var item_id: String,
                          @BeanProperty var product_title: String,
                          @BeanProperty var price: String
                     ) extends Serializable
{
  def this() {
    this(null, null, null)
  }
}
{code}

In a stream object we use java.util.List[ItemPojo] , It's not creating any 
issue till now we were using lot of CEP's and we were using in global window 
also. But after some time due to some need we have iterate over that list in 
global window. Than we are getting this error some time and application got 
crashed. 


{code:java}
for (cItem <- cItemList) {
some logic here.
}
{code}

I think may be this is issue cause i am getting error after these. 



> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7484
>                 URL: https://issues.apache.org/jira/browse/FLINK-7484
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Scala API
>    Affects Versions: 1.3.2
>         Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>            Reporter: Shashank Agarwal
>
> I am using many CEP's and Global Window. I am getting following error 
> sometimes and application  crashes. I have checked logically there's no flow 
> in the program. Here ItemPojo is a Pojo class and we are using 
> java.utill.List[ItemPojo]. We are using Scala DataStream API please find 
> attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>       at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>       at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>       at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>       at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>       at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>       at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>       at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>       at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>       at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
>       at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>       at java.util.ArrayList.get(ArrayList.java:429)
>       at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>       at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>       ... 22 more
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Freeing task resources for TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Ensuring all FileSystem streams are closed for task 
> TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) [FAILED]
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to