To give a bit of context for the exception: To execute a program, the classes of the user functions need to be available to the executing TaskManagers.

 - If you execute locally from the IDE, all classes are on the classpath anyway.
 - If you use the remote environment, you need to attach the jar file to the environment.
 - In your case (repl), you need to make sure that the generated classes are given to the TaskManager. In that sense, the approach is probably similar to executing with a remote environment, except that you do not have a jar file up front and need to generate it on the fly.

As Robert mentioned, https://github.com/apache/flink/pull/35 may have a first solution to that. Other approaches are also possible, like simply always bundling all classes in the directory where the repl puts its generated classes.

Greetings,
Stephan

On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> I will look into it once I have some time (end of this week, or next
> week probably)
>
> On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
> > Hey Nikolaas,
> >
> > Thank you for posting on the mailing list. I've met Nikolaas today in
> > person and we were talking a bit about an interactive shell for Flink,
> > potentially also an integration with Zeppelin.
> >
> > Great stuff I'm really looking forward to :)
> >
> > We were wondering if somebody from the list has some experience with the
> > scala shell.
> > I've pointed Nikolaas also to this PR:
> > https://github.com/apache/flink/pull/35.
> >
> > Best,
> > Robert
> >
> > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenber...@gmail.com> wrote:
> >
> >> Hi!
> >> I am trying to implement a scala shell for flink.
> >>
> >> I've started with a simple scala object whose main function will drop the
> >> user to the interactive scala shell (repl) at one point:
> >>
> >> import scala.tools.nsc.interpreter.ILoop
> >> import scala.tools.nsc.Settings
> >>
> >> object Job {
> >>   def main(args: Array[String]) {
> >>
> >>     val repl = new ILoop()
> >>     repl.settings = new Settings()
> >>
> >>     // enable this line to use scala in intellij
> >>     repl.settings.usejavacp.value = true
> >>
> >>     repl.createInterpreter()
> >>
> >>     // start scala interpreter shell
> >>     repl.process(repl.settings)
> >>
> >>     repl.closeInterpreter()
> >>   }
> >> }
> >>
> >> Now I am trying to execute the word count example as in:
> >>
> >> scala> import org.apache.flink.api.scala._
> >>
> >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> >>
> >> scala> val text = env.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
> >>
> >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
> >>
> >> scala> counts.print()
> >>
> >> scala> env.execute("Flink Scala Api Skeleton")
> >>
> >> However I am running into the following error:
> >>
> >> env.execute("Flink Scala Api Skeleton")
> >> org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
> >>   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> >>   at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>   at java.lang.reflect.Method.invoke(Method.java:606)
> >>   at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> >>   at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> >> Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
> >>   at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> >>   at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> >>   at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> >>   at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> >>   at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> >>   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> >>   ... 8 more
> >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> >>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >>   at java.security.AccessController.doPrivileged(Native Method)
> >>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >>   at java.lang.Class.forName0(Native Method)
> >>   at java.lang.Class.forName(Class.java:274)
> >>   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> >>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> >>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> >>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> >>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> >>   at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> >>   at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> >>   ... 13 more
> >>
> >>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> >>   at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> >>   at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> >>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> >>   at .<init>(<console>:12)
> >>   at .<clinit>(<console>)
> >>   at .<init>(<console>:7)
> >>   at .<clinit>(<console>)
> >>   at $print(<console>)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>   at java.lang.reflect.Method.invoke(Method.java:606)
> >>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> >>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> >>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> >>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> >>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> >>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> >>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> >>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> >>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> >>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> >>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> >>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> >>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> >>   at org.myorg.quickstart.Job$.main(Job.scala:37)
> >>   at org.myorg.quickstart.Job.main(Job.scala)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>   at java.lang.reflect.Method.invoke(Method.java:606)
> >>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> >>
> >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
> >> point me in some direction?
> >>
> >> thanks,
> >> Nikolaas
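
As a rough illustration of the last suggestion in the reply at the top of this thread (bundling all classes from the directory where the repl puts its generated classes), here is a minimal sketch in Scala. It assumes the repl has been set up to write its class files to a real directory on disk, which the thread does not confirm. The names ReplJarBundler, listFiles and bundle are hypothetical and not part of Flink or the Scala shell; only java.util.jar from the JDK is used.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}

// Hypothetical helper: packs a directory of generated class files into a jar.
object ReplJarBundler {

  /** Recursively lists all regular files below `dir`. */
  def listFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap { f => if (f.isDirectory) listFiles(f) else Seq(f) }
  }

  /** Writes every file below `classDir` into `jarFile`, keeping relative paths. */
  def bundle(classDir: File, jarFile: File): File = {
    val out = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      val base = classDir.toURI
      for (f <- listFiles(classDir)) {
        // Jar entry names are '/'-separated paths relative to the class directory.
        val name = base.relativize(f.toURI).getPath
        out.putNextEntry(new JarEntry(name))
        val in = new FileInputStream(f)
        try {
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            n = in.read(buf)
          }
        } finally in.close()
        out.closeEntry()
      }
    } finally out.close()
    jarFile
  }
}
```

The resulting jar could then be handed to a remote environment as one of its jar files, for example via ExecutionEnvironment.createRemoteEnvironment(host, port, jar.getAbsolutePath), where host and port are placeholders for the JobManager address. The jar would have to be rebuilt before each execute call, since the repl keeps generating new classes as lines are entered.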