HI Aljoscha

I was able to get the ClusterClient and Accumulators using following:

DefaultCLI defaultCLI = new DefaultCLI();
CommandLine line = new DefaultParser().parse(new Options(), new
String[]{}, true);
ClusterClient clusterClient = defaultCLI.retrieveCluster(line,configuration);



Regards
Sumit Chawla


On Thu, Sep 22, 2016 at 4:55 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> there is ClusterClient.getAccumulators(JobID jobID) which should be able
> to
> get the accumulators for a running job. If you can construct a
> ClusterClient that should be a good solution.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Sep 2016 at 21:15 Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>
> > Hi Sean
> >
> > My goal here is to get User Accumulators.  I know there exists the REST
> > Calls.  But since i am running my code in the same JVM, i wanted to avoid
> > go over HTTP.  I saw this code in JobAccumulatorsHandler and tried to use
> > this.  Would you suggest some alternative approach to avoid this over the
> > network serialization for Akka?
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Between two different actor systems in the same JVM, messages are still
> > > serialized (they go through a local socket, I think).
> > >
> > > Getting the execution graph is not easily possible, and not intended,
> as
> > it
> > > actually contains RPC resources, etc.
> > >
> > > What do you need from the execution graph? Maybe there is another way
> to
> > > achieve that...
> > >
> > > On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > >
> > > > Hi Chesney
> > > >
> > > > I am actually running this code in the same JVM as the WebInterface
> and
> > > > JobManager.  I am programmatically, starting the JobManager. and
> then
> > > > running this code in same JVM to query metrics.  Only difference
> could
> > be
> > > > that i am creating a new Akka ActorSystem, and ActorGateway. Not sure
> > if
> > > it
> > > > forces it to execute the code as if request is coming over the
> wire.  I
> > > am
> > > > not very well aware of Akka internals, so may be somebody can shed
> some
> > > > light on it.
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <
> ches...@apache.org>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > this is a rather subtle issue you stumbled upon here.
> > > > >
> > > > > The ExecutionGraph is not serializable. The only reason why the
> > > > > WebInterface can access it is because it runs in the same JVM as
> the
> > > > > JobManager.
> > > > >
> > > > > I'm not sure if there is a way for what you are trying to do.
> > > > >
> > > > > Regards,
> > > > > Chesnay
> > > > >
> > > > >
> > > > > On 21.09.2016 06:11, Chawla,Sumit wrote:
> > > > >
> > > > >> Hi All
> > > > >>
> > > > >>
> > > > >> I am trying to get JOB  accumulators.  ( I am aware that I can get
> > the
> > > > >> accumulators through REST APIs as well, but i wanted to avoid JSON
> > > > >> parsing).
> > > > >>
> > > > >> Looking at JobAccumulatorsHandler i am trying to get execution
> graph
> > > for
> > > > >> currently running job.  Following is my code:
> > > > >>
> > > > >>    InetSocketAddress initialJobManagerAddress=new
> > > > >> InetSocketAddress(hostName,port);
> > > > >>              InetAddress ownHostname;
> > > > >>              ownHostname=
> > > > >> ConnectionUtils.findConnectingAddress(
> initialJobManagerAddress,2000,
> > > > 400);
> > > > >>
> > > > >>              ActorSystem actorSystem=
> AkkaUtils.createActorSystem(co
> > > > >> nfiguration,
> > > > >>                      new Some(new
> > > > >> Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));
> > > > >>
> > > > >>              FiniteDuration timeout= FiniteDuration.apply(10,
> > > > >> TimeUnit.SECONDS);
> > > > >>
> > > > >>              ActorGateway akkaActorGateway=
> > > > >> LeaderRetrievalUtils.retrieveLeaderGateway(
> > > > >>
> > > > >> LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
> > > > >>                      actorSystem,timeout
> > > > >>              );
> > > > >>
> > > > >>
> > > > >>              Future<Object> future=akkaActorGateway.ask(new
> > > > >> RequestJobDetails(true,false),timeout);
> > > > >>
> > > > >>              MultipleJobsDetails result=(MultipleJobsDetails)
> > > > >> Await.result(future,timeout);
> > > > >>              ExecutionGraphHolder executionGraphHolder=new
> > > > >> ExecutionGraphHolder(timeout);
> > > > >>              LOG.info(result.toString());
> > > > >>              for(JobDetails detail:result.getRunningJobs()){
> > > > >>                  LOG.info(detail.getJobName() + "  ID " +
> > > > >> detail.getJobId());
> > > > >>
> > > > >> *                ExecutionGraph
> > > > >> executionGraph=executionGraphHolder.getExecutionGraph(detail.
> > > > getJobId(),
> > > > >> akkaActorGateway);*
> > > > >>
> > > > >>                 LOG.info("Accumulators " +
> > > > >> executionGraph.aggregateUserAccumulators());
> > > > >>              }
> > > > >>
> > > > >>
> > > > >> However, i am receiving following error in Flink:
> > > > >>
> > > > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3]
> > > nobody
> > > > >> ERROR akka.remote.EndpointWriter - Transient association error
> > > > >> (association
> > > > >> remains live)
> > > > >> java.io.NotSerializableException: org.apache.flink.runtime.
> > > checkpoint.
> > > > >> CheckpointCoordinator
> > > > >>          at java.io.ObjectOutputStream.writeObject0(
> > > ObjectOutputStream.
> > > > >> java:1184)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > > > ObjectOutputSt
> > > > >> ream.java:1548)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeSerialData(
> > > > ObjectOutputStrea
> > > > >> m.java:1509)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > > > ObjectOutputS
> > > > >> tream.java:1432)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeObject0(
> > > ObjectOutputStream.
> > > > >> java:1178)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > > > ObjectOutputSt
> > > > >> ream.java:1548)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeSerialData(
> > > > ObjectOutputStrea
> > > > >> m.java:1509)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > > > ObjectOutputS
> > > > >> tream.java:1432)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeObject0(
> > > ObjectOutputStream.
> > > > >> java:1178)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at java.io.ObjectOutputStream.writeObject(
> > > ObjectOutputStream.
> > > > >> java:348)
> > > > >> ~[?:1.8.0_92]
> > > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > > >> toBinary$1.apply$mcV$sp(Serializer.scala:129)
> > > > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > > >> toBinary$1.apply(Serializer.scala:129)
> > ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>          at akka.serialization.JavaSerializer$$anonfun$
> > > > >> toBinary$1.apply(Serializer.scala:129)
> > ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> > > scala:
> > > > >> 57)
> > > > >> ~[scala-library-2.10.5.jar:?]
> > > > >>          at akka.serialization.JavaSerializer.toBinary(
> > > > Serializer.scala:
> > > > >> 129)
> > > > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>          at akka.remote.MessageSerializer$
> > > > .serialize(MessageSerializer.s
> > > > >> cala:36)
> > > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>          at akka.remote.EndpointWriter$$
> anonfun$serializeMessage$1.
> > > > apply
> > > > >> (Endpoint.scala:845)
> > > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>          at akka.remote.EndpointWriter$$
> anonfun$serializeMessage$1.
> > > > apply
> > > > >> (Endpoint.scala:845)
> > > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> > > scala:
> > > > >> 57)
> > > > >> ~[scala-library-2.10.5.jar:?]
> > > > >>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.
> > > scala:
> > > > >> 844)
> > > > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>
> > > > >> Any reason why its failing? This code works when invoked through
> > > > >> WebRuntimeMonitor.
> > > > >>
> > > > >> Regards
> > > > >> Sumit Chawla
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to