Repository: tez
Updated Branches:
refs/heads/branch-0.6 9aaea3a0c -> aa4806544
TEZ-2851. Support a way for upstream applications to pass in a caller context
to Tez. (hitesh)
(cherry picked from commit 774444312f8ea586939cb85c140c94251162e731)
Conflicts:
CHANGES.txt
tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
tez-api/src/main/proto/DAGApiRecords.proto
tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
(cherry picked from commit 1709e46cec0d954dfbea86208a7fd9bcd56b4523)
Conflicts:
CHANGES.txt
tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa480654
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa480654
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa480654
Branch: refs/heads/branch-0.6
Commit: aa4806544cf8111575329c7a23eb8d8a8e4b5bd3
Parents: 9aaea3a
Author: Hitesh Shah <[email protected]>
Authored: Tue Sep 29 11:45:36 2015 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue Sep 29 13:16:19 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/CallerContext.java | 171 +++++++++++++++++++
.../org/apache/tez/common/ATSConstants.java | 10 +-
.../java/org/apache/tez/common/TezUtils.java | 2 +-
.../main/java/org/apache/tez/dag/api/DAG.java | 21 ++-
.../apache/tez/dag/api/DagTypeConverters.java | 28 ++-
tez-api/src/main/proto/DAGApiRecords.proto | 8 +
.../java/org/apache/tez/dag/api/TestDAG.java | 25 +++
.../org/apache/tez/common/TestTezUtils.java | 12 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 10 +-
.../impl/HistoryEventJsonConversion.java | 17 ++
.../apache/tez/dag/history/utils/DAGUtils.java | 24 ++-
.../tez/dag/history/utils/TestDAGUtils.java | 13 +-
.../apache/tez/examples/OrderedWordCount.java | 1 +
.../org/apache/tez/examples/TezExampleBase.java | 11 ++
.../ats/HistoryEventTimelineConversion.java | 19 ++-
.../ats/TestHistoryEventTimelineConversion.java | 29 +++-
.../examples/TestOrderedWordCount.java | 3 +-
18 files changed, 386 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea7ace5..e0b76ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2851. Support a way for upstream applications to pass in a caller
context to Tez.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
TEZ-2203. Intern strings in tez counters
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
new file mode 100644
index 0000000..ba68851
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import com.google.common.base.Preconditions;
+
+@Public
+@Unstable
+public class CallerContext {
+
+ /**
+ * Context in which Tez is being invoked.
+ * For example, HIVE or PIG.
+ */
+ private String context;
+
+ /**
+ * Type of the caller. Should ideally be used along with callerId to
uniquely identify the caller.
+ * When used with YARN Timeline, this should map to the Timeline Entity Type.
+ * For example, HIVE_QUERY_ID.
+ */
+ private String callerType;
+
+ /**
+ * Caller ID.
+ * An ID to uniquely identify the caller within the callerType namespace
+ */
+ private String callerId;
+
+ /**
+ * Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For example, for Hive,
+ * this could be the Hive query text.
+ */
+ private String blob;
+
+ /**
+ * Private Constructor
+ */
+ private CallerContext() {
+ }
+
+ /**
+ * Instantiate the Caller Context
+ * @param context Context in which Tez is being invoked. For example, HIVE
or PIG.
+ * @param callerId Caller ID. An ID to uniquely identifier the caller within
the callerType
+ * namespace
+ * @param callerType Type of the caller. Should ideally be used along with
callerId to uniquely
+ * identify the caller. When used with YARN Timeline, this
should map to
+ * the Timeline Entity Type. For example, HIVE_QUERY_ID.
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For
example, for Hive,
+ * this could be the Hive query text.
+ * @return CallerContext
+ */
+ public static CallerContext create(String context, String callerId,
+ String callerType, @Nullable String blob) {
+ return new CallerContext(context, callerId, callerType, blob);
+ }
+
+ /**
+ * Instantiate the Caller Context
+ * @param context Context in which Tez is being invoked. For example, HIVE
or PIG.
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For
example, for Hive,
+ * this could be the Hive query text.
+ * @return CallerContext
+ */
+ @Private
+ public static CallerContext create(String context, @Nullable String blob) {
+ return new CallerContext(context, blob);
+ }
+
+
+ private CallerContext(String context, String callerId, String callerType,
+ @Nullable String blob) {
+ if (callerId != null || callerType != null) {
+ setCallerIdAndType(callerId, callerType);
+ }
+ setContext(context);
+ setBlob(blob);
+ }
+
+ private CallerContext(String context, @Nullable String blob) {
+ setContext(context);
+ setBlob(blob);
+ }
+
+ public String getCallerType() {
+ return callerType;
+ }
+
+ public String getCallerId() {
+ return callerId;
+ }
+
+ public String getBlob() {
+ return blob;
+ }
+
+ public String getContext() {
+ return context;
+ }
+
+ /**
+ * @param context Context in which Tez is being invoked. For example, HIVE
or PIG.
+ */
+ public CallerContext setContext(String context) {
+ Preconditions.checkArgument(context != null && !context.isEmpty(),
+ "Context cannot be null or empty");
+ this.context = context;
+ return this;
+ }
+
+ /**
+ * @param callerId Caller ID. An ID to uniquely identifier the caller within
the callerType
+ * namespace
+ * @param callerType Type of the caller. Should ideally be used along with
callerId to uniquely
+ * identify the caller. When used with YARN Timeline, this
should map to
+ * the Timeline Entity Type. For example, HIVE_QUERY_ID.
+ */
+ public CallerContext setCallerIdAndType(String callerId, String callerType) {
+ Preconditions.checkArgument(callerType != null && !callerType.isEmpty()
+ && callerId != null && !callerId.isEmpty(),
+ "Caller Id and Caller Type cannot be null or empty");
+ this.callerType = callerType;
+ this.callerId = callerId;
+ return this;
+ }
+
+ /**
+ * @param blob Free-form text or a json-representation of relevant meta-data.
+ * This can be used to describe the work being done. For
example, for Hive,
+ * this could be the Hive query text.
+ */
+ public CallerContext setBlob(@Nullable String blob) {
+ this.blob = blob;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "context=" + context
+ + ", callerType=" + callerType
+ + ", callerId=" + callerId
+ + ", blob=" + blob;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index fd82e20..85a669f 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -45,6 +45,8 @@ public class ATSConstants {
public static final String NODE_ID = "nodeId";
public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress";
public static final String USER = "user";
+ public static final String CALLER_CONTEXT_ID = "callerId";
+ public static final String CALLER_CONTEXT_TYPE = "callerType";
/* Keys used in other info */
public static final String APP_SUBMIT_TIME = "appSubmitTime";
@@ -103,7 +105,7 @@ public class ATSConstants {
"yarn.timeline-service.webapp.https.address";
/* History text related Keys */
- public static final String DESCRIPTION = "desc";
+ public static final String DESC = "desc";
public static final String CONFIG = "config";
public static final String TEZ_VERSION = "tezVersion";
@@ -111,4 +113,10 @@ public class ATSConstants {
public static final String REVISION = "revision";
public static final String BUILD_TIME = "buildTime";
+ /* Caller Context Related Keys */
+ public static final String CONTEXT = "context";
+ public static final String CALLER_ID = "callerId";
+ public static final String CALLER_TYPE = "callerType";
+ public static final String DESCRIPTION = "description";
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 1cbfbe0..41f3aa1 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -162,7 +162,7 @@ public class TezUtils {
JSONObject jsonObject = new JSONObject();
try {
if (description != null && !description.isEmpty()) {
- jsonObject.put(ATSConstants.DESCRIPTION, description);
+ jsonObject.put(ATSConstants.DESC, description);
}
if (conf != null) {
JSONObject confJson = new JSONObject();
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index db2cb39..9398fc7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -34,6 +34,7 @@ import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.client.CallerContext;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
@@ -60,9 +61,7 @@ import
org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -92,6 +91,7 @@ public class DAG {
private DAGAccessControls dagAccessControls;
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
+ CallerContext callerContext;
private Stack<String> topologicalVertexStack = new Stack<String>();
@@ -165,12 +165,25 @@ public class DAG {
* In the case of Hive, this could be the SQL
query text.
* @return {@link DAG}
*/
+ @Deprecated
public synchronized DAG setDAGInfo(String dagInfo) {
Preconditions.checkNotNull(dagInfo);
this.dagInfo = dagInfo;
return this;
}
+
+ /**
+ * Set the Context in which Tez is being called.
+ * @param callerContext Caller Context
+ * @return {@link DAG}
+ */
+ public synchronized DAG setCallerContext(CallerContext callerContext) {
+ Preconditions.checkNotNull(callerContext);
+ this.callerContext = callerContext;
+ return this;
+ }
+
/**
* Create a group of vertices that share a common output. This can be used
to implement
* unions efficiently.
@@ -701,6 +714,10 @@ public class DAG {
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
dagBuilder.setName(this.name);
+
+ if (this.callerContext != null) {
+
dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext));
+ }
if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
dagBuilder.setDagInfo(this.dagInfo);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 6c3fd0d..1a80b3e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.CounterGroup;
@@ -55,6 +56,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.StatusGetOpts;
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -476,7 +478,7 @@ public class DagTypeConverters {
PlanLocalResourcesProto.newBuilder();
for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
PlanLocalResource plr = convertLocalResourceToPlanLocalResource(
- entry.getKey(), entry.getValue());
+ entry.getKey(), entry.getValue());
builder.addLocalResources(plr);
}
return builder.build();
@@ -686,4 +688,28 @@ public class DagTypeConverters {
return payload.getPayload();
}
+ public static CallerContextProto convertCallerContextToProto(CallerContext
callerContext) {
+ CallerContextProto.Builder callerContextBuilder =
CallerContextProto.newBuilder();
+ callerContextBuilder.setContext(callerContext.getContext());
+ if (callerContext.getCallerId() != null) {
+ callerContextBuilder.setCallerId(callerContext.getCallerId());
+ }
+ if (callerContext.getCallerType() != null) {
+ callerContextBuilder.setCallerType(callerContext.getCallerType());
+ }
+ if (callerContext.getBlob() != null) {
+ callerContextBuilder.setBlob(callerContext.getBlob());
+ }
+ return callerContextBuilder.build();
+ }
+
+ public static CallerContext convertCallerContextFromProto(CallerContextProto
proto) {
+ CallerContext callerContext = CallerContext.create(proto.getContext(),
+ (proto.hasBlob() ? proto.getBlob() : null));
+ if (proto.hasCallerType() && proto.hasCallerId()) {
+ callerContext.setCallerIdAndType(proto.getCallerId(),
proto.getCallerType());
+ }
+ return callerContext;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto
b/tez-api/src/main/proto/DAGApiRecords.proto
index 0539405..b71e804 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -156,6 +156,13 @@ message ConfigurationProto {
repeated PlanKeyValuePair confKeyValues = 1;
}
+message CallerContextProto {
+ optional string context = 1;
+ optional string callerType = 2;
+ optional string callerId = 3;
+ optional string blob = 4;
+}
+
message DAGPlan {
required string name = 1;
repeated VertexPlan vertex = 2;
@@ -165,6 +172,7 @@ message DAGPlan {
repeated PlanVertexGroupInfo vertex_groups = 6;
repeated PlanLocalResource local_resource = 7;
optional string dag_info = 8;
+ optional CallerContextProto caller_context = 10;
}
// DAG monitoring messages
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 31ced71..92c4db4 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.api;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
import org.junit.Assert;
import org.junit.Test;
@@ -166,4 +167,28 @@ public class TestDAG {
Assert.assertEquals("Duplicated output:output_1, vertexName=v1",
e.getMessage());
}
}
+
+ @Test
+ public void testCallerContext() {
+ DAG dag = DAG.create("dag1");
+ try {
+ CallerContext callerContext = CallerContext.create("ctxt", "", "",
"desc");
+ Assert.fail("Expected failure for invalid args");
+ } catch (Exception e) {
+ // Expected
+ }
+ try {
+ CallerContext callerContext = CallerContext.create("", "desc");
+ Assert.fail("Expected failure for invalid args");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ CallerContext callerContext;
+ callerContext = CallerContext.create("ctxt", "a", "a", "desc");
+ callerContext = CallerContext.create("ctxt", "desc");
+ callerContext = CallerContext.create("ctxt", null);
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index d39c47f..c88fa67 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -168,7 +168,7 @@ public class TestTezUtils {
JSONObject jsonObject = new JSONObject(confToJson);
- Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+ Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -178,8 +178,8 @@ public class TestTezUtils {
confToJson = TezUtils.convertToHistoryText(desc, conf);
jsonObject = new JSONObject(confToJson);
- Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
- String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+ Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+ String descFromJson = jsonObject.getString(ATSConstants.DESC);
Assert.assertEquals(desc, descFromJson);
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
@@ -201,7 +201,7 @@ public class TestTezUtils {
JSONObject jsonObject = new JSONObject(confToJson);
- Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+ Assert.assertFalse(jsonObject.has(ATSConstants.DESC));
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
@@ -213,8 +213,8 @@ public class TestTezUtils {
confToJson = TezUtils.convertToHistoryText(desc, conf);
jsonObject = new JSONObject(confToJson);
- Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
- String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+ Assert.assertTrue(jsonObject.has(ATSConstants.DESC));
+ String descFromJson = jsonObject.getString(ATSConstants.DESC);
Assert.assertEquals(desc, descFromJson);
Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cb77d6e..b4323ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -59,6 +59,7 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -2071,7 +2072,14 @@ public class DAGAppMaster extends AbstractService {
cumulativeAdditionalResources.putAll(lrDiff);
}
- LOG.info("Running DAG: " + dagPlan.getName());
+ String callerContextStr = "";
+ if (dagPlan.hasCallerContext()) {
+ CallerContext callerContext =
DagTypeConverters.convertCallerContextFromProto(
+ dagPlan.getCallerContext());
+ callerContextStr = ", callerContext=" + callerContext.toString();
+ }
+ LOG.info("Running DAG: " + dagPlan.getName() + callerContextStr);
+
String timeStamp = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").format(Calendar.getInstance().getTime());
System.err.println(timeStamp + " Running Dag: "+ newDAG.getID());
System.out.println(timeStamp + " Running Dag: "+ newDAG.getID());
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8d7344f..22687f1 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -484,6 +484,15 @@ public class HistoryEventJsonConversion {
JSONObject primaryFilters = new JSONObject();
primaryFilters.put(ATSConstants.DAG_NAME,
event.getDAGName());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
+
jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
// TODO decide whether this goes into different events,
@@ -500,6 +509,14 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.DAG_PLAN,
DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ otherInfo.put(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 339c65c..ea43d48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -47,10 +47,13 @@ import org.apache.tez.dag.records.TezVertexID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
+import com.google.common.base.Preconditions;
+
public class DAGUtils {
public static final String DAG_NAME_KEY = "dagName";
public static final String DAG_INFO_KEY = "dagInfo";
+ public static final String DAG_CONTEXT_KEY = "dagContext";
public static final String VERTICES_KEY = "vertices";
public static final String EDGES_KEY = "edges";
public static final String VERTEX_GROUPS_KEY = "vertexGroups";
@@ -142,15 +145,34 @@ public class DAGUtils {
return object;
}
+ static Map<String, String> createDagInfoMap(DAGPlan dagPlan) {
+ Preconditions.checkArgument(dagPlan.hasCallerContext());
+ Map<String, String> dagInfo = new TreeMap<String, String>();
+ dagInfo.put(ATSConstants.CONTEXT, dagPlan.getCallerContext().getContext());
+ if (dagPlan.getCallerContext().hasCallerId()) {
+ dagInfo.put(ATSConstants.CALLER_ID,
dagPlan.getCallerContext().getCallerId());
+ }
+ if (dagPlan.getCallerContext().hasCallerType()) {
+ dagInfo.put(ATSConstants.CALLER_TYPE,
dagPlan.getCallerContext().getCallerType());
+ }
+ if (dagPlan.getCallerContext().hasBlob()) {
+ dagInfo.put(ATSConstants.DESCRIPTION,
dagPlan.getCallerContext().getBlob());
+ }
+ return dagInfo;
+ }
+
public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan)
throws IOException {
final String VERSION_KEY = "version";
- final int version = 1;
+ final int version = 2;
Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
dagMap.put(DAG_NAME_KEY, dagPlan.getName());
if (dagPlan.hasDagInfo()) {
dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo());
}
+ if (dagPlan.hasCallerContext()) {
+ dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan));
+ }
dagMap.put(VERSION_KEY, version);
ArrayList<Object> verticesList = new ArrayList<Object>();
for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index cb7e0c8..4d4577a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -28,6 +28,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -53,6 +55,7 @@ import com.google.common.collect.Sets;
public class TestDAGUtils {
+ @SuppressWarnings("deprecation")
private DAGPlan createDAG() {
// Create a plan with 3 vertices: A, B, C. Group(A,B)->C
Configuration conf = new Configuration(false);
@@ -71,6 +74,7 @@ public class TestDAGUtils {
dummyTaskCount, dummyTaskResource);
DAG dag = DAG.create("testDag");
+ dag.setCallerContext(CallerContext.create("context1", "callerId1",
"callerType1", "desc1"));
dag.setDAGInfo("dagInfo");
String groupName1 = "uv12";
org.apache.tez.dag.api.VertexGroup uv12 =
dag.createVertexGroup(groupName1, v1, v2);
@@ -113,10 +117,17 @@ public class TestDAGUtils {
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY));
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY));
+ Map<String, String> contextMap = (Map<String,
String>)atsMap.get(DAGUtils.DAG_CONTEXT_KEY);
+ Assert.assertEquals("context1", contextMap.get(ATSConstants.CONTEXT));
+ Assert.assertEquals("callerId1", contextMap.get(ATSConstants.CALLER_ID));
+ Assert.assertEquals("callerType1",
contextMap.get(ATSConstants.CALLER_TYPE));
+ Assert.assertEquals("desc1", contextMap.get(ATSConstants.DESCRIPTION));
+
Assert.assertEquals("dagInfo", atsMap.get(DAGUtils.DAG_INFO_KEY));
Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey("version"));
- Assert.assertEquals(1, atsMap.get("version"));
+ Assert.assertEquals(2, atsMap.get("version"));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git
a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index 9cf21d3..e9e1e6b 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git
a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 4bad009..1c02cde 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -24,6 +24,9 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.CallerContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -81,6 +84,14 @@ public abstract class TezExampleBase extends Configured
implements Tool {
public int runDag(DAG dag, boolean printCounters, Log logger) throws
TezException,
InterruptedException, IOException {
tezClientInternal.waitTillReady();
+
+ CallerContext callerContext = CallerContext.create("TezExamples",
+ "Tez Example DAG: " + dag.getName());
+ ApplicationId appId = tezClientInternal.getAppMasterApplicationId();
+ if (appId != null) {
+ callerContext.setCallerIdAndType(appId.toString(),
"TezExampleApplication");
+ }
+
DAGClient dagClient = tezClientInternal.submitDAG(dag);
Set<StatusGetOpts> getOpts = Sets.newHashSet();
if (printCounters) {
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 4c53337..e1b4d7e 100644
---
a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -246,7 +246,7 @@ public class HistoryEventTimelineConversion {
event.getApplicationAttemptId().getApplicationId().toString());
atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID,
- event.getContainerId().toString());
+ event.getContainerId().toString());
atsEntity.setStartTime(event.getLaunchTime());
TimelineEvent launchEvt = new TimelineEvent();
@@ -389,6 +389,15 @@ public class HistoryEventTimelineConversion {
atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID,
event.getDagID().getApplicationId().toString());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
+
try {
atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
@@ -402,6 +411,14 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.USER, event.getUser());
atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_"
+ event.getApplicationAttemptId().getAttemptId(),
event.getContainerLogs());
+ if (event.getDAGPlan().hasCallerContext()
+ && event.getDAGPlan().getCallerContext().hasCallerId()
+ && event.getDAGPlan().getCallerContext().hasCallerType()) {
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID,
+ event.getDAGPlan().getCallerContext().getCallerId());
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
+ event.getDAGPlan().getCallerContext().getCallerType());
+ }
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 436d103..e1835b3 100644
---
a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
@@ -101,7 +103,13 @@ public class TestHistoryEventTimelineConversion {
tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID,
random.nextInt());
- dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+ CallerContextProto.Builder callerContextProto =
CallerContextProto.newBuilder();
+ callerContextProto.setContext("ctxt");
+ callerContextProto.setCallerId("Caller_ID");
+ callerContextProto.setCallerType("Caller_Type");
+ callerContextProto.setBlob("Desc_1");
+ dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock")
+ .setCallerContext(callerContextProto).build();
containerId = ContainerId.newInstance(applicationAttemptId, 111);
nodeId = NodeId.newInstance("node", 13435);
}
@@ -412,18 +420,25 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
- Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+ Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size());
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
dagPlan.getName()));
Assert.assertTrue(
+
timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains(
+ dagPlan.getCallerContext().getCallerId()));
+ Assert.assertTrue(
+
timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_TYPE).contains(
+ dagPlan.getCallerContext().getCallerType()));
+ Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
applicationAttemptId.getApplicationId().toString()));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
- Assert.assertEquals(5, timelineEntity.getOtherInfo().size());
+ Assert.assertEquals(7, timelineEntity.getOtherInfo().size());
+
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
@@ -436,6 +451,14 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(containerLogs,
timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL +
"_"
+ applicationAttemptId.getAttemptId()));
+ Assert.assertEquals(
+ timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID),
+ dagPlan.getCallerContext().getCallerId());
+ Assert.assertEquals(
+ timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
+ dagPlan.getCallerContext().getCallerType());
+
+
}
@Test(timeout = 5000)
http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 3efc5d8..e857a1c 100644
---
a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
@@ -275,7 +276,7 @@ public class TestOrderedWordCount extends Configured
implements Tool {
vertices.add(finalReduceVertex);
DAG dag = DAG.create("OrderedWordCount" + dagIndex);
- dag.setDAGInfo("{ \"context\": \"Tez\", \"description\":
\"TestOrderedWordCount Job\" }");
+ dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount
Job"));
for (int i = 0; i < vertices.size(); ++i) {
dag.addVertex(vertices.get(i));
}