Hi,
There is ClusterClient.getAccumulators(JobID jobID), which should be able to
get the accumulators for a running job. If you can construct a
ClusterClient, that should be a good solution.
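
As background on the NotSerializableException quoted further down: Java
serialization rejects an object graph as soon as it reaches a field whose
class does not implement Serializable, even if the outer object does. Below
is a minimal stand-alone sketch of that behaviour. Graph and Coordinator are
hypothetical stand-ins invented for illustration, not Flink classes, and the
helper method name is my own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical stand-in for a runtime-only resource; deliberately NOT Serializable.
    public static class Coordinator {
    }

    // Hypothetical stand-in for a Serializable object that holds such a resource.
    public static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final Coordinator coordinator = new Coordinator();
    }

    // Attempts Java serialization into an in-memory buffer.
    // Returns null on success, or the message of the NotSerializableException
    // (by default the name of the rejected class) on failure.
    public static String firstNonSerializableClass(Object value) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // A String serializes without problems.
        System.out.println(firstNonSerializableClass("a plain string"));
        // Graph is Serializable, but its Coordinator field is not, so this fails.
        System.out.println(firstNonSerializableClass(new Graph()));
    }
}
```

Any message that crosses a serializing boundary has to pass this check for
its whole object graph, which is why asking for accumulator values only
should be easier than asking for the full graph object.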

Cheers,
Aljoscha

On Wed, 21 Sep 2016 at 21:15 Chawla,Sumit <sumitkcha...@gmail.com> wrote:

> Hi Sean
>
> My goal here is to get user accumulators. I know the REST calls exist, but
> since I am running my code in the same JVM, I wanted to avoid going over
> HTTP. I saw this code in JobAccumulatorsHandler and tried to use it. Would
> you suggest an alternative approach that avoids this over-the-network
> serialization for Akka?
>
> Regards
> Sumit Chawla
>
>
> On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > Between two different actor systems in the same JVM, messages are still
> > serialized (they go through a local socket, I think).
> >
> > Getting the execution graph is not easily possible, and not intended, as it
> > actually contains RPC resources, etc.
> >
> > What do you need from the execution graph? Maybe there is another way to
> > achieve that...
> >
> > On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > wrote:
> >
> > > Hi Chesnay
> > >
> > > I am actually running this code in the same JVM as the WebInterface and
> > > JobManager. I start the JobManager programmatically and then run this
> > > code in the same JVM to query metrics. The only difference could be that
> > > I am creating a new Akka ActorSystem and ActorGateway. I am not sure
> > > whether that forces the code to execute as if the request were coming
> > > over the wire. I am not very familiar with Akka internals, so maybe
> > > somebody can shed some light on it.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <ches...@apache.org>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a rather subtle issue you have stumbled upon here.
> > > >
> > > > The ExecutionGraph is not serializable. The only reason why the
> > > > WebInterface can access it is because it runs in the same JVM as the
> > > > JobManager.
> > > >
> > > > I'm not sure there is a way to do what you are trying to do.
> > > >
> > > > Regards,
> > > > Chesnay
> > > >
> > > >
> > > > On 21.09.2016 06:11, Chawla,Sumit wrote:
> > > >
> > > >> Hi All
> > > >>
> > > >>
> > > > >> I am trying to get job accumulators. (I am aware that I can get the
> > > > >> accumulators through the REST APIs as well, but I wanted to avoid JSON
> > > > >> parsing.)
> > > > >>
> > > > >> Looking at JobAccumulatorsHandler, I am trying to get the execution graph
> > > > >> for the currently running job. The following is my code:
> > > >>
> > > > >>     InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
> > > > >>     InetAddress ownHostname =
> > > > >>             ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
> > > > >>
> > > > >>     ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
> > > > >>             new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
> > > > >>
> > > > >>     FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);
> > > > >>
> > > > >>     ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
> > > > >>             LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
> > > > >>             actorSystem, timeout);
> > > > >>
> > > > >>     Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
> > > > >>
> > > > >>     MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
> > > > >>     ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
> > > > >>     LOG.info(result.toString());
> > > > >>     for (JobDetails detail : result.getRunningJobs()) {
> > > > >>         LOG.info(detail.getJobName() + "  ID " + detail.getJobId());
> > > > >>
> > > > >>         // Statement emphasized in the original message.
> > > > >>         ExecutionGraph executionGraph =
> > > > >>                 executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
> > > > >>
> > > > >>         LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
> > > > >>     }
> > > >>
> > > >>
> > > > >> However, I am receiving the following error in Flink:
> > > >>
> > > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3]
> > nobody
> > > >> ERROR akka.remote.EndpointWriter - Transient association error
> > > >> (association
> > > >> remains live)
> > > >> java.io.NotSerializableException: org.apache.flink.runtime.
> > checkpoint.
> > > >> CheckpointCoordinator
> > > >>          at java.io.ObjectOutputStream.writeObject0(
> > ObjectOutputStream.
> > > >> java:1184)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > > ObjectOutputSt
> > > >> ream.java:1548)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeSerialData(
> > > ObjectOutputStrea
> > > >> m.java:1509)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > > ObjectOutputS
> > > >> tream.java:1432)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeObject0(
> > ObjectOutputStream.
> > > >> java:1178)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > > ObjectOutputSt
> > > >> ream.java:1548)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeSerialData(
> > > ObjectOutputStrea
> > > >> m.java:1509)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > > ObjectOutputS
> > > >> tream.java:1432)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeObject0(
> > ObjectOutputStream.
> > > >> java:1178)
> > > >> ~[?:1.8.0_92]
> > > >>          at java.io.ObjectOutputStream.writeObject(
> > ObjectOutputStream.
> > > >> java:348)
> > > >> ~[?:1.8.0_92]
> > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > >> toBinary$1.apply$mcV$sp(Serializer.scala:129)
> > > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > >> toBinary$1.apply(Serializer.scala:129)
> ~[akka-actor_2.10-2.3.7.jar:?]
> > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > >> toBinary$1.apply(Serializer.scala:129)
> ~[akka-actor_2.10-2.3.7.jar:?]
> > > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> > scala:
> > > >> 57)
> > > >> ~[scala-library-2.10.5.jar:?]
> > > >>          at akka.serialization.JavaSerializer.toBinary(
> > > Serializer.scala:
> > > >> 129)
> > > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > > >>          at akka.remote.MessageSerializer$
> > > .serialize(MessageSerializer.s
> > > >> cala:36)
> > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> > > apply
> > > >> (Endpoint.scala:845)
> > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> > > apply
> > > >> (Endpoint.scala:845)
> > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> > scala:
> > > >> 57)
> > > >> ~[scala-library-2.10.5.jar:?]
> > > >>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.
> > scala:
> > > >> 844)
> > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > >>
> > > > >> Any reason why it's failing? This code works when invoked through
> > > > >> WebRuntimeMonitor.
> > > >>
> > > >> Regards
> > > >> Sumit Chawla
> > > >>
> > > >>
> > > >
> > >
> >
>
