[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325823#comment-14325823 ]
Stephan Ewen commented on FLINK-1421:
-------------------------------------

My guess is that the Samoa adapter (specifically the SamoaTypeSerializer) has not implemented dynamic class loading. It looks like all classes are always sent through standard Java serialization. During deserialization, the java.io.ObjectInputStream needs to resolve each class it encounters, and it uses some class loader for that. It is important that this class loader is the usercode class loader. You can usually grab this through {{Thread.currentThread().getContextClassLoader()}}.

BTW: I think that this is an extremely inefficient way of exchanging data. While feasible for a first prototype, this should be on the list to be improved.

Here is the stack trace in a nicer format:

{code}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.RuntimeException: org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent
	at org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:232)
	at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:121)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent
	at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:165)
	at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:192)
	at com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:84)
	at com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:33)
	at org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:107)
	at org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
	at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
	at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:66)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:33)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
	at org.apache.flink.streaming.api.invokable.StreamInvokable.readNext(StreamInvokable.java:102)
	at com.yahoo.labs.flink.topology.impl.FlinkProcessingItem.invoke(FlinkProcessingItem.java:143)
	at org.apache.flink.streaming.api.streamvertex.StreamVertex.invokeUserFunction(StreamVertex.java:85)
	at org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:229)
	... 3 more
Caused by: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent
	at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:340)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:162)
	... 17 more

	at org.apache.flink.client.program.Client.run(Client.java:345)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:49)
	at com.yahoo.labs.flink.FlinkDoTask.main(FlinkDoTask.java:88)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
	at org.apache.flink.client.program.Client.run(Client.java:250)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:374)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:347)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1115)
{code}

> Implement a SAMOA Adapter for Flink Streaming
> ---------------------------------------------
>
>                 Key: FLINK-1421
>                 URL: https://issues.apache.org/jira/browse/FLINK-1421
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Yahoo's Samoa is an experimental incremental machine learning library that
> builds on an abstract compositional data streaming model to write streaming
> algorithms. The task is to provide an adapter from SAMOA topologies to
> Flink-streaming job graphs in order to support Flink as a backend engine for
> SAMOA tasks.
> A startup guide can be viewed here:
> https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub
> The main working branch of the adapter:
> https://github.com/senorcarbone/samoa/tree/flink

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
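
For illustration of the class loader point in the comment above: the trace shows java.io.ObjectInputStream.resolveClass going through sun.misc.Launcher$AppClassLoader, which presumably cannot see the classes in the user jar. Below is a minimal sketch of one way to resolve classes through the thread's context class loader instead. The names ClassLoaderAwareDeserializer and UserCodeObjectInputStream are hypothetical and are not part of Flink, SAMOA, or commons-lang. The sketch also assumes the context class loader is in fact the usercode class loader at the point where deserialization happens.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** Hypothetical helper (not Flink or SAMOA API): Java deserialization with an explicit class loader. */
final class ClassLoaderAwareDeserializer {

    /** ObjectInputStream that resolves classes against the given class loader first. */
    static final class UserCodeObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        UserCodeObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up through the supplied loader without initializing it.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive type names.
                return super.resolveClass(desc);
            }
        }
    }

    /** Deserializes using the current thread's context class loader (assumed to be the usercode loader). */
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            // No context class loader set: use the loader that loaded this helper.
            cl = ClassLoaderAwareDeserializer.class.getClassLoader();
        }
        try (ObjectInputStream in =
                new UserCodeObjectInputStream(new ByteArrayInputStream(bytes), cl)) {
            return in.readObject();
        }
    }

    /** Plain Java serialization, included only so the sketch can be exercised end to end. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    private ClassLoaderAwareDeserializer() {}
}
```

If the adapter keeps Java serialization for now, something along these lines could stand in for the SerializationUtils.deserialize(...) call inside SamoaTypeSerializer.deserialize. How the adapter actually obtains the usercode class loader is not shown in the trace, so that part remains an assumption.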