Hi, there is ClusterClient.getAccumulators(JobID jobID) which should be able to get the accumulators for a running job. If you can construct a ClusterClient that should be a good solution.
Cheers, Aljoscha On Wed, 21 Sep 2016 at 21:15 Chawla,Sumit <sumitkcha...@gmail.com> wrote: > Hi Sean > > My goal here is to get User Accumulators. I know there exists the REST > Calls. But since i am running my code in the same JVM, i wanted to avoid > go over HTTP. I saw this code in JobAccumulatorsHandler and tried to use > this. Would you suggest some alternative approach to avoid this over the > network serialization for Akka? > > Regards > Sumit Chawla > > > On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <se...@apache.org> wrote: > > > Between two different actor systems in the same JVM, messages are still > > serialized (they go through a local socket, I think). > > > > Getting the execution graph is not easily possible, and not intended, as > it > > actually contains RPC resources, etc. > > > > What do you need from the execution graph? Maybe there is another way to > > achieve that... > > > > On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <sumitkcha...@gmail.com> > > wrote: > > > > > Hi Chesney > > > > > > I am actually running this code in the same JVM as the WebInterface and > > > JobManager. I am programmatically, starting the JobManager. and then > > > running this code in same JVM to query metrics. Only difference could > be > > > that i am creating a new Akka ActorSystem, and ActorGateway. Not sure > if > > it > > > forces it to execute the code as if request is coming over the wire. I > > am > > > not very well aware of Akka internals, so may be somebody can shed some > > > light on it. > > > > > > Regards > > > Sumit Chawla > > > > > > > > > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <ches...@apache.org> > > > wrote: > > > > > > > Hello, > > > > > > > > this is a rather subtle issue you stumbled upon here. > > > > > > > > The ExecutionGraph is not serializable. The only reason why the > > > > WebInterface can access it is because it runs in the same JVM as the > > > > JobManager. > > > > > > > > I'm not sure if there is a way for what you are trying to do. > > > > > > > > Regards, > > > > Chesnay > > > > > > > > > > > > On 21.09.2016 06:11, Chawla,Sumit wrote: > > > > > > > >> Hi All > > > >> > > > >> > > > >> I am trying to get JOB accumulators. ( I am aware that I can get > the > > > >> accumulators through REST APIs as well, but i wanted to avoid JSON > > > >> parsing). > > > >> > > > >> Looking at JobAccumulatorsHandler i am trying to get execution graph > > for > > > >> currently running job. Following is my code: > > > >> > > > >> InetSocketAddress initialJobManagerAddress=new > > > >> InetSocketAddress(hostName,port); > > > >> InetAddress ownHostname; > > > >> ownHostname= > > > >> ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000, > > > 400); > > > >> > > > >> ActorSystem actorSystem= AkkaUtils.createActorSystem(co > > > >> nfiguration, > > > >> new Some(new > > > >> Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0))); > > > >> > > > >> FiniteDuration timeout= FiniteDuration.apply(10, > > > >> TimeUnit.SECONDS); > > > >> > > > >> ActorGateway akkaActorGateway= > > > >> LeaderRetrievalUtils.retrieveLeaderGateway( > > > >> > > > >> LeaderRetrievalUtils.createLeaderRetrievalService(configuration), > > > >> actorSystem,timeout > > > >> ); > > > >> > > > >> > > > >> Future<Object> future=akkaActorGateway.ask(new > > > >> RequestJobDetails(true,false),timeout); > > > >> > > > >> MultipleJobsDetails result=(MultipleJobsDetails) > > > >> Await.result(future,timeout); > > > >> ExecutionGraphHolder executionGraphHolder=new > > > >> ExecutionGraphHolder(timeout); > > > >> LOG.info(result.toString()); > > > >> for(JobDetails detail:result.getRunningJobs()){ > > > >> LOG.info(detail.getJobName() + " ID " + > > > >> detail.getJobId()); > > > >> > > > >> * ExecutionGraph > > > >> executionGraph=executionGraphHolder.getExecutionGraph(detail. > > > getJobId(), > > > >> akkaActorGateway);* > > > >> > > > >> LOG.info("Accumulators " + > > > >> executionGraph.aggregateUserAccumulators()); > > > >> } > > > >> > > > >> > > > >> However, i am receiving following error in Flink: > > > >> > > > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] > > nobody > > > >> ERROR akka.remote.EndpointWriter - Transient association error > > > >> (association > > > >> remains live) > > > >> java.io.NotSerializableException: org.apache.flink.runtime. > > checkpoint. > > > >> CheckpointCoordinator > > > >> at java.io.ObjectOutputStream.writeObject0( > > ObjectOutputStream. > > > >> java:1184) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.defaultWriteFields( > > > ObjectOutputSt > > > >> ream.java:1548) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeSerialData( > > > ObjectOutputStrea > > > >> m.java:1509) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeOrdinaryObject( > > > ObjectOutputS > > > >> tream.java:1432) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeObject0( > > ObjectOutputStream. > > > >> java:1178) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.defaultWriteFields( > > > ObjectOutputSt > > > >> ream.java:1548) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeSerialData( > > > ObjectOutputStrea > > > >> m.java:1509) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeOrdinaryObject( > > > ObjectOutputS > > > >> tream.java:1432) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeObject0( > > ObjectOutputStream. > > > >> java:1178) > > > >> ~[?:1.8.0_92] > > > >> at java.io.ObjectOutputStream.writeObject( > > ObjectOutputStream. > > > >> java:348) > > > >> ~[?:1.8.0_92] > > > >> at akka.serialization.JavaSerializer$$anonfun$ > > > >> toBinary$1.apply$mcV$sp(Serializer.scala:129) > > > >> ~[akka-actor_2.10-2.3.7.jar:?] > > > >> at akka.serialization.JavaSerializer$$anonfun$ > > > >> toBinary$1.apply(Serializer.scala:129) > ~[akka-actor_2.10-2.3.7.jar:?] > > > >> at akka.serialization.JavaSerializer$$anonfun$ > > > >> toBinary$1.apply(Serializer.scala:129) > ~[akka-actor_2.10-2.3.7.jar:?] > > > >> at scala.util.DynamicVariable.withValue(DynamicVariable. > > scala: > > > >> 57) > > > >> ~[scala-library-2.10.5.jar:?] > > > >> at akka.serialization.JavaSerializer.toBinary( > > > Serializer.scala: > > > >> 129) > > > >> ~[akka-actor_2.10-2.3.7.jar:?] > > > >> at akka.remote.MessageSerializer$ > > > .serialize(MessageSerializer.s > > > >> cala:36) > > > >> ~[akka-remote_2.10-2.3.7.jar:?] > > > >> at akka.remote.EndpointWriter$$anonfun$serializeMessage$1. > > > apply > > > >> (Endpoint.scala:845) > > > >> ~[akka-remote_2.10-2.3.7.jar:?] > > > >> at akka.remote.EndpointWriter$$anonfun$serializeMessage$1. > > > apply > > > >> (Endpoint.scala:845) > > > >> ~[akka-remote_2.10-2.3.7.jar:?] > > > >> at scala.util.DynamicVariable.withValue(DynamicVariable. > > scala: > > > >> 57) > > > >> ~[scala-library-2.10.5.jar:?] > > > >> at akka.remote.EndpointWriter.serializeMessage(Endpoint. > > scala: > > > >> 844) > > > >> ~[akka-remote_2.10-2.3.7.jar:?] > > > >> > > > >> Any reason why its failing? This code works when invoked through > > > >> WebRuntimeMonitor. > > > >> > > > >> Regards > > > >> Sumit Chawla > > > >> > > > >> > > > > > > > > > >