[ 
https://issues.apache.org/jira/browse/TEZ-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410357#comment-15410357
 ] 

Piyush Narang commented on TEZ-3369:
------------------------------------

Yeah so I guess regarding the static info, I'm thinking of returning (for a 
given dagId):
{code}
message DAGInformationProto {
  required string name = 1;
  required string dagId = 2;
  repeated VertexInformationProto vertices = 4;
}
message VertexInformationProto {
  required string name = 1;
  required string id = 2;
}
{code}

I think Cascading can look up / construct the vertex ID -> name mappings from 
this object. 

The dynamic stuff is needed not just while it is running but also after 
completion. I'm thinking of something on the lines of:
{code}
message TaskInformationProto {
  required string id = 1;
  required TaskStateProto state = 2;
  required double progress = 3;
  required int64 scheduledTime = 4;
  required int64 startTime = 5;
  required int64 endTime = 6;
  required string successfulAttemptId = 7;
  required string diagnostics = 8;
  repeated TezCounterGroupProto counters = 9;
  //collection of all attempts? 
}
{code}

Doesn't seem like Cascading needs the task attempt details as of now (apart 
from which attemptId was successful), but it does seem like an API that could 
be added in the future. 
To call this API:
getTaskInformation(dagId, vertexId, taskId) -> returns a TaskInfo object

It would be nice to retrieve progress / status details for a bunch of tasks. I 
think we'll need to have this supported by pagination (else the data might be 
too much in case of vertices that have a lot of tasks). I believe this should 
be possible if we use the vertexMap (LinkedHashMap) in 
[DagImpl|https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java#L1804].
 

Looked at the code in DAGClientImpl and I see what you mean by the [client 
redirecting|https://github.com/apache/tez/blob/master/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java#L233].
 Will need to update the new api implementation to use this. I think we 
hopefully should have the data available in the Timeline side, so it should 
mostly just be implementing on the RPC / AM side. 

Does this sound reasonable? I can put out the full proto of the RPCs if that'll 
help make things clear. 

> Fixing Tez's DAGClients to work with Cascading
> ----------------------------------------------
>
>                 Key: TEZ-3369
>                 URL: https://issues.apache.org/jira/browse/TEZ-3369
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Piyush Narang
>
> Hi,
> We seem to be running into issues when we try to use the newest version of 
> Tez (0.9.0-SNAPSHOT) with Cascading. The issue seems to be:
> {code}
> java.lang.ClassCastException: cascading.stats.tez.util.TezTimelineClient 
> cannot be cast to org.apache.tez.dag.api.client.DAGClient
>       at 
> cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
> {code}
> (Full stack trace at the end)
> Relevant Cascading code is:
> 1) [Cascading tries to create a TezTimelineClient and cast it to a DAGClient 
> | 
> https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezStatsUtil.java#L142]
> 2) [TezTimelineClient extends from DAGClientTimelineImpl | 
> https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezTimelineClient.java#L53]
> 3) [DAGClientTimelineImpl extends from DAGClientInternal | 
> https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java#L68]
> 4) [DAGClientInternal extends Closeable which is why things break | 
> https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java#L38].
> This behavior was 'broken' in this [commit | 
> https://github.com/apache/tez/commit/2af886b509015200e1c04527275474cbc771c667]
>  (release 0.8.3)
> The TezTimelineClient in Cascading seems to do two things:
> 1) DAGClient functionalities - ends up delegating to the inner DAGClient 
> object.
> 2) Retrieve stuff like vertexID, vertexChildren and vertexChild (from this 
> [interface|https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TimelineClient.java#L31]).
>  
> As there's no good way to get the vertexID / vertexChildren / vertexChild 
> (correct me if I'm wrong), they end up extending from the 
> DAGClientTimelineImpl which has the http client and json parsing code to 
> allow [things like this | 
> https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezTimelineClient.java#L93]:
> {code}
> @Override
>   public String getVertexID( String vertexName ) throws IOException, 
> TezException
>     {
>     // the filter 'vertexName' is in the 'otherinfo' field, so it must be 
> requested, otherwise timeline server throws
>     // an NPE. to be safe, we include both fields in the result
>     String format = 
> "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s";
>     String url = String.format( format, baseUri, TEZ_VERTEX_ID, TEZ_DAG_ID, 
> dagId, vertexName, FILTER_BY_FIELDS );
>     JSONObject jsonRoot = getJsonRootEntity( url );
>     JSONArray entitiesNode = jsonRoot.optJSONArray( ENTITIES );
> ...
> {code}
> Some options I can think of:
> 1) Ideally these methods getVertexID / getVertexChildren / getVertexChild 
> would be part of DAGClient? Or even part of the DAGClientTimelineImpl? That 
> way the cascading code wouldn't need updating if the uri changed / json 
> format changed, it would end up being updated in these clients as well. I 
> suspect adding this to DAGClient would require more work as it'll also need 
> to be supported by the RPCClient and I don't think there are the relevant 
> protos and such available. 
> 2) A simpler fix would be to have DAGClientInternal extend DAGClient 
> (currently it just implements Closeable). This will not require any changes 
> on the Cascading side as DAGClientTimelineImpl will continue to be a 
> DAGClient. 
> Full stack trace:
> {code}
> Exception in thread "flow 
> com.twitter.data_platform.e2e_testing.jobs.parquet.E2ETestConvertThriftToParquet"
>  java.lang.ClassCastException: cascading.stats.tez.util.TezTimelineClient 
> cannot be cast to org.apache.tez.dag.api.client.DAGClient
>       at 
> cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
>       at 
> cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:117)
>       at 
> cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:105)
>       at 
> cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:60)
>       at 
> cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:56)
>       at cascading.stats.CounterCache.cachedCounters(CounterCache.java:229)
>       at cascading.stats.CounterCache.cachedCounters(CounterCache.java:187)
>       at cascading.stats.CounterCache.getCounterValue(CounterCache.java:167)
>       at 
> cascading.stats.BaseCachedStepStats.getCounterValue(BaseCachedStepStats.java:105)
>       at cascading.stats.FlowStats.getCounterValue(FlowStats.java:170)
>       at 
> cascading.flow.tez.Hadoop2TezFlow.getTotalSliceCPUMilliSeconds(Hadoop2TezFlow.java:303)
>       at cascading.flow.BaseFlow.run(BaseFlow.java:1287)
>       at cascading.flow.BaseFlow.access$100(BaseFlow.java:82)
>       at cascading.flow.BaseFlow$1.run(BaseFlow.java:928)
>       at java.lang.Thread.run(Thread.java:745)
> Exception in thread "main" java.lang.Throwable: If you know what exactly 
> caused this error, please consider contributing to GitHub via following link.
> https://github.com/twitter/scalding/wiki/Common-Exceptions-and-possible-reasons#javalangclasscastexception
>       at com.twitter.scalding.Tool$.main(Tool.scala:152)
>       at com.twitter.scalding.Tool.main(Tool.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
>       at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
> Caused by: java.lang.ClassCastException: 
> cascading.stats.tez.util.TezTimelineClient cannot be cast to 
> org.apache.tez.dag.api.client.DAGClient
>       at 
> cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
>       at 
> cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:117)
>       at 
> cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:105)
>       at 
> cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:60)
>       at 
> cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:56)
>       at cascading.stats.CounterCache.cachedCounters(CounterCache.java:229)
>       at cascading.stats.CounterCache.cachedCounters(CounterCache.java:187)
>       at cascading.stats.CounterCache.getCountersFor(CounterCache.java:155)
>       at 
> cascading.stats.BaseCachedStepStats.getCountersFor(BaseCachedStepStats.java:93)
>       at cascading.stats.FlowStats.getCountersFor(FlowStats.java:159)
>       at com.twitter.scalding.Stats$.getAllCustomCounters(Stats.scala:93)
>       at com.twitter.scalding.Job.handleStats(Job.scala:269)
>       at com.twitter.scalding.Job.run(Job.scala:298)
>       at com.twitter.scalding.Tool.start$1(Tool.scala:124)
>       at com.twitter.scalding.Tool.run(Tool.scala:140)
>       at com.twitter.scalding.Tool.run(Tool.scala:68)
>       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>       at com.twitter.scalding.Tool$.main(Tool.scala:148)
>       ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to