Repository: tez
Updated Branches:
refs/heads/branch-0.8 169546703 -> 152f1420b
TEZ-3359. Add granular log levels for HistoryLoggingService. (Harish Jaiprakash
via hitesh)
(cherry picked from commit cbc0c63761ecb81f4805cd8318cd347f0bc7674e)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/152f1420
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/152f1420
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/152f1420
Branch: refs/heads/branch-0.8
Commit: 152f1420be64739ef7ff3cb6755d742aa5c63f56
Parents: 1695467
Author: Hitesh Shah <[email protected]>
Authored: Tue Jul 26 10:44:08 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue Jul 26 10:45:10 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 14 +-
.../main/java/org/apache/tez/dag/api/DAG.java | 42 +++-
.../org/apache/tez/dag/api/HistoryLogLevel.java | 63 ++++++
.../apache/tez/dag/api/TezConfiguration.java | 14 +-
.../java/org/apache/tez/dag/api/TestDAG.java | 76 +++++++
.../apache/tez/dag/api/TestHistoryLogLevel.java | 63 ++++++
.../tez/dag/history/HistoryEventHandler.java | 45 +++-
.../tez/dag/history/HistoryEventType.java | 59 +++---
.../dag/history/TestHistoryEventHandler.java | 204 +++++++++++++++++++
.../dag/history/ats/acls/TestATSHistoryV15.java | 98 +++++----
11 files changed, 610 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9071576..71d31ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3359. Add granular log levels for HistoryLoggingService.
TEZ-3374. Change TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP conf key
name.
TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index f359a26..df39c0a 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -62,6 +62,7 @@ import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotReady;
import org.apache.tez.dag.api.SessionNotRunning;
@@ -362,7 +363,18 @@ public class TezClient {
"Credentials cannot be set after the session App Master has been
started");
amConfig.setCredentials(credentials);
}
-
+
+ /**
+ * Sets the history log level for this session. It will be in effect for
DAGs submitted after this
+ * call.
+ *
+ * @param historyLogLevel The log level to be used.
+ */
+ public synchronized void setHistoryLogLevel(HistoryLogLevel historyLogLevel)
{
+
amConfig.getTezConfiguration().setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL,
+ historyLogLevel);
+ }
+
@Private
@VisibleForTesting
public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager
myAclPolicyManager) {
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 0eb51e1..65321a8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -375,6 +375,18 @@ public class DAG {
}
/**
+ * Set history log level for this DAG. This config overrides the default or
one set at the session
+ * level.
+ *
+ * @param historyLogLevel The ATS history log level for this DAG.
+ *
+ * @return this DAG
+ */
+ public DAG setHistoryLogLevel(HistoryLogLevel historyLogLevel) {
+ return this.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL,
historyLogLevel.name());
+ }
+
+ /**
* Sets the default execution context for the DAG. This can be overridden at
a per Vertex level.
* See {@link
org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)}
*
@@ -1005,7 +1017,6 @@ public class DAG {
dagBuilder.addEdge(edgeBuilder);
}
-
ConfigurationProto.Builder confProtoBuilder =
ConfigurationProto.newBuilder();
if (dagAccessControls != null) {
@@ -1030,7 +1041,7 @@ public class DAG {
confProtoBuilder.addConfKeyValues(kvp);
}
}
- if (this.dagConf != null && !this.dagConf.isEmpty()) {
+ if (!this.dagConf.isEmpty()) {
for (Entry<String, String> entry : this.dagConf.entrySet()) {
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
@@ -1038,13 +1049,38 @@ public class DAG {
confProtoBuilder.addConfKeyValues(kvp);
}
}
+ // Copy historyLogLevel from tezConf into dagConf if its not overridden in
dagConf.
+ String logLevel =
this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+ if (logLevel != null) {
+ // The config is from dagConf, we have already added it to the proto
above, just check if
+ // the value is valid.
+ if (!HistoryLogLevel.validateLogLevel(logLevel)) {
+ throw new IllegalArgumentException(
+ "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+ " is set to invalid value: " + logLevel);
+ }
+ } else {
+ // Validate and set value from tezConf.
+ logLevel = tezConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+ if (logLevel != null) {
+ if (!HistoryLogLevel.validateLogLevel(logLevel)) {
+ throw new IllegalArgumentException(
+ "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+ " is set to invalid value: " + logLevel);
+ }
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+ kvp.setValue(logLevel);
+ confProtoBuilder.addConfKeyValues(kvp);
+ }
+ }
dagBuilder.setDagConf(confProtoBuilder);
if (dagCredentials != null) {
dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");
}
-
+
return dagBuilder.build();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
new file mode 100644
index 0000000..5eb4785
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.Locale;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The log level for the history, this is used to determine which events are
sent to the history
+ * logger. The default level is ALL.
+ */
+@Public
+public enum HistoryLogLevel {
+ NONE,
+ AM,
+ DAG,
+ VERTEX,
+ TASK,
+ ALL;
+
+ public static final HistoryLogLevel DEFAULT = ALL;
+
+ public boolean shouldLog(HistoryLogLevel eventLevel) {
+ return eventLevel.ordinal() <= ordinal();
+ }
+
+ public static HistoryLogLevel getLogLevel(Configuration conf,
HistoryLogLevel defaultValue) {
+ String logLevel =
conf.getTrimmed(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
+ if (logLevel == null) {
+ return defaultValue;
+ }
+ return valueOf(logLevel.toUpperCase(Locale.ENGLISH));
+ }
+
+ public static boolean validateLogLevel(String logLevel) {
+ if (logLevel != null) {
+ try {
+ valueOf(logLevel.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 005304a..15e937f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1229,9 +1229,19 @@ public class TezConfiguration extends Configuration {
"org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
/**
+ * Enum value. Config to limit the type of events published to the history
logging service.
+ * The valid log levels are defined in the enum {@link HistoryLogLevel}. The
default value is
+ * defined in {@link HistoryLogLevel#DEFAULT}.
+ */
+ @ConfigurationScope(Scope.DAG)
+ @ConfigurationProperty
+ public static final String TEZ_HISTORY_LOGGING_LOGLEVEL =
+ TEZ_PREFIX + "history.logging.log.level";
+
+ /**
* Comma separated list of Integers. These are the values that were set for
the config value
- * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older
values are required so that
- * the groupIds generated previously will continue to be generated by the
plugin. If an older
+ * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older
values are required so
+ * that the groupIds generated previously will continue to be generated by
the plugin. If an older
* value is not present then the UI may not show information for DAGs which
were created
* with a different grouping value.
*
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index ae5dfbb..05c4e30 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -30,7 +30,9 @@ import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan.Builder;
import org.junit.Assert;
import org.junit.Test;
@@ -361,4 +363,78 @@ public class TestDAG {
Assert.assertEquals(dagPlan, firstPlan);
}
}
+
+ @Test
+ public void testCreateDAGForHistoryLogLevel() {
+ Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+ LocalResource.newInstance(
+ URL.newInstance("file", "localhost", 0, "/test1"),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.PUBLIC, 1, 1));
+ Vertex v1 = Vertex.create("v1",
ProcessorDescriptor.create("dummyProcessor1"), 1,
+ Resource.newInstance(1, 1));
+ Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("dummyProcessor2"), 1,
+ Resource.newInstance(1, 1));
+ DAG dag =
DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
+
+ TezConfiguration tezConf = new TezConfiguration();
+
+ // Expect null when history log level is not set in both dag and tezConf
+ DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Builder builder = DAGPlan.newBuilder(dagPlan);
+ Assert.assertNull(findKVP(builder.getDagConf(),
TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+ // Set tezConf but not dag, expect value in tezConf.
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "TASK");
+ dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.assertEquals("TASK",
findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+ TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+ // Set invalid value in tezConf, expect exception.
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid");
+ try {
+ dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.fail("Expected illegal argument exception");
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals("Config: " +
TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+ " is set to invalid value: invalid", e.getMessage());
+ }
+
+ // Set value in dag, should override tez conf value.
+ dag.setHistoryLogLevel(HistoryLogLevel.VERTEX);
+ dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.assertEquals("VERTEX",
findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+ TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+ // Set value directly into dagConf.
+ dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL,
HistoryLogLevel.DAG.name());
+ dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.assertEquals("DAG",
findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(),
+ TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL));
+
+ // Set value invalid directly into dagConf and expect exception.
+ dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid");
+ try {
+ dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.fail("Expected illegal argument exception");
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals("Config: " +
TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
+ " is set to invalid value: invalid", e.getMessage());
+ }
+ }
+
+ private String findKVP(ConfigurationProto conf, String key) {
+ String foundValue = null;
+ for (int i = 0; i < conf.getConfKeyValuesCount(); ++i) {
+ if (conf.getConfKeyValues(i).getKey().equals(key)) {
+ if (foundValue == null) {
+ foundValue = conf.getConfKeyValues(i).getValue();
+ } else {
+ Assert.fail("Multiple values found: " + foundValue + ", " +
+ conf.getConfKeyValues(i).getValue());
+ }
+ }
+ }
+ return foundValue;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
new file mode 100644
index 0000000..76c944d
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestHistoryLogLevel {
+
+ @Test
+ public void testGetLogLevel() {
+ assertNull(HistoryLogLevel.getLogLevel(getConfiguration(null), null));
+ assertEquals(HistoryLogLevel.DEFAULT,
+ HistoryLogLevel.getLogLevel(getConfiguration(null),
HistoryLogLevel.DEFAULT));
+ assertEquals(HistoryLogLevel.NONE,
+ HistoryLogLevel.getLogLevel(getConfiguration("NONE"),
HistoryLogLevel.DEFAULT));
+ assertEquals(HistoryLogLevel.NONE,
+ HistoryLogLevel.getLogLevel(getConfiguration("none"),
HistoryLogLevel.DEFAULT));
+ try {
+ HistoryLogLevel.getLogLevel(getConfiguration("invalid"),
HistoryLogLevel.DEFAULT);
+ fail("Expected IllegalArugment Exception");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
+ public void testValidateLogLevel() {
+ assertTrue(HistoryLogLevel.validateLogLevel(null));
+ assertTrue(HistoryLogLevel.validateLogLevel("NONE"));
+ assertTrue(HistoryLogLevel.validateLogLevel("none"));
+ assertFalse(HistoryLogLevel.validateLogLevel("invalid"));
+ }
+
+ private Configuration getConfiguration(String confValue) {
+ Configuration conf = new Configuration(false);
+ if (confValue != null) {
+ conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, confValue);
+ }
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 95ff0cd..042d022 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -19,14 +19,18 @@
package org.apache.tez.dag.history;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
@@ -40,6 +44,10 @@ public class HistoryEventHandler extends CompositeService {
private boolean recoveryEnabled;
private HistoryLoggingService historyLoggingService;
+ private HistoryLogLevel amHistoryLogLevel;
+ private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel =
+ new ConcurrentHashMap<TezDAGID, HistoryLogLevel>();
+
public HistoryEventHandler(AppContext context) {
super(HistoryEventHandler.class.getName());
this.context = context;
@@ -70,8 +78,10 @@ public class HistoryEventHandler extends CompositeService {
new Class[]{AppContext.class}, new Object[] {context});
addService(recoveryService);
}
- super.serviceInit(conf);
+ amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(),
HistoryLogLevel.DEFAULT);
+
+ super.serviceInit(conf);
}
@Override
@@ -106,7 +116,7 @@ public class HistoryEventHandler extends CompositeService {
if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
recoveryService.handle(event);
}
- if (event.getHistoryEvent().isHistoryEvent()) {
+ if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) {
historyLoggingService.handle(event);
}
@@ -118,6 +128,37 @@ public class HistoryEventHandler extends CompositeService {
+ ": " + event.getHistoryEvent().toString());
}
+ private boolean shouldLogEvent(DAGHistoryEvent event) {
+ TezDAGID dagId = event.getDagID();
+
+ HistoryLogLevel dagLogLevel = null;
+ if (dagId != null) {
+ dagLogLevel = dagIdToLogLevel.get(dagId);
+ }
+ if (dagLogLevel == null) {
+ dagLogLevel = amHistoryLogLevel;
+ }
+
+ HistoryEvent historyEvent = event.getHistoryEvent();
+ if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) {
+ dagLogLevel =
HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(),
+ amHistoryLogLevel);
+ dagIdToLogLevel.put(dagId, dagLogLevel);
+ } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) {
+ if (context.getCurrentDAG() != null) {
+ dagLogLevel =
HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(),
+ amHistoryLogLevel);
+ dagIdToLogLevel.put(dagId, dagLogLevel);
+ }
+ } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+ if (dagIdToLogLevel.containsKey(dagId)) {
+ dagIdToLogLevel.remove(dagId);
+ }
+ }
+
+ return
dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel());
+ }
+
public void handle(DAGHistoryEvent event) {
try {
handleCriticalEvent(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 9bf98df..a41d0e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -18,32 +18,39 @@
package org.apache.tez.dag.history;
+import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.TezUncheckedException;
public enum HistoryEventType {
- APP_LAUNCHED,
- AM_LAUNCHED,
- AM_STARTED,
- DAG_SUBMITTED,
- DAG_INITIALIZED,
- DAG_STARTED,
- DAG_FINISHED,
- DAG_KILL_REQUEST,
- VERTEX_INITIALIZED,
- VERTEX_STARTED,
- VERTEX_CONFIGURE_DONE,
- VERTEX_FINISHED,
- TASK_STARTED,
- TASK_FINISHED,
- TASK_ATTEMPT_STARTED,
- TASK_ATTEMPT_FINISHED,
- CONTAINER_LAUNCHED,
- CONTAINER_STOPPED,
- DAG_COMMIT_STARTED,
- VERTEX_COMMIT_STARTED,
- VERTEX_GROUP_COMMIT_STARTED,
- VERTEX_GROUP_COMMIT_FINISHED,
- DAG_RECOVERED;
+ APP_LAUNCHED(HistoryLogLevel.AM),
+ AM_LAUNCHED(HistoryLogLevel.AM),
+ AM_STARTED(HistoryLogLevel.AM),
+ DAG_SUBMITTED(HistoryLogLevel.DAG),
+ DAG_INITIALIZED(HistoryLogLevel.DAG),
+ DAG_STARTED(HistoryLogLevel.DAG),
+ DAG_FINISHED(HistoryLogLevel.DAG),
+ DAG_KILL_REQUEST(HistoryLogLevel.DAG),
+ VERTEX_INITIALIZED(HistoryLogLevel.VERTEX),
+ VERTEX_STARTED(HistoryLogLevel.VERTEX),
+ VERTEX_CONFIGURE_DONE(HistoryLogLevel.VERTEX),
+ VERTEX_FINISHED(HistoryLogLevel.VERTEX),
+ TASK_STARTED(HistoryLogLevel.TASK),
+ TASK_FINISHED(HistoryLogLevel.TASK),
+ TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL),
+ TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL),
+ CONTAINER_LAUNCHED(HistoryLogLevel.ALL),
+ CONTAINER_STOPPED(HistoryLogLevel.ALL),
+ DAG_COMMIT_STARTED(HistoryLogLevel.DAG),
+ VERTEX_COMMIT_STARTED(HistoryLogLevel.VERTEX),
+ VERTEX_GROUP_COMMIT_STARTED(HistoryLogLevel.VERTEX),
+ VERTEX_GROUP_COMMIT_FINISHED(HistoryLogLevel.VERTEX),
+ DAG_RECOVERED(HistoryLogLevel.DAG);
+
+ private final HistoryLogLevel historyLogLevel;
+
+ private HistoryEventType(HistoryLogLevel historyLogLevel) {
+ this.historyLogLevel = historyLogLevel;
+ }
public static boolean isDAGSpecificEvent(HistoryEventType historyEventType) {
switch (historyEventType) {
@@ -77,6 +84,8 @@ public enum HistoryEventType {
}
}
-
-
+ public HistoryLogLevel getHistoryLogLevel() {
+ return historyLogLevel;
}
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
new file mode 100644
index 0000000..c8a076d
--- /dev/null
+++
b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.HistoryLogLevel;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGRecoveredEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Test;
+
+public class TestHistoryEventHandler {
+
+ private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ private static ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
+ private static String user = "TEST_USER";
+
+ @Test
+ public void testAll() {
+ testLogLevel(null, 6);
+ testLogLevel(HistoryLogLevel.NONE, 0);
+ testLogLevel(HistoryLogLevel.AM, 1);
+ testLogLevel(HistoryLogLevel.DAG, 3);
+ testLogLevel(HistoryLogLevel.VERTEX, 4);
+ testLogLevel(HistoryLogLevel.TASK, 5);
+ testLogLevel(HistoryLogLevel.ALL, 6);
+ }
+
+ @Test
+ public void testWithDAGRecovery() {
+ testLogLevelWithRecovery(null, 6);
+ testLogLevelWithRecovery(HistoryLogLevel.AM, 1);
+ testLogLevelWithRecovery(HistoryLogLevel.DAG, 3);
+ testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4);
+ testLogLevelWithRecovery(HistoryLogLevel.TASK, 5);
+ testLogLevelWithRecovery(HistoryLogLevel.ALL, 6);
+ }
+
+ @Test
+ public void testMultipleDag() {
+ testLogLevel(null, HistoryLogLevel.NONE, 7);
+ testLogLevel(null, HistoryLogLevel.AM, 7);
+ testLogLevel(null, HistoryLogLevel.DAG, 9);
+ testLogLevel(null, HistoryLogLevel.VERTEX, 10);
+ testLogLevel(null, HistoryLogLevel.TASK, 11);
+ testLogLevel(null, HistoryLogLevel.ALL, 12);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9);
+ testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10);
+ testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0);
+ }
+
+ private void testLogLevelWithRecovery(HistoryLogLevel level, int
expectedCount) {
+ HistoryEventHandler handler = createHandler(level);
+ InMemoryHistoryLoggingService.events.clear();
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ List<DAGHistoryEvent> events = makeHistoryEvents(dagId,
handler.getConfig());
+ events.set(1, new DAGHistoryEvent(dagId,
+ new DAGRecoveredEvent(attemptId, dagId, "test", user, 0, null)));
+ for (DAGHistoryEvent event : events) {
+ handler.handle(event);
+ }
+ assertEquals("Failed for level: " + level,
+ expectedCount, InMemoryHistoryLoggingService.events.size());
+ handler.stop();
+ }
+
+ private void testLogLevel(HistoryLogLevel level, int expectedCount) {
+ HistoryEventHandler handler = createHandler(level);
+ InMemoryHistoryLoggingService.events.clear();
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId,
handler.getConfig())) {
+ handler.handle(event);
+ }
+ assertEquals("Failed for level: " + level,
+ expectedCount, InMemoryHistoryLoggingService.events.size());
+ handler.stop();
+ }
+
+ private void testLogLevel(HistoryLogLevel defaultLogLevel, HistoryLogLevel
dagLogLevel,
+ int expectedCount) {
+ HistoryEventHandler handler = createHandler(defaultLogLevel);
+ InMemoryHistoryLoggingService.events.clear();
+ TezDAGID dagId1 = TezDAGID.getInstance(appId, 1);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId1,
handler.getConfig())) {
+ handler.handle(event);
+ }
+ TezDAGID dagId2 = TezDAGID.getInstance(appId, 2);
+ Configuration conf = new Configuration(handler.getConfig());
+ conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, dagLogLevel);
+ for (DAGHistoryEvent event : makeHistoryEvents(dagId2, conf)) {
+ handler.handle(event);
+ }
+
+ assertEquals(expectedCount, InMemoryHistoryLoggingService.events.size());
+ handler.stop();
+ }
+
+ public static class InMemoryHistoryLoggingService extends
HistoryLoggingService {
+ public InMemoryHistoryLoggingService() {
+ super("InMemoryHistoryLoggingService");
+ }
+ static List<DAGHistoryEvent> events = new ArrayList<>();
+ @Override
+ public void handle(DAGHistoryEvent event) {
+ events.add(event);
+ }
+ }
+
+ private HistoryEventHandler createHandler(HistoryLogLevel logLevel) {
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+ conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ InMemoryHistoryLoggingService.class.getName());
+ if (logLevel != null) {
+ conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, logLevel);
+ }
+
+ DAG dag = mock(DAG.class);
+ when(dag.getConf()).thenReturn(conf);
+
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getApplicationID()).thenReturn(appId);
+ when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+ when(appContext.getAMConf()).thenReturn(conf);
+ when(appContext.getCurrentDAG()).thenReturn(dag);
+
+ HistoryEventHandler handler = new HistoryEventHandler(appContext);
+ handler.init(conf);
+
+ return handler;
+ }
+
+ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
Configuration inConf) {
+ List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+ long time = System.currentTimeMillis();
+ Configuration conf = new Configuration(inConf);
+ historyEvents.add(new DAGHistoryEvent(null,
+ new AMStartedEvent(attemptId, time, user)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(),
attemptId, null, user,
+ conf, null)));
+ TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new VertexStartedEvent(vertexID, time, time)));
+ TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskStartedEvent(tezTaskID, "test", time, time)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID,
1), "test", time,
+ ContainerId.newContainerId(attemptId, 1),
NodeId.newInstance("localhost", 8765), null,
+ null, null)));
+ historyEvents.add(new DAGHistoryEvent(dagId,
+ new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null,
null, user, "test", null,
+ attemptId, DAGPlan.getDefaultInstance())));
+ return historyEvents;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/152f1420/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index 6f70bf5..6f653ad 100644
---
a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++
b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -19,20 +19,12 @@
package org.apache.tez.dag.history.ats.acls;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Random;
-import javax.ws.rs.core.MediaType;
-
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,26 +38,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -76,15 +60,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import com.google.common.collect.Sets;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-import org.mockito.Matchers;
-
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
public class TestATSHistoryV15 {
@@ -103,8 +79,6 @@ public class TestATSHistoryV15 {
+ TestATSHistoryV15.class.getName() + "-tmpDir";
private static Path atsActivePath;
- private static String user;
-
@BeforeClass
public static void setup() throws IOException {
try {
@@ -144,7 +118,6 @@ public class TestATSHistoryV15 {
LOG.info("Failed to start Mini Tez Cluster", e);
}
}
- user = UserGroupInformation.getCurrentUser().getShortUserName();
timelineAddress = mrrTezCluster.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
if (timelineAddress != null) {
@@ -167,7 +140,7 @@ public class TestATSHistoryV15 {
}
}
- @Test (timeout=50000)
+ @Test(timeout=50000)
public void testSimpleDAG() throws Exception {
TezClient tezSession = null;
ApplicationId applicationId;
@@ -211,24 +184,77 @@ public class TestATSHistoryV15 {
assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
// Verify HDFS data
- int count = verifyATSDataOnHDFS(atsActivePath, 0, applicationId);
- Assert.assertTrue("Count is: " + count, count > 0);
-
+ int count = verifyATSDataOnHDFS(atsActivePath, applicationId);
+ Assert.assertEquals("Count is: " + count, 2, count);
} finally {
if (tezSession != null) {
tezSession.stop();
}
}
+ }
+
+ @Test
+ public void testATSLogLevelNone() throws Exception {
+ TezClient tezSession = null;
+ ApplicationId applicationId;
+ String viewAcls = "nobody nobody_group";
+ try {
+ SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+ DAG dag = DAG.create("TezSleepProcessor");
+ Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+ Resource.newInstance(256, 1));
+ dag.addVertex(vertex);
+
+ TezConfiguration tezConf = new
TezConfiguration(mrrTezCluster.getConfig());
+
tezConf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+ "TEZ_DAG_ID");
+
+ tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+ tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ ATSV15HistoryLoggingService.class.getName());
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp",
String.valueOf(random
+ .nextInt(100000))));
+ remoteFs.mkdirs(remoteStagingDir);
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
+
+ tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+ tezSession.start();
+
+ applicationId = tezSession.getAppMasterApplicationId();
+ dag.setHistoryLogLevel(HistoryLogLevel.NONE);
+
+ DAGClient dagClient = tezSession.submitDAG(dag);
+
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for job to complete. Sleeping for 500ms." + "
Current state: "
+ + dagStatus.getState());
+ Thread.sleep(500l);
+ dagStatus = dagClient.getDAGStatus(null);
+ }
+ assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+ // Verify HDFS data
+ int count = verifyATSDataOnHDFS(atsActivePath, applicationId);
+ Assert.assertEquals("Count is: " + count, 1, count);
+ } finally {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
}
- private int verifyATSDataOnHDFS(Path p, int count, ApplicationId
applicationId) throws IOException {
+ private int verifyATSDataOnHDFS(Path p, ApplicationId applicationId) throws
IOException {
+ int count = 0;
RemoteIterator<LocatedFileStatus> iter = remoteFs.listFiles(p, true);
while (iter.hasNext()) {
LocatedFileStatus f = iter.next();
LOG.info("Found file " + f.toString());
if (f.isDirectory()) {
- verifyATSDataOnHDFS(f.getPath(), count, applicationId);
+ count += verifyATSDataOnHDFS(f.getPath(), applicationId);
} else {
if (f.getPath().getName().contains(
"" + applicationId.getClusterTimestamp() + "_" +
applicationId.getId())) {
@@ -240,7 +266,7 @@ public class TestATSHistoryV15 {
}
@Test
- public void testGetGroupId() {
+ public void testGetGroupId() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1000l, 1);
TezDAGID dagid = TezDAGID.getInstance(appId, 1);
for (final HistoryEventType eventType : HistoryEventType.values()) {
@@ -290,8 +316,8 @@ public class TestATSHistoryV15 {
default:
Assert.assertEquals(dagid.toString(),
grpId.getTimelineEntityGroupId());
}
+ service.close();
}
}
-
}