Hi Greg,

Could you share an example program that reproduces the problem? I suspect
that your user code class BlockInfo is somehow being sent directly to the
JobManager, where it is deserialized without the user code class loader.
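
To make that suspicion concrete, here is a minimal sketch of what
"deserializing with the user code class loader" means in plain Java. It is
not Flink's or Akka's actual code; the names UserCodeDeserialization and
ClassLoaderObjectInputStream are made up for illustration, and checked
exceptions are wrapped only to keep the sketch short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Flink or Akka.
final class UserCodeDeserialization {

    /** ObjectInputStream that resolves classes against a given class loader first. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied (user code) class loader.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deserializes a value, resolving its classes through the given class loader. */
    static Object deserialize(byte[] data, ClassLoader userCodeClassLoader) {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            // This is the failure mode suspected above: the class is not visible
            // to the class loader used for deserialization.
            throw new IllegalStateException("Class not visible to class loader", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If a plain ObjectInputStream is used instead, on a JVM whose classpath does
not contain the user jar, readObject fails with a ClassNotFoundException for
the user class. That would be consistent with the
[generator.rmat.random.BlockInfo] reason in your JobManager log.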

Cheers,
Till

On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <c...@greghogan.com> wrote:

> I am seeing a failure running my code starting with commit 0f8d76c6
> (ExecutionConfig to JobGraph).
>
> Logs and stack trace are below.
>
> Using default configuration so a single TaskManager. From the web UI, data
> port is 33245 and path is
> akka.tcp://flink@192.168.14.134:41339/user/taskmanager.
>
> Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
> e.detailMessage is "java.lang.ClassNotFoundException:
> generator.rmat.random.BlockInfo" and remoteAddress is
> "akka.tcp://flink@127.0.0.1:51428".
>
> I have an old version of BlockInfo which works without error:
>
> public class BlockInfo {
>
>     public long seed;
>
>     public long edges;
> }
>
> The new version of BlockInfo leading to the error:
>
> public class BlockInfo<T extends RandomGenerator> {
>
>     private final RandomGenerable<T> randomGenerable;
>
>     private final int blockIndex;
>
>     private final long firstElement;
>
>     private final long elementCount;
>
>     ...
> }
>
> In this execution the RandomGenerable has only a single field, a long. I am
> puzzled and unsure where to look next.
>
> Greg
>
>
>
> Client log:
>
> 2016-03-15 10:07:25,745 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 2016-03-15 10:08:25,692 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
>
>
> JobManager log:
>
> 2016-03-15 10:07:25,514 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
> 2016-03-15 10:07:25,533 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [java.lang.Integer]
> 2016-03-15 10:07:25,547 DEBUG org.apache.flink.runtime.blob.BlobServerConnection            - Received PUT request for content addressable BLOB
> 2016-03-15 10:07:25,731 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:49738] has failed, address is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].
>
>
> Client stack trace:
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>     at Driver.main(Driver.java:462)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out.
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
>     ... 16 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
