[ 
https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325823#comment-14325823
 ] 

Stephan Ewen commented on FLINK-1421:
-------------------------------------

My guess is that the Samoa adapter (specifically the SamoaTypeSerializer) has 
not implemented the dynamic class loading. It looks like all classes are always 
sent through standard Java serialization.

Durong deserialization, the java.io.ObjectInputStream needs to resolve the 
class it encounteres and it uses some class loader for that. It is important 
that this class loader is the usercode class loader. You can usually grab this 
through {{Thread.currentThread().getContextClassLoader()}}.

BTW: I think that this is an extremely inefficient way of exchanging data. 
While feasible for a first prototype, this should be on the list to be improved.

Here is the stack trace is a nicer format
{code}
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: java.lang.RuntimeException: 
org.apache.commons.lang.SerializationException: 
java.lang.ClassNotFoundException: 
com.yahoo.labs.samoa.examples.HelloWorldContentEvent 
  at 
org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:232)
  at 
org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:121)
 
  at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
 
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.commons.lang.SerializationException: 
java.lang.ClassNotFoundException: 
com.yahoo.labs.samoa.examples.HelloWorldContentEvent
 at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:165)
 at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:192)
 at 
com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:84)
 at 
com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:33)
 at 
org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:107)
 at 
org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
 at 
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
 at 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
 at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:66)
 at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:33)
 at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 at 
org.apache.flink.streaming.api.invokable.StreamInvokable.readNext(StreamInvokable.java:102)
 at 
com.yahoo.labs.flink.topology.impl.FlinkProcessingItem.invoke(FlinkProcessingItem.java:143)
 at 
org.apache.flink.streaming.api.streamvertex.StreamVertex.invokeUserFunction(StreamVertex.java:85)
 at 
org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:229)
 ... 3 more
Caused by: java.lang.ClassNotFoundException: 
com.yahoo.labs.samoa.examples.HelloWorldContentEvent
 at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:340)
 at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at 
org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:162)
 ... 17 more
 at org.apache.flink.client.program.Client.run(Client.java:345)
 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:49)
 at com.yahoo.labs.flink.FlinkDoTask.main(FlinkDoTask.java:88)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
 at org.apache.flink.client.program.Client.run(Client.java:250)
 at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:374)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:347)
 at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1115)
{code}

> Implement a SAMOA Adapter for Flink Streaming
> ---------------------------------------------
>
>                 Key: FLINK-1421
>                 URL: https://issues.apache.org/jira/browse/FLINK-1421
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Yahoo's Samoa is an experimental incremental machine learning library that 
> builds on an abstract compositional data streaming model to write streaming 
> algorithms. The task is to provide an adapter from SAMOA topologies to 
> Flink-streaming job graphs in order to support Flink as a backend engine for 
> SAMOA tasks.
> A statup guide can be viewed here :
> https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub
> The main working branch of the adapter :
> https://github.com/senorcarbone/samoa/tree/flink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to