Repository: tez Updated Branches: refs/heads/master 83a3c989b -> 774444312
TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/77444431 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/77444431 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/77444431 Branch: refs/heads/master Commit: 774444312f8ea586939cb85c140c94251162e731 Parents: 83a3c98 Author: Hitesh Shah <[email protected]> Authored: Tue Sep 29 11:45:36 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Sep 29 11:45:36 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/tez/client/CallerContext.java | 171 +++++++++++++++++++ .../org/apache/tez/common/ATSConstants.java | 10 +- .../java/org/apache/tez/common/TezUtils.java | 2 +- .../main/java/org/apache/tez/dag/api/DAG.java | 20 +++ .../apache/tez/dag/api/DagTypeConverters.java | 28 ++- tez-api/src/main/proto/DAGApiRecords.proto | 8 + .../java/org/apache/tez/dag/api/TestDAG.java | 24 +++ .../org/apache/tez/common/TestTezUtils.java | 12 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 10 +- .../impl/HistoryEventJsonConversion.java | 17 ++ .../apache/tez/dag/history/utils/DAGUtils.java | 24 ++- .../tez/dag/history/utils/TestDAGUtils.java | 13 +- .../apache/tez/examples/OrderedWordCount.java | 1 + .../org/apache/tez/examples/TezExampleBase.java | 10 ++ .../tez/history/parser/datamodel/Constants.java | 5 + .../tez/history/parser/datamodel/DagInfo.java | 30 ++++ .../apache/tez/history/TestHistoryParser.java | 18 ++ .../ats/HistoryEventTimelineConversion.java | 19 ++- .../ats/TestHistoryEventTimelineConversion.java | 28 ++- .../examples/TestOrderedWordCount.java | 3 +- 21 files changed, 439 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f65cfda..d5e8141 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2859. TestMergeManager.testLocalDiskMergeMultipleTasks failing TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker. TEZ-2857. Fix flakey tests in TestDAGImpl. @@ -194,6 +195,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker. TEZ-2857. Fix flakey tests in TestDAGImpl. TEZ-2398. Flaky test: TestFaultTolerance @@ -464,6 +466,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2834. Make Tez preemption resilient to incorrect free resource reported http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/client/CallerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/CallerContext.java b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java new file mode 100644 index 0000000..ba68851 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import com.google.common.base.Preconditions; + +@Public +@Unstable +public class CallerContext { + + /** + * Context in which Tez is being invoked. + * For example, HIVE or PIG. + */ + private String context; + + /** + * Type of the caller. Should ideally be used along with callerId to uniquely identify the caller. + * When used with YARN Timeline, this should map to the Timeline Entity Type. + * For example, HIVE_QUERY_ID. + */ + private String callerType; + + /** + * Caller ID. + * An ID to uniquely identify the caller within the callerType namespace + */ + private String callerId; + + /** + * Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + */ + private String blob; + + /** + * Private Constructor + */ + private CallerContext() { + } + + /** + * Instantiate the Caller Context + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. + * @param callerId Caller ID. 
An ID to uniquely identify the caller within the callerType + * namespace + * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely + * identify the caller. When used with YARN Timeline, this should map to + * the Timeline Entity Type. For example, HIVE_QUERY_ID. + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + * @return CallerContext + */ + public static CallerContext create(String context, String callerId, + String callerType, @Nullable String blob) { + return new CallerContext(context, callerId, callerType, blob); + } + + /** + * Instantiate the Caller Context + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + * @return CallerContext + */ + @Private + public static CallerContext create(String context, @Nullable String blob) { + return new CallerContext(context, blob); + } + + + private CallerContext(String context, String callerId, String callerType, + @Nullable String blob) { + if (callerId != null || callerType != null) { + setCallerIdAndType(callerId, callerType); + } + setContext(context); + setBlob(blob); + } + + private CallerContext(String context, @Nullable String blob) { + setContext(context); + setBlob(blob); + } + + public String getCallerType() { + return callerType; + } + + public String getCallerId() { + return callerId; + } + + public String getBlob() { + return blob; + } + + public String getContext() { + return context; + } + + /** + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. 
+ */ + public CallerContext setContext(String context) { + Preconditions.checkArgument(context != null && !context.isEmpty(), + "Context cannot be null or empty"); + this.context = context; + return this; + } + + /** + * @param callerId Caller ID. An ID to uniquely identify the caller within the callerType + * namespace + * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely + * identify the caller. When used with YARN Timeline, this should map to + * the Timeline Entity Type. For example, HIVE_QUERY_ID. + */ + public CallerContext setCallerIdAndType(String callerId, String callerType) { + Preconditions.checkArgument(callerType != null && !callerType.isEmpty() + && callerId != null && !callerId.isEmpty(), + "Caller Id and Caller Type cannot be null or empty"); + this.callerType = callerType; + this.callerId = callerId; + return this; + } + + /** + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. 
+ */ + public CallerContext setBlob(@Nullable String blob) { + this.blob = blob; + return this; + } + + @Override + public String toString() { + return "context=" + context + + ", callerType=" + callerType + + ", callerId=" + callerId + + ", blob=" + blob; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index f786a4e..7204943 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -45,6 +45,8 @@ public class ATSConstants { public static final String NODE_ID = "nodeId"; public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress"; public static final String USER = "user"; + public static final String CALLER_CONTEXT_ID = "callerId"; + public static final String CALLER_CONTEXT_TYPE = "callerType"; /* Keys used in other info */ public static final String APP_SUBMIT_TIME = "appSubmitTime"; @@ -108,7 +110,7 @@ public class ATSConstants { "yarn.timeline-service.webapp.https.address"; /* History text related Keys */ - public static final String DESCRIPTION = "desc"; + public static final String DESC = "desc"; public static final String CONFIG = "config"; public static final String TEZ_VERSION = "tezVersion"; @@ -116,4 +118,10 @@ public class ATSConstants { public static final String REVISION = "revision"; public static final String BUILD_TIME = "buildTime"; + /* Caller Context Related Keys */ + public static final String CONTEXT = "context"; + public static final String CALLER_ID = "callerId"; + public static final String CALLER_TYPE = "callerType"; + public static final String DESCRIPTION = "description"; + } 
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java index 8c2f118..93d373b 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java @@ -162,7 +162,7 @@ public class TezUtils { JSONObject jsonObject = new JSONObject(); try { if (description != null && !description.isEmpty()) { - jsonObject.put(ATSConstants.DESCRIPTION, description); + jsonObject.put(ATSConstants.DESC, description); } if (conf != null) { JSONObject confJson = new JSONObject(); http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index ad656cd..68b6d52 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -33,9 +33,11 @@ import java.util.Stack; import org.apache.commons.collections4.BidiMap; import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.client.CallerContext; import org.apache.tez.common.JavaOptsChecker; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; @@ -95,6 +97,7 @@ public class DAG { private DAGAccessControls dagAccessControls; Map<String, 
LocalResource> commonTaskLocalFiles = Maps.newHashMap(); String dagInfo; + CallerContext callerContext; private Map<String,String> dagConf = new HashMap<String, String>(); private VertexExecutionContext defaultExecutionContext; @@ -170,12 +173,25 @@ public class DAG { * In the case of Hive, this could be the SQL query text. * @return {@link DAG} */ + @Deprecated public synchronized DAG setDAGInfo(String dagInfo) { Preconditions.checkNotNull(dagInfo); this.dagInfo = dagInfo; return this; } + + /** + * Set the Context in which Tez is being called. + * @param callerContext Caller Context + * @return {@link DAG} + */ + public synchronized DAG setCallerContext(CallerContext callerContext) { + Preconditions.checkNotNull(callerContext); + this.callerContext = callerContext; + return this; + } + /** * Create a group of vertices that share a common output. This can be used to implement * unions efficiently. @@ -730,6 +746,10 @@ public class DAG { DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); dagBuilder.setName(this.name); + + if (this.callerContext != null) { + dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext)); + } if (this.dagInfo != null && !this.dagInfo.isEmpty()) { dagBuilder.setDagInfo(this.dagInfo); } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 2823a86..5733da8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import 
org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezAppMasterStatus; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.CounterGroup; @@ -57,6 +58,7 @@ import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; @@ -523,7 +525,7 @@ public class DagTypeConverters { PlanLocalResourcesProto.newBuilder(); for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) { PlanLocalResource plr = convertLocalResourceToPlanLocalResource( - entry.getKey(), entry.getValue()); + entry.getKey(), entry.getValue()); builder.addLocalResources(plr); } return builder.build(); @@ -837,4 +839,28 @@ public class DagTypeConverters { return pluginDescriptorBuilder.build(); } + public static CallerContextProto convertCallerContextToProto(CallerContext callerContext) { + CallerContextProto.Builder callerContextBuilder = CallerContextProto.newBuilder(); + callerContextBuilder.setContext(callerContext.getContext()); + if (callerContext.getCallerId() != null) { + callerContextBuilder.setCallerId(callerContext.getCallerId()); + } + if (callerContext.getCallerType() != null) { + callerContextBuilder.setCallerType(callerContext.getCallerType()); + } + if (callerContext.getBlob() != null) { + callerContextBuilder.setBlob(callerContext.getBlob()); + } + return callerContextBuilder.build(); + } + + public static CallerContext convertCallerContextFromProto(CallerContextProto proto) { + CallerContext callerContext = 
CallerContext.create(proto.getContext(), + (proto.hasBlob() ? proto.getBlob() : null)); + if (proto.hasCallerType() && proto.hasCallerId()) { + callerContext.setCallerIdAndType(proto.getCallerId(), proto.getCallerType()); + } + return callerContext; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index 193f7b8..d016d60 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -190,6 +190,13 @@ message ConfigurationProto { optional AMPluginDescriptorProto am_plugin_descriptor = 2; } +message CallerContextProto { + optional string context = 1; + optional string callerType = 2; + optional string callerId = 3; + optional string blob = 4; +} + message DAGPlan { required string name = 1; repeated VertexPlan vertex = 2; @@ -200,6 +207,7 @@ message DAGPlan { repeated PlanLocalResource local_resource = 7; optional string dag_info = 8; optional VertexExecutionContextProto default_execution_context = 9; + optional CallerContextProto caller_context = 10; } // DAG monitoring messages http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 268267b..24c20b5 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.api; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import 
org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; @@ -308,4 +309,27 @@ public class TestDAG { } } + @Test + public void testCallerContext() { + DAG dag = DAG.create("dag1"); + try { + CallerContext callerContext = CallerContext.create("ctxt", "", "", "desc"); + Assert.fail("Expected failure for invalid args"); + } catch (Exception e) { + // Expected + } + try { + CallerContext callerContext = CallerContext.create("", "desc"); + Assert.fail("Expected failure for invalid args"); + } catch (Exception e) { + // Expected + } + + CallerContext callerContext; + callerContext = CallerContext.create("ctxt", "a", "a", "desc"); + callerContext = CallerContext.create("ctxt", "desc"); + callerContext = CallerContext.create("ctxt", null); + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index d39c47f..c88fa67 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -168,7 +168,7 @@ public class TestTezUtils { JSONObject jsonObject = new JSONObject(confToJson); - Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION)); + Assert.assertFalse(jsonObject.has(ATSConstants.DESC)); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG); @@ -178,8 +178,8 @@ public class TestTezUtils { confToJson = TezUtils.convertToHistoryText(desc, conf); jsonObject = new JSONObject(confToJson); - Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION)); - String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION); + Assert.assertTrue(jsonObject.has(ATSConstants.DESC)); 
+ String descFromJson = jsonObject.getString(ATSConstants.DESC); Assert.assertEquals(desc, descFromJson); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); @@ -201,7 +201,7 @@ public class TestTezUtils { JSONObject jsonObject = new JSONObject(confToJson); - Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION)); + Assert.assertFalse(jsonObject.has(ATSConstants.DESC)); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG); @@ -213,8 +213,8 @@ public class TestTezUtils { confToJson = TezUtils.convertToHistoryText(desc, conf); jsonObject = new JSONObject(confToJson); - Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION)); - String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION); + Assert.assertTrue(jsonObject.has(ATSConstants.DESC)); + String descFromJson = jsonObject.getString(ATSConstants.DESC); Assert.assertEquals(desc, descFromJson); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index e41d59c..e165397 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -63,6 +63,7 @@ import com.google.common.collect.Lists; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.tez.client.CallerContext; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; @@ -2303,7 +2304,14 @@ public class DAGAppMaster extends AbstractService { cumulativeAdditionalResources.putAll(lrDiff); } - 
LOG.info("Running DAG: " + dagPlan.getName()); + String callerContextStr = ""; + if (dagPlan.hasCallerContext()) { + CallerContext callerContext = DagTypeConverters.convertCallerContextFromProto( + dagPlan.getCallerContext()); + callerContextStr = ", callerContext=" + callerContext.toString(); + } + LOG.info("Running DAG: " + dagPlan.getName() + callerContextStr); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); System.err.println(timeStamp + " Running Dag: " + newDAG.getID()); System.out.println(timeStamp + " Running Dag: "+ newDAG.getID()); http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 649eb61..bf63045 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -483,6 +483,15 @@ public class HistoryEventJsonConversion { JSONObject primaryFilters = new JSONObject(); primaryFilters.put(ATSConstants.DAG_NAME, event.getDAGName()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } + jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters); // TODO decide whether this goes into different events, @@ -499,6 +508,14 @@ public class 
HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.DAG_PLAN, DAGUtils.generateSimpleJSONPlan(event.getDAGPlan())); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + otherInfo.put(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 76e592e..781120c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -49,10 +49,13 @@ import org.apache.tez.dag.records.TezTaskID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import com.google.common.base.Preconditions; + public class DAGUtils { public static final String DAG_NAME_KEY = "dagName"; public static final String DAG_INFO_KEY = "dagInfo"; + public static final String DAG_CONTEXT_KEY = "dagContext"; public static final String VERTICES_KEY = "vertices"; public static final String EDGES_KEY = "edges"; public static final String VERTEX_GROUPS_KEY = "vertexGroups"; @@ -165,15 +168,34 @@ public class DAGUtils { return object; } + static Map<String, String> createDagInfoMap(DAGPlan dagPlan) { + Preconditions.checkArgument(dagPlan.hasCallerContext()); + Map<String, String> dagInfo = new TreeMap<String, String>(); + dagInfo.put(ATSConstants.CONTEXT, 
dagPlan.getCallerContext().getContext()); + if (dagPlan.getCallerContext().hasCallerId()) { + dagInfo.put(ATSConstants.CALLER_ID, dagPlan.getCallerContext().getCallerId()); + } + if (dagPlan.getCallerContext().hasCallerType()) { + dagInfo.put(ATSConstants.CALLER_TYPE, dagPlan.getCallerContext().getCallerType()); + } + if (dagPlan.getCallerContext().hasBlob()) { + dagInfo.put(ATSConstants.DESCRIPTION, dagPlan.getCallerContext().getBlob()); + } + return dagInfo; + } + public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException { final String VERSION_KEY = "version"; - final int version = 1; + final int version = 2; Map<String,Object> dagMap = new LinkedHashMap<String, Object>(); dagMap.put(DAG_NAME_KEY, dagPlan.getName()); if (dagPlan.hasDagInfo()) { dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo()); } + if (dagPlan.hasCallerContext()) { + dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan)); + } dagMap.put(VERSION_KEY, version); ArrayList<Object> verticesList = new ArrayList<Object>(); for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) { http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index cb7e0c8..4d4577a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -28,6 +28,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.client.CallerContext; +import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.api.DAG; import 
org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -53,6 +55,7 @@ import com.google.common.collect.Sets; public class TestDAGUtils { + @SuppressWarnings("deprecation") private DAGPlan createDAG() { // Create a plan with 3 vertices: A, B, C. Group(A,B)->C Configuration conf = new Configuration(false); @@ -71,6 +74,7 @@ public class TestDAGUtils { dummyTaskCount, dummyTaskResource); DAG dag = DAG.create("testDag"); + dag.setCallerContext(CallerContext.create("context1", "callerId1", "callerType1", "desc1")); dag.setDAGInfo("dagInfo"); String groupName1 = "uv12"; org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); @@ -113,10 +117,17 @@ public class TestDAGUtils { Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY)); Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY)); + Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY)); + Map<String, String> contextMap = (Map<String, String>)atsMap.get(DAGUtils.DAG_CONTEXT_KEY); + Assert.assertEquals("context1", contextMap.get(ATSConstants.CONTEXT)); + Assert.assertEquals("callerId1", contextMap.get(ATSConstants.CALLER_ID)); + Assert.assertEquals("callerType1", contextMap.get(ATSConstants.CALLER_TYPE)); + Assert.assertEquals("desc1", contextMap.get(ATSConstants.DESCRIPTION)); + Assert.assertEquals("dagInfo", atsMap.get(DAGUtils.DAG_INFO_KEY)); Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey("version")); - Assert.assertEquals(1, atsMap.get("version")); + Assert.assertEquals(2, atsMap.get("version")); Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY)); 
http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java index 5e89fdc..fff7c1b 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java @@ -20,6 +20,7 @@ package org.apache.tez.examples; import java.io.IOException; +import org.apache.tez.client.CallerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index fb33612..c88c833 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -27,6 +27,8 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.commons.cli.Options; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.CallerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -138,6 +140,14 @@ public abstract class TezExampleBase extends Configured implements Tool { public int runDag(DAG dag, boolean printCounters, Logger logger) throws TezException, InterruptedException, IOException { tezClientInternal.waitTillReady(); + + CallerContext callerContext = CallerContext.create("TezExamples", + "Tez Example DAG: " + 
dag.getName()); + ApplicationId appId = tezClientInternal.getAppMasterApplicationId(); + if (appId != null) { + callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication"); + } + DAGClient dagClient = tezClientInternal.submitDAG(dag); Set<StatusGetOpts> getOpts = Sets.newHashSet(); if (printCounters) { http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java index 3a24f15..dce79e2 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java @@ -50,10 +50,15 @@ public class Constants extends ATSConstants { public static final String INITIALIZER = "initializer"; public static final String USER_PAYLOAD_TEXT = "userPayloadAsText"; + public static final String DAG_CONTEXT = "dagContext"; + //constants for ATS data export public static final String DAG = "dag"; public static final String VERTICES = "vertices"; public static final String TASKS = "tasks"; public static final String TASK_ATTEMPTS = "task_attempts"; public static final String APPLICATION = "application"; + + + } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java index 
5ea94d6..5fb760c 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java @@ -33,6 +33,7 @@ import org.apache.commons.collections.bidimap.DualHashBidiMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringInterner; +import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.event.VertexState; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -65,6 +66,7 @@ public class DagInfo extends BaseInfo { private final String status; private final String diagnostics; private VersionInfo versionInfo; + private CallerContext callerContext; //VertexID --> VertexName & vice versa private final BidiMap vertexNameIDMapping; @@ -135,10 +137,34 @@ public class DagInfo extends BaseInfo { } private void parseDAGPlan(JSONObject dagPlan) throws JSONException { + int version = dagPlan.optInt(Constants.VERSION, 1); parseEdges(dagPlan.optJSONArray(Constants.EDGES)); JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES); parseBasicVertexInfo(verticesInfo); + + if (version > 1) { + parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT)); + } + } + + private void parseDAGContext(JSONObject callerContextInfo) { + if (callerContextInfo == null) { + LOG.info("No DAG Caller Context available"); + return; + } + String context = callerContextInfo.optString(Constants.CONTEXT); + String callerId = callerContextInfo.optString(Constants.CALLER_ID); + String callerType = callerContextInfo.optString(Constants.CALLER_TYPE); + String description = callerContextInfo.optString(Constants.DESCRIPTION); + + this.callerContext = CallerContext.create(context, description); + if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) { + 
this.callerContext.setCallerIdAndType(callerId, callerType); + } else { + LOG.info("No DAG Caller Context Id and Type available"); + } + } private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException { @@ -329,6 +355,10 @@ public class DagInfo extends BaseInfo { return versionInfo; } + public final CallerContext getCallerContext() { + return callerContext; + } + public final String getName() { return name; } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java index a1b0ba6..dedd9ef 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java @@ -33,7 +33,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClient; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TaskCounter; @@ -645,6 +647,16 @@ public class TestHistoryParser { Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty())); TezClient tezClient = getTezClient(withTimeline); + + // Update Caller Context + CallerContext callerContext = CallerContext.create("TezExamples", "Tez WordCount Example Job"); + ApplicationId appId = tezClient.getAppMasterApplicationId(); + if 
(appId == null) { + appId = ApplicationId.newInstance(1001l, 1); + } + callerContext.setCallerIdAndType(appId.toString(), "TezApplication"); + dag.setCallerContext(callerContext); + DAGClient client = tezClient.submitDAG(dag); client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1); @@ -690,6 +702,12 @@ public class TestHistoryParser { assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime()); assertTrue(dagInfo.getTimeTaken() > 0); + assertNotNull(dagInfo.getCallerContext()); + assertEquals("TezExamples", dagInfo.getCallerContext().getContext()); + assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob()); + assertNotNull(dagInfo.getCallerContext().getCallerId()); + assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType()); + //Verify all vertices for (VertexInfo vertexInfo : dagInfo.getVertices()) { verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0); http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 77b00c4..0d6cbcb 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -248,7 +248,7 @@ public class HistoryEventTimelineConversion { event.getApplicationAttemptId().getApplicationId().toString()); 
atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, - event.getContainerId().toString()); + event.getContainerId().toString()); atsEntity.setStartTime(event.getLaunchTime()); TimelineEvent launchEvt = new TimelineEvent(); @@ -391,6 +391,15 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, event.getDagID().getApplicationId().toString()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } + try { atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); @@ -405,6 +414,14 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_" + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git 
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 4245be3..0ad1b43 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.client.CallerContext; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.counters.TezCounters; @@ -43,6 +44,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; @@ -112,7 +114,13 @@ public class TestHistoryEventTimelineConversion { tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt()); tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt()); tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt()); - dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder(); + callerContextProto.setContext("ctxt"); + callerContextProto.setCallerId("Caller_ID"); + 
callerContextProto.setCallerType("Caller_Type"); + callerContextProto.setBlob("Desc_1"); + dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock") + .setCallerContext(callerContextProto).build(); containerId = ContainerId.newInstance(applicationAttemptId, 111); nodeId = NodeId.newInstance("node", 13435); } @@ -425,18 +433,24 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size()); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( dagPlan.getName())); Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains( + dagPlan.getCallerContext().getCallerId())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_TYPE).contains( + dagPlan.getCallerContext().getCallerType())); + Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( applicationAttemptId.getApplicationId().toString())); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertEquals(6, timelineEntity.getOtherInfo().size()); + Assert.assertEquals(8, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); @@ -451,6 +465,14 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(containerLogs, timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_" + applicationAttemptId.getAttemptId())); + Assert.assertEquals( + timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID), + dagPlan.getCallerContext().getCallerId()); + Assert.assertEquals( + 
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), + dagPlan.getCallerContext().getCallerType()); + + } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/77444431/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index eb56795..6966e8d 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; @@ -276,7 +277,7 @@ public class TestOrderedWordCount extends Configured implements Tool { vertices.add(finalReduceVertex); DAG dag = DAG.create("OrderedWordCount" + dagIndex); - dag.setDAGInfo("{ \"context\": \"Tez\", \"description\": \"TestOrderedWordCount Job\" }"); + dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount Job")); for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); }
