FYI: the problem seems to be that samoa-api uses Kryo 2.17 while Flink uses Kryo 2.24.0. All Flink-related tests pass if I upgrade SAMOA's Kryo dependency to 2.24.0. You could also ask on the samoa-incubating dev list whether that change would be OK. It would probably be good to test the same Kryo version on Storm, Samza and S4 as well, to be sure.
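In case it helps to confirm which Kryo actually ends up on the classpath when the test runs, here is a small sketch. The class name KryoClasspathCheck and the helper describe are my own and not part of SAMOA or Flink; the code uses only standard JDK reflection calls. It reports the jar a class was loaded from and the Implementation-Version from that jar's manifest, if the jar declares one (not every jar does, so the location is usually the more reliable hint).

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of SAMOA or Flink.
final class KryoClasspathCheck {

    /** Describes where a loaded class came from and the version its jar manifest declares. */
    static String describe(Class<?> cls) {
        // CodeSource is null for classes loaded by the bootstrap class loader.
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        URL url = (source == null) ? null : source.getLocation();
        String location = (url == null) ? "<bootstrap or unknown>" : url.toString();

        // Implementation-Version is optional in the manifest, so it may be absent.
        Package pkg = cls.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();

        return cls.getName() + " loaded from " + location
                + ", version " + (version == null ? "<not in manifest>" : version);
    }

    /** Same as above, but looks the class up by name so the code compiles without Kryo. */
    static String describe(String className) {
        try {
            return describe(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return className + " is not on the classpath";
        }
    }

    public static void main(String[] args) {
        // Run this with the same classpath as the failing test.
        System.out.println(describe("com.esotericsoftware.kryo.Kryo"));
    }
}
```

If the reported jar is a 2.17 one while Flink was built against 2.24.0, that would be consistent with the mismatch described above.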
Paris

> On 28 Jan 2015, at 12:52, F. Beligianni <faybeligia...@gmail.com> wrote:
>
> Hello,
>
> I am currently working on the integration of Flink Streaming API to
> SAMOA and I have some problems with an exception that I take from the kryo
> serialiser:
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>     at java.lang.System.arraycopy(Native Method)
>     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:238)
>     at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read(SpillingAdaptiveSpanningRecordDeserializer.java:410)
>     at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>     at com.esotericsoftware.kryo.io.Input.fill(Input.java:134)
>     at com.esotericsoftware.kryo.io.Input.require(Input.java:154)
>     at com.esotericsoftware.kryo.io.Input.readInt(Input.java:303)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:103)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:596)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:707)
>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:195)
>
>
> Specifically, I am working with Flink-0.9-SNAPSHOT and the exception is
> received on the custom class "FlinkProcessingItem" which extends
> "StreamInvokable" class, in "invoke" function when the readNext() function
> of StreamInvokable is called.
>
> The object that is supposed to be received by "readNext" function is a
> custom Tuple3 object, called SamoaType and defined like this:
> "SamoaType extends Tuple3<String, ContentEvent, String>", where
> ContentEvent is an interface of SAMOA.
>
> The type information of the custom SamoaType is added to the source in the
> following way: "TypeExtractor.getForObject"
>
> The ContentEvent object that's sent between the two Invokables is of type
> "InstanceContentEvent" which implements ContentEvent, which you can find in
> the following link:
> InstanceContentEvent
> <https://github.com/yahoo/samoa/blob/master/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java>.
>
> We managed to reproduce the exception in the following test program:
> TestSerialization
> <https://github.com/senorcarbone/samoa/commit/9eba049031aee85d1bef58dcdaf37110b9fe4505>.
>
> Lastly, I should mention that the same example runs in Storm, even though
> Storm also uses kryo.
>
> Thank you,
> Fay