[
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16135101#comment-16135101
]
Shashank Agarwal commented on FLINK-7484:
-----------------------------------------
Hi [~yew1eb]
It's very big code base, With lot of business logics. But i can give you
overview and scenario. We are using scala streams. But due to some cassandra
sink issue we convert scala stream to Java stream. So we convert internal
objects to java object also.
As per logs i guess this is the scenario. We use a ItemPojo class.
{code:java}
@SerialVersionUID(224567L)
@UDT(keyspace = "cstable", name = "item")
case class ItemPojo(
@BeanProperty var item_id: String,
@BeanProperty var product_title: String,
@BeanProperty var price: String
) extends Serializable
{
def this() {
this(null, null, null)
}
}
{code}
In a stream object we use java.util.List[ItemPojo] , It's not creating any
issue till now we were using lot of CEP's and we were using in global window
also. But after some time due to some need we have iterate over that list in
global window. Than we are getting this error some time and application got
crashed.
{code:java}
for (cItem <- cItemList) {
some logic here.
}
{code}
I think may be this is issue cause i am getting error after these.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException:
> Index: 7, Size: 5
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Scala API
> Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
> Reporter: Shashank Agarwal
>
> I am using many CEP's and Global Window. I am getting following error
> sometimes and application crashes. I have checked logically there's no flow
> in the program. Here ItemPojo is a Pojo class and we are using
> java.utill.List[ItemPojo]. We are using Scala DataStream API please find
> attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO org.apache.flink.runtime.taskmanager.Task
> - TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
> co.thirdwatch.trigger.TransactionTrigger@5707c1cb,
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink:
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException:
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
> at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
> at
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> ... 22 more
> 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
> co.thirdwatch.trigger.TransactionTrigger@5707c1cb,
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink:
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
> 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task
> - Ensuring all FileSystem streams are closed for task
> TriggerWindow(GlobalWindows(),
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
> co.thirdwatch.trigger.TransactionTrigger@5707c1cb,
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink:
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) [FAILED]
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)