Hi Greg,

could you share an example program with us which reproduces the problem? I suspect that, somehow, your user code class BlockInfo is sent directly to the JobManager, where it is deserialized without the user code class loader.
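To make that suspicion concrete: plain Java deserialization by default resolves classes through the nearest class loader on the calling stack, so a class that exists only in the submitted job jar would fail with a ClassNotFoundException. Below is a minimal sketch in plain Java of deserializing against an explicitly supplied class loader instead. This is an illustration under assumptions, not Flink's actual code: the names UserCodeDeserialization, ClassLoaderObjectInputStream, and Payload are hypothetical, and Payload merely stands in for a user class such as BlockInfo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class UserCodeDeserialization {

    /** Hypothetical stand-in for a user code class such as BlockInfo. */
    public static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        public final long seed;

        public Payload(long seed) {
            this.seed = seed;
        }
    }

    /** ObjectInputStream that resolves classes against a caller-supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied (user code) class loader first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with standard Java serialization. */
    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Deserializes bytes, resolving classes through the given class loader. */
    public static Object deserialize(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new Payload(42L));
        // In a real setup the loader would be the one that loaded the job jar (assumption).
        ClassLoader userCodeLoader = UserCodeDeserialization.class.getClassLoader();
        Payload restored = (Payload) deserialize(bytes, userCodeLoader);
        System.out.println("seed = " + restored.seed);
    }
}
```

If a message carrying BlockInfo is deserialized by a generic serializer that has no access to the job's class loader, there is no such hook, which would be consistent with the ClassNotFoundException you see in the debugger.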
Cheers,
Till

On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <c...@greghogan.com> wrote:

> I am seeing a failure running my code starting with commit 0f8d76c6
> (ExecutionConfig to JobGraph).
>
> Logs and stack trace are below.
>
> Using default configuration so a single TaskManager. From the web UI, data
> port is 33245 and path is
> akka.tcp://flink@192.168.14.134:41339/user/taskmanager.
>
> Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
> e.detailMessage is "java.lang.ClassNotFoundException:
> generator.rmat.random.BlockInfo" and remoteAddress is
> "akka.tcp://flink@127.0.0.1:51428".
>
> I have an old version of BlockInfo which works without error:
>
> public class BlockInfo {
>
>     public long seed;
>
>     public long edges;
> }
>
> The new version of BlockInfo leading to the error:
>
> public class BlockInfo<T extends RandomGenerator> {
>
>     private final RandomGenerable<T> randomGenerable;
>
>     private final int blockIndex;
>
>     private final long firstElement;
>
>     private final long elementCount;
>
>     ...
> }
>
> In this execution the RandomGenerable has only a single field, a long. I am
> puzzled and unsure where to look next.
>
> Greg
>
> Client log:
>
> 2016-03-15 10:07:25,745 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 2016-03-15 10:08:25,692 INFO  org.apache.flink.runtime.client.JobClientActor - Terminate JobClientActor.
>
> JobManager log:
>
> 2016-03-15 10:07:25,514 DEBUG akka.serialization.Serialization(akka://flink) - Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
> 2016-03-15 10:07:25,533 DEBUG akka.serialization.Serialization(akka://flink) - Using serializer[akka.serialization.JavaSerializer] for message [java.lang.Integer]
> 2016-03-15 10:07:25,547 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB
> 2016-03-15 10:07:25,731 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:49738] has failed, address is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].
>
> Client stack trace:
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>     at Driver.main(Driver.java:462)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out.
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
>     ... 16 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)