Hello,

This is a rather subtle issue you have stumbled upon.

The ExecutionGraph is not serializable. The WebInterface can access it only because it runs in the same JVM as the JobManager, so the object never has to be sent over the wire.
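
To illustrate the failure mode, here is a minimal sketch that uses only the JDK. The classes Graph and Coordinator are hypothetical stand-ins, not Flink classes, and this is not how Flink or Akka are implemented. It only shows that Java serialization walks the whole object graph, so an object that holds a reference to a non-serializable component fails with a NotSerializableException naming that component, much like the CheckpointCoordinator entry in the stack trace quoted below.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for a component that does not implement Serializable.
    public static class Coordinator {
    }

    // Hypothetical stand-in for an object that is marked Serializable
    // but holds a reference to a non-serializable component.
    public static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Coordinator coordinator = new Coordinator();
    }

    // Tries to serialize obj in memory. Returns null on success, otherwise the
    // message of the NotSerializableException (the offending class name).
    public static String trySerialize(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Within one JVM the Graph can be passed around freely; only
        // serialization (for example, for a remote message) fails.
        System.out.println("Failed on: " + trySerialize(new Graph()));
    }
}
```

Within a single JVM the reference can simply be handed over, which is why the same access works from the web monitor but not from a separate client process.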

I'm not sure there is a way to do what you are trying to do.

Regards,
Chesnay

On 21.09.2016 06:11, Chawla,Sumit wrote:
Hi All


I am trying to get job accumulators. (I am aware that I can get the
accumulators through the REST API as well, but I wanted to avoid JSON
parsing.)

Looking at JobAccumulatorsHandler, I am trying to get the execution graph
for the currently running job. The following is my code:

    InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
    InetAddress ownHostname;
    ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);

    ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
            new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));

    FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);

    ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
            LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
            actorSystem, timeout);

    Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);

    MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
    ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
    LOG.info(result.toString());
    for (JobDetails detail : result.getRunningJobs()) {
        LOG.info(detail.getJobName() + "  ID " + detail.getJobId());

        // Statement emphasized in the original message.
        ExecutionGraph executionGraph =
                executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
        LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
    }


However, I am receiving the following error in Flink:

2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
    at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
    at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
    at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
    at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
    at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]

Any reason why it's failing? This code works when invoked through
WebRuntimeMonitor.

Regards
Sumit Chawla

