Fabian,

I tried running it again and I noticed there were some more exceptions in
the log. I fixed those and I don’t see the original error but I do see
other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn’t
even enable that yet like you suggested). Examples:

1)

10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput
      - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
        at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectInt
Map.java:364)
        at 
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResol
ver.java:47)
        at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:95)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(K
ryoSerializer.java:186)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSe
rializer.java:372)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:89)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:29)
        at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(Serialization
Delegate.java:51)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeriali
zer.addRecord(SpanningRecordSerializer.java:76)
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWrit
er.java:83)
        at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordW
riter.java:58)
        at 
org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.
java:62)
        at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collector
Wrapper.java:40)
        at 
org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSourc
e.java:40)
        at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
617)
        at java.lang.Thread.run(Thread.java:745)



2)
10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput
      - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
        at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntM
ap.java:207)
        at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMa
p.java:117)
        at 
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapRef
erenceResolver.java:23)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:88)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.jav
a:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(K
ryoSerializer.java:186)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSe
rializer.java:372)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:89)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.seri
alize(StreamRecordSerializer.java:29)
        at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(Serialization
Delegate.java:51)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeriali
zer.addRecord(SpanningRecordSerializer.java:76)
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWrit
er.java:83)
        at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordW
riter.java:58)
        at 
org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.
java:62)
        at 
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collector
Wrapper.java:40)
        at 
org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSourc
e.java:40)
        at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
617)
        at java.lang.Thread.run(Thread.java:745)

3)

com.esotericsoftware.kryo.KryoException: Encountered unregistered class
ID: 106
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassR
esolver.java:119)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:135)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:211)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:225)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Pojo
Serializer.java:499)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:102)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:29)
        at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reusi
ngDeserializationDelegate.java:57)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpann
ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseriali
zer.java:110)
        at 
org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNext
Record(StreamingAbstractRecordReader.java:80)
        at 
org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(Str
eamingMutableRecordReader.java:36)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.
java:59)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInp
utStreamTask.java:68)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInput
StreamTask.java:101)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

4)

java.lang.IllegalArgumentException: You can store only Strings, Integer
and Longs in the ProtocolDetailMap, not: 'false' for 'null'
        at 
io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
        at 
io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:144)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:211)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:225)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Pojo
Serializer.java:499)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:102)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:29)
        at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reusi
ngDeserializationDelegate.java:57)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpann
ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseriali
zer.java:110)
        at 
org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNext
Record(StreamingAbstractRecordReader.java:80)
        at 
org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(Str
eamingMutableRecordReader.java:36)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.
java:59)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInp
utStreamTask.java:68)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInput
StreamTask.java:101)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

5)


java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapRefere
nceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:135)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java
:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:211)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize
(KryoSerializer.java:225)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Pojo
Serializer.java:499)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:102)
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.dese
rialize(StreamRecordSerializer.java:29)
        at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reusi
ngDeserializationDelegate.java:57)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpann
ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseriali
zer.java:110)
        at 
org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNext
Record(StreamingAbstractRecordReader.java:80)
        at 
org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(Str
eamingMutableRecordReader.java:36)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.
java:59)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInp
utStreamTask.java:68)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInput
StreamTask.java:101)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)



As you can tell, there’s a common theme there which is the MapSerializer
in Kryo. The source of my grief seems to be the two java.util.Map
implementations ProtocolAttributeMap and ProtocolDetailMap. They’re custom
implementations and they’re anal about what types of objects you can have
as values.


Here’s the output of the gist you asked me to run:

class org.apache.flink.api.java.typeutils.PojoTypeInfo : class
io.pivotal.rti.protocols.ProtocolEvent
(
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
java.lang.Long
class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
java.lang.Long
class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
java.lang.Long
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.java.typeutils.GenericTypeInfo : class
io.pivotal.rti.protocols.ProtocolAttributeMap
class org.apache.flink.api.java.typeutils.GenericTypeInfo : class
io.pivotal.rti.protocols.ProtocolDetailMap
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
java.lang.Short
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
java.lang.Short
class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
java.lang.String
)


Right now, I’m using the gson library to convert the ProtocolEvent
instance to JSON and back. I think I have to write a custom converter to
make ProtocolEvent a proper POJO in order for it to work with the
serializers in Flink.

Sorry for the long email.

Thanks,
Ali


On 2015-11-11, 10:02 AM, "Fabian Hueske" <fhue...@gmail.com> wrote:

>Hi Ali,
>
>I looked into this issue. This problem seems to be caused because the
>deserializer reads more data than it should read.
>This might happen because of two reasons:
>  1) the meta information of how much data is safe to read is incorrect.
>  2) the serializer and deserializer logic are not in sync which can cause
>the deserializer to read more data than the serializer wrote.
>
>The first case is less likely: Flink writes the binary length of each
>record in front of its serialized representation. This happens whenever
>data is sent over the network, regardless of the data type. A bug in this
>part would be very crucial, but is also less likely because this happens
>very often and has not occurred yet.
>
>IMO, this looks like an issue of the serialization logic. Looking at your
>code, the problem occurs when deserializing ProtocolEvent objects.
>Is it possible that you share this class with me?
>
>If it is not possible to share the class, it would be good, to know the
>field types of the Pojo and the associated TypeInformation.
>For that you can run the code in this gist [1] which will recursively
>print
>the field types and their TypeInformation.
>
>As a temporary workaround, you can try to use Kryo to serialize and
>deserialize your Pojos as follows:
>ExecutionEnvironment env = ...
>env.getConfig().enableForceKryo();
>
>Best,
>Fabian
>
>[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
>
>2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Ali,
>>
>> one more thing. Did that error occur once or is it reproducable?
>>
>> Thanks for your help,
>> Fabian
>>
>> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>:
>>
>>> Hey Ali,
>>>
>>> thanks for sharing the code. I assume that the custom
>>> ProtocolEvent, ProtocolDetailMap, Subscriber type are all PoJos. They
>>> should not be a problem. I think this is a bug in Flink 0.9.1.
>>>
>>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>>> version and report back?
>>>
>>> 1) Add
>>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>>> as a
>>> snapshot repository
>>>
>>> <repositories>
>>> <repository>
>>> <id>apache.snapshots</id>
>>> <name>Apache Development Snapshot Repository</name>
>>> <url>
>>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>>> </url>
>>> <releases>
>>> <enabled>false</enabled>
>>> </releases>
>>> <snapshots>
>>> <enabled>true</enabled>
>>> </snapshots>
>>> </repository>
>>> </repositories>
>>>
>>> 2) Set the Flink dependency version to 0.10.0
>>>
>>> 3) Use the Flink binary matching your Hadoop installation from here:
>>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java,
>>>you
>>> can go with the Scala 2.10 builds)
>>>
>>> Sorry for the inconvenience! The release is about to be finished (the
>>> voting process is already going on).
>>>
>>> – Ufuk
>>>
>>>
>>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com>
>>> wrote:
>>>
>>> > Thanks for the quick reply guys! A lot of interest in this one. I’ve
>>> > attached the source code is attached. There are other supporting
>>> > modules/classes but the main flink component is in the included zip
>>> file.
>>> >
>>> > In answer to Fabian’s question: I’m using the 0.9.1 release right off
>>> the
>>> > website (flink-0.9.1-bin-hadoop1.tgz).
>>> >
>>> > In answer to Ufuk’s question: Yes I’m using custom data types.
>>> >
>>> > Thanks,
>>> > Ali
>>> >
>>> >
>>> >
>>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>>> >
>>> > >Thanks for reporting this. Are you using any custom data types?
>>> > >
>>> > >If you can share your code, it would be very helpful in order to
>>>debug
>>> > >this.
>>> > >
>>> > >– Ufuk
>>> > >
>>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com>
>>>wrote:
>>> > >
>>> > >> I agree with Robert. Looks like a bug in Flink.
>>> > >> Maybe an off-by-one issue (violating index is 32768 and the
>>>default
>>> > >>memory
>>> > >> segment size is 32KB).
>>> > >>
>>> > >> Which Flink version are you using?
>>> > >> In case you are using a custom build, can you share the commit ID
>>>(is
>>> > >> reported in the first lines of the JobManager log file)?
>>> > >>
>>> > >> Thanks, Fabian
>>> > >>
>>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org
>>> > >> <javascript:;>>:
>>> > >>
>>> > >> > Hi Ali,
>>> > >> >
>>> > >> > this could be a bug in Flink.
>>> > >> > Can you share the code of your program with us to debug the
>>>issue?
>>> > >> >
>>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali
>>><ali.kash...@emc.com
>>> > >> <javascript:;>> wrote:
>>> > >> >
>>> > >> > > Hello,
>>> > >> > >
>>> > >> > > I’m getting this error while running a streaming module on a
>>> cluster
>>> > >> of 3
>>> > >> > > nodes:
>>> > >> > >
>>> > >> > >
>>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
>>> > >> > >
>>> > >> > > at
>>> > >>
>>> org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptive
>>>>>Spa
>>> >
>>> 
>>>>>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSp
>>>>>ann
>>> > >>ingRecordDeserializer.java:214)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptive
>>>>>Spa
>>> >
>>> 
>>>>>nningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAd
>>>>>apt
>>> > >>iveSpanningRecordDeserializer.java:219)
>>> > >> > >
>>> > >> > > at
>>> > >>org.apache.flink.types.StringValue.readString(StringValue.java:764)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserializ
>>>>>e(S
>>> > >>tringSerializer.java:68)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserializ
>>>>>e(S
>>> > >>tringSerializer.java:73)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserializ
>>>>>e(S
>>> > >>tringSerializer.java:28)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize
>>>>>(Po
>>> > >>joSerializer.java:499)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> >
>>> 
>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.d
>>>e
>>> > >>serialize(StreamRecordSerializer.java:102)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> > >>
>>> >
>>> 
>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.d
>>>e
>>> > >>serialize(StreamRecordSerializer.java:29)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
>>>>>Reu
>>> > >>singDeserializationDelegate.java:57)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptive
>>>>>Spa
>>> >
>>> 
>>>>>nningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDe
>>>>>ser
>>> > >>ializer.java:110)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.ge
>>>>>tNe
>>> > >>xtRecord(StreamingAbstractRecordReader.java:80)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.nex
>>>>>t(S
>>> > >>treamingMutableRecordReader.java:36)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIter
>>>>>ato
>>> > >>r.java:59)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(O
>>>>>neI
>>> > >>nputStreamTask.java:68)
>>> > >> > >
>>> > >> > > at
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> 
>>>>>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(One
>>>>>Inp
>>> > >>utStreamTask.java:101)
>>> > >> > >
>>> > >> > > at 
>>>org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> > >> > >
>>> > >> > > at java.lang.Thread.run(Thread.java:745)
>>> > >> > >
>>> > >> > >
>>> > >> > > Here’s the configuration for each node:
>>> > >> > >
>>> > >> > >
>>> > >> > > jobmanager.heap.mb: 2048
>>> > >> > >
>>> > >> > > taskmanager.heap.mb: 4096
>>> > >> > >
>>> > >> > > taskmanager.numberOfTaskSlots: 5
>>> > >> > >
>>> > >> > >
>>> > >> > > I’m not even sure where to start with this one so any help is
>>> > >> > appreciated.
>>> > >> > >
>>> > >> > >
>>> > >> > > Thanks,
>>> > >> > >
>>> > >> > > Ali
>>> > >> > >
>>> > >> >
>>> > >>
>>> >
>>> >
>>>
>>
>>

Reply via email to