Repository: tez Updated Branches: refs/heads/master ac37c4a49 -> cbc0c6376
TEZ-3359. Add granular log levels for HistoryLoggingService. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbc0c637 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbc0c637 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbc0c637 Branch: refs/heads/master Commit: cbc0c63761ecb81f4805cd8318cd347f0bc7674e Parents: ac37c4a Author: Hitesh Shah <[email protected]> Authored: Tue Jul 26 10:44:08 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Jul 26 10:44:08 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tez/client/TezClient.java | 14 +- .../main/java/org/apache/tez/dag/api/DAG.java | 42 +++- .../org/apache/tez/dag/api/HistoryLogLevel.java | 63 ++++++ .../apache/tez/dag/api/TezConfiguration.java | 14 +- .../java/org/apache/tez/dag/api/TestDAG.java | 76 +++++++ .../apache/tez/dag/api/TestHistoryLogLevel.java | 63 ++++++ .../tez/dag/history/HistoryEventHandler.java | 45 +++- .../tez/dag/history/HistoryEventType.java | 59 +++--- .../dag/history/TestHistoryEventHandler.java | 204 +++++++++++++++++++ .../dag/history/ats/acls/TestATSHistoryV15.java | 98 +++++---- 11 files changed, 611 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fb39981..5f25985 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3359. Add granular log levels for HistoryLoggingService. TEZ-3374. Change TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP conf key name. TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. @@ -89,6 +90,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3359. Add granular log levels for HistoryLoggingService. TEZ-3374. Change TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP conf key name. TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. TEZ-3357. Change TimelineCachePlugin to handle DAG grouping. http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index f359a26..df39c0a 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -62,6 +62,7 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DAGSubmissionTimedOut; import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotReady; import org.apache.tez.dag.api.SessionNotRunning; @@ -362,7 +363,18 @@ public class TezClient { "Credentials cannot be set after the session App Master has been started"); amConfig.setCredentials(credentials); } - + + /** + * Sets the history log level for this session. It will be in effect for DAGs submitted after this + * call. + * + * @param historyLogLevel The log level to be used. + */ + public synchronized void setHistoryLogLevel(HistoryLogLevel historyLogLevel) { + amConfig.getTezConfiguration().setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, + historyLogLevel); + } + @Private @VisibleForTesting public synchronized void setUpHistoryAclManager(HistoryACLPolicyManager myAclPolicyManager) { http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 0eb51e1..65321a8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -375,6 +375,18 @@ public class DAG { } /** + * Set history log level for this DAG. This config overrides the default or one set at the session + * level. + * + * @param historyLogLevel The ATS history log level for this DAG. + * + * @return this DAG + */ + public DAG setHistoryLogLevel(HistoryLogLevel historyLogLevel) { + return this.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, historyLogLevel.name()); + } + + /** * Sets the default execution context for the DAG. This can be overridden at a per Vertex level. * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)} * @@ -1005,7 +1017,6 @@ public class DAG { dagBuilder.addEdge(edgeBuilder); } - ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); if (dagAccessControls != null) { @@ -1030,7 +1041,7 @@ public class DAG { confProtoBuilder.addConfKeyValues(kvp); } } - if (this.dagConf != null && !this.dagConf.isEmpty()) { + if (!this.dagConf.isEmpty()) { for (Entry<String, String> entry : this.dagConf.entrySet()) { PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); kvp.setKey(entry.getKey()); @@ -1038,13 +1049,38 @@ public class DAG { confProtoBuilder.addConfKeyValues(kvp); } } + // Copy historyLogLevel from tezConf into dagConf if its not overridden in dagConf. + String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); + if (logLevel != null) { + // The config is from dagConf, we have already added it to the proto above, just check if + // the value is valid. + if (!HistoryLogLevel.validateLogLevel(logLevel)) { + throw new IllegalArgumentException( + "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + + " is set to invalid value: " + logLevel); + } + } else { + // Validate and set value from tezConf. + logLevel = tezConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); + if (logLevel != null) { + if (!HistoryLogLevel.validateLogLevel(logLevel)) { + throw new IllegalArgumentException( + "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + + " is set to invalid value: " + logLevel); + } + PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); + kvp.setKey(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); + kvp.setValue(logLevel); + confProtoBuilder.addConfKeyValues(kvp); + } + } dagBuilder.setDagConf(confProtoBuilder); if (dagCredentials != null) { dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials)); TezCommonUtils.logCredentials(LOG, dagCredentials, "dag"); } - + return dagBuilder.build(); } http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java new file mode 100644 index 0000000..5eb4785 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.util.Locale; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.conf.Configuration; + +/** + * The log level for the history, this is used to determine which events are sent to the history + * logger. The default level is ALL. + */ +@Public +public enum HistoryLogLevel { + NONE, + AM, + DAG, + VERTEX, + TASK, + ALL; + + public static final HistoryLogLevel DEFAULT = ALL; + + public boolean shouldLog(HistoryLogLevel eventLevel) { + return eventLevel.ordinal() <= ordinal(); + } + + public static HistoryLogLevel getLogLevel(Configuration conf, HistoryLogLevel defaultValue) { + String logLevel = conf.getTrimmed(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); + if (logLevel == null) { + return defaultValue; + } + return valueOf(logLevel.toUpperCase(Locale.ENGLISH)); + } + + public static boolean validateLogLevel(String logLevel) { + if (logLevel != null) { + try { + valueOf(logLevel.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 005304a..15e937f 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1229,9 +1229,19 @@ public class TezConfiguration extends Configuration { "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService"; /** + * Enum value. Config to limit the type of events published to the history logging service. + * The valid log levels are defined in the enum {@link HistoryLogLevel}. The default value is + * defined in {@link HistoryLogLevel#DEFAULT}. + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_HISTORY_LOGGING_LOGLEVEL = + TEZ_PREFIX + "history.logging.log.level"; + + /** * Comma separated list of Integers. These are the values that were set for the config value - * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so that - * the groupIds generated previously will continue to be generated by the plugin. If an older + * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so + * that the groupIds generated previously will continue to be generated by the plugin. If an older * value is not present then the UI may not show information for DAGs which were created * with a different grouping value. * http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index ae5dfbb..05c4e30 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -30,7 +30,9 @@ import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan.Builder; import org.junit.Assert; import org.junit.Test; @@ -361,4 +363,78 @@ public class TestDAG { Assert.assertEquals(dagPlan, firstPlan); } } + + @Test + public void testCreateDAGForHistoryLogLevel() { + Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1", + LocalResource.newInstance( + URL.newInstance("file", "localhost", 0, "/test1"), + LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1, + Resource.newInstance(1, 1)); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1, + Resource.newInstance(1, 1)); + DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); + + TezConfiguration tezConf = new TezConfiguration(); + + // Expect null when history log level is not set in both dag and tezConf + DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false); + Builder builder = DAGPlan.newBuilder(dagPlan); + Assert.assertNull(findKVP(builder.getDagConf(), TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL)); + + // Set tezConf but not dag, expect value in tezConf. + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "TASK"); + dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.assertEquals("TASK", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(), + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL)); + + // Set invalid value in tezConf, expect exception. + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid"); + try { + dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.fail("Expected illegal argument exception"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + + " is set to invalid value: invalid", e.getMessage()); + } + + // Set value in dag, should override tez conf value. + dag.setHistoryLogLevel(HistoryLogLevel.VERTEX); + dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.assertEquals("VERTEX", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(), + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL)); + + // Set value directly into dagConf. + dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, HistoryLogLevel.DAG.name()); + dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.assertEquals("DAG", findKVP(DAGPlan.newBuilder(dagPlan).getDagConf(), + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL)); + + // Set value invalid directly into dagConf and expect exception. + dag.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, "invalid"); + try { + dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.fail("Expected illegal argument exception"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + + " is set to invalid value: invalid", e.getMessage()); + } + } + + private String findKVP(ConfigurationProto conf, String key) { + String foundValue = null; + for (int i = 0; i < conf.getConfKeyValuesCount(); ++i) { + if (conf.getConfKeyValues(i).getKey().equals(key)) { + if (foundValue == null) { + foundValue = conf.getConfKeyValues(i).getValue(); + } else { + Assert.fail("Multiple values found: " + foundValue + ", " + + conf.getConfKeyValues(i).getValue()); + } + } + } + return foundValue; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java new file mode 100644 index 0000000..76c944d --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestHistoryLogLevel.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestHistoryLogLevel { + + @Test + public void testGetLogLevel() { + assertNull(HistoryLogLevel.getLogLevel(getConfiguration(null), null)); + assertEquals(HistoryLogLevel.DEFAULT, + HistoryLogLevel.getLogLevel(getConfiguration(null), HistoryLogLevel.DEFAULT)); + assertEquals(HistoryLogLevel.NONE, + HistoryLogLevel.getLogLevel(getConfiguration("NONE"), HistoryLogLevel.DEFAULT)); + assertEquals(HistoryLogLevel.NONE, + HistoryLogLevel.getLogLevel(getConfiguration("none"), HistoryLogLevel.DEFAULT)); + try { + HistoryLogLevel.getLogLevel(getConfiguration("invalid"), HistoryLogLevel.DEFAULT); + fail("Expected IllegalArugment Exception"); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testValidateLogLevel() { + assertTrue(HistoryLogLevel.validateLogLevel(null)); + assertTrue(HistoryLogLevel.validateLogLevel("NONE")); + assertTrue(HistoryLogLevel.validateLogLevel("none")); + assertFalse(HistoryLogLevel.validateLogLevel("invalid")); + } + + private Configuration getConfiguration(String confValue) { + Configuration conf = new Configuration(false); + if (confValue != null) { + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, confValue); + } + return conf; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 95ff0cd..042d022 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -19,14 +19,18 @@ package org.apache.tez.dag.history; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.logging.HistoryLoggingService; import org.apache.tez.dag.history.recovery.RecoveryService; import org.apache.tez.dag.records.TezDAGID; @@ -40,6 +44,10 @@ public class HistoryEventHandler extends CompositeService { private boolean recoveryEnabled; private HistoryLoggingService historyLoggingService; + private HistoryLogLevel amHistoryLogLevel; + private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = + new ConcurrentHashMap<TezDAGID, HistoryLogLevel>(); + public HistoryEventHandler(AppContext context) { super(HistoryEventHandler.class.getName()); this.context = context; @@ -70,8 +78,10 @@ public class HistoryEventHandler extends CompositeService { new Class[]{AppContext.class}, new Object[] {context}); addService(recoveryService); } - super.serviceInit(conf); + amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(), HistoryLogLevel.DEFAULT); + + super.serviceInit(conf); } @Override @@ -106,7 +116,7 @@ public class HistoryEventHandler extends CompositeService { if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) { recoveryService.handle(event); } - if (event.getHistoryEvent().isHistoryEvent()) { + if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) { historyLoggingService.handle(event); } @@ -118,6 +128,37 @@ public class HistoryEventHandler extends CompositeService { + ": " + event.getHistoryEvent().toString()); } + private boolean shouldLogEvent(DAGHistoryEvent event) { + TezDAGID dagId = event.getDagID(); + + HistoryLogLevel dagLogLevel = null; + if (dagId != null) { + dagLogLevel = dagIdToLogLevel.get(dagId); + } + if (dagLogLevel == null) { + dagLogLevel = amHistoryLogLevel; + } + + HistoryEvent historyEvent = event.getHistoryEvent(); + if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) { + dagLogLevel = HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(), + amHistoryLogLevel); + dagIdToLogLevel.put(dagId, dagLogLevel); + } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) { + if (context.getCurrentDAG() != null) { + dagLogLevel = HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(), + amHistoryLogLevel); + dagIdToLogLevel.put(dagId, dagLogLevel); + } + } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) { + if (dagIdToLogLevel.containsKey(dagId)) { + dagIdToLogLevel.remove(dagId); + } + } + + return dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel()); + } + public void handle(DAGHistoryEvent event) { try { handleCriticalEvent(event); http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index 9bf98df..a41d0e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -18,32 +18,39 @@ package org.apache.tez.dag.history; +import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.TezUncheckedException; public enum HistoryEventType { - APP_LAUNCHED, - AM_LAUNCHED, - AM_STARTED, - DAG_SUBMITTED, - DAG_INITIALIZED, - DAG_STARTED, - DAG_FINISHED, - DAG_KILL_REQUEST, - VERTEX_INITIALIZED, - VERTEX_STARTED, - VERTEX_CONFIGURE_DONE, - VERTEX_FINISHED, - TASK_STARTED, - TASK_FINISHED, - TASK_ATTEMPT_STARTED, - TASK_ATTEMPT_FINISHED, - CONTAINER_LAUNCHED, - CONTAINER_STOPPED, - DAG_COMMIT_STARTED, - VERTEX_COMMIT_STARTED, - VERTEX_GROUP_COMMIT_STARTED, - VERTEX_GROUP_COMMIT_FINISHED, - DAG_RECOVERED; + APP_LAUNCHED(HistoryLogLevel.AM), + AM_LAUNCHED(HistoryLogLevel.AM), + AM_STARTED(HistoryLogLevel.AM), + DAG_SUBMITTED(HistoryLogLevel.DAG), + DAG_INITIALIZED(HistoryLogLevel.DAG), + DAG_STARTED(HistoryLogLevel.DAG), + DAG_FINISHED(HistoryLogLevel.DAG), + DAG_KILL_REQUEST(HistoryLogLevel.DAG), + VERTEX_INITIALIZED(HistoryLogLevel.VERTEX), + VERTEX_STARTED(HistoryLogLevel.VERTEX), + VERTEX_CONFIGURE_DONE(HistoryLogLevel.VERTEX), + VERTEX_FINISHED(HistoryLogLevel.VERTEX), + TASK_STARTED(HistoryLogLevel.TASK), + TASK_FINISHED(HistoryLogLevel.TASK), + TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL), + TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL), + CONTAINER_LAUNCHED(HistoryLogLevel.ALL), + CONTAINER_STOPPED(HistoryLogLevel.ALL), + DAG_COMMIT_STARTED(HistoryLogLevel.DAG), + VERTEX_COMMIT_STARTED(HistoryLogLevel.VERTEX), + VERTEX_GROUP_COMMIT_STARTED(HistoryLogLevel.VERTEX), + VERTEX_GROUP_COMMIT_FINISHED(HistoryLogLevel.VERTEX), + DAG_RECOVERED(HistoryLogLevel.DAG); + + private final HistoryLogLevel historyLogLevel; + + private HistoryEventType(HistoryLogLevel historyLogLevel) { + this.historyLogLevel = historyLogLevel; + } public static boolean isDAGSpecificEvent(HistoryEventType historyEventType) { switch (historyEventType) { @@ -77,6 +84,8 @@ public enum HistoryEventType { } } - - + public HistoryLogLevel getHistoryLogLevel() { + return historyLogLevel; } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java new file mode 100644 index 0000000..c8a076d --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.tez.dag.api.HistoryLogLevel; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.junit.Test; + +public class TestHistoryEventHandler { + + private static ApplicationId appId = ApplicationId.newInstance(1000l, 1); + private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + private static String user = "TEST_USER"; + + @Test + public void testAll() { + testLogLevel(null, 6); + testLogLevel(HistoryLogLevel.NONE, 0); + testLogLevel(HistoryLogLevel.AM, 1); + testLogLevel(HistoryLogLevel.DAG, 3); + testLogLevel(HistoryLogLevel.VERTEX, 4); + testLogLevel(HistoryLogLevel.TASK, 5); + testLogLevel(HistoryLogLevel.ALL, 6); + } + + @Test + public void testWithDAGRecovery() { + testLogLevelWithRecovery(null, 6); + testLogLevelWithRecovery(HistoryLogLevel.AM, 1); + testLogLevelWithRecovery(HistoryLogLevel.DAG, 3); + testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4); + testLogLevelWithRecovery(HistoryLogLevel.TASK, 5); + testLogLevelWithRecovery(HistoryLogLevel.ALL, 6); + } + + @Test + public void testMultipleDag() { + testLogLevel(null, HistoryLogLevel.NONE, 7); + testLogLevel(null, HistoryLogLevel.AM, 7); + testLogLevel(null, HistoryLogLevel.DAG, 9); + testLogLevel(null, HistoryLogLevel.VERTEX, 10); + testLogLevel(null, HistoryLogLevel.TASK, 11); + testLogLevel(null, HistoryLogLevel.ALL, 12); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9); + testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10); + testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0); + } + + private void testLogLevelWithRecovery(HistoryLogLevel level, int expectedCount) { + HistoryEventHandler handler = createHandler(level); + InMemoryHistoryLoggingService.events.clear(); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + List<DAGHistoryEvent> events = makeHistoryEvents(dagId, handler.getConfig()); + events.set(1, new DAGHistoryEvent(dagId, + new DAGRecoveredEvent(attemptId, dagId, "test", user, 0, null))); + for (DAGHistoryEvent event : events) { + handler.handle(event); + } + assertEquals("Failed for level: " + level, + expectedCount, InMemoryHistoryLoggingService.events.size()); + handler.stop(); + } + + private void testLogLevel(HistoryLogLevel level, int expectedCount) { + HistoryEventHandler handler = createHandler(level); + InMemoryHistoryLoggingService.events.clear(); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + for (DAGHistoryEvent event : makeHistoryEvents(dagId, handler.getConfig())) { + handler.handle(event); + } + assertEquals("Failed for level: " + level, + expectedCount, InMemoryHistoryLoggingService.events.size()); + handler.stop(); + } + + private void testLogLevel(HistoryLogLevel defaultLogLevel, HistoryLogLevel dagLogLevel, + int expectedCount) { + HistoryEventHandler handler = createHandler(defaultLogLevel); + InMemoryHistoryLoggingService.events.clear(); + TezDAGID dagId1 = TezDAGID.getInstance(appId, 1); + for (DAGHistoryEvent event : makeHistoryEvents(dagId1, handler.getConfig())) { + handler.handle(event); + } + TezDAGID dagId2 = TezDAGID.getInstance(appId, 2); + Configuration conf = new Configuration(handler.getConfig()); + conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, dagLogLevel); + for (DAGHistoryEvent event : makeHistoryEvents(dagId2, conf)) { + handler.handle(event); + } + + assertEquals(expectedCount, InMemoryHistoryLoggingService.events.size()); + handler.stop(); + } + + public static class InMemoryHistoryLoggingService extends HistoryLoggingService { + public InMemoryHistoryLoggingService() { + super("InMemoryHistoryLoggingService"); + } + static List<DAGHistoryEvent> events = new ArrayList<>(); + @Override + public void handle(DAGHistoryEvent event) { + events.add(event); + } + } + + private HistoryEventHandler createHandler(HistoryLogLevel logLevel) { + Configuration conf = new Configuration(false); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + InMemoryHistoryLoggingService.class.getName()); + if (logLevel != null) { + conf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, logLevel); + } + + DAG dag = mock(DAG.class); + when(dag.getConf()).thenReturn(conf); + + AppContext appContext = mock(AppContext.class); + when(appContext.getApplicationID()).thenReturn(appId); + when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {}); + when(appContext.getAMConf()).thenReturn(conf); + when(appContext.getCurrentDAG()).thenReturn(dag); + + HistoryEventHandler handler = new HistoryEventHandler(appContext); + handler.init(conf); + + return handler; + } + + private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, Configuration inConf) { + List<DAGHistoryEvent> historyEvents = new ArrayList<>(); + + long time = System.currentTimeMillis(); + Configuration conf = new Configuration(inConf); + historyEvents.add(new DAGHistoryEvent(null, + new AMStartedEvent(attemptId, time, user))); + historyEvents.add(new DAGHistoryEvent(dagId, + new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, + conf, null))); + TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); + historyEvents.add(new DAGHistoryEvent(dagId, + new VertexStartedEvent(vertexID, time, time))); + TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskStartedEvent(tezTaskID, "test", time, time))); + historyEvents.add(new DAGHistoryEvent(dagId, + new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time, + ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null, + null, null))); + historyEvents.add(new DAGHistoryEvent(dagId, + new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null, null, user, "test", null, + attemptId, DAGPlan.getDefaultInstance()))); + return historyEvents; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cbc0c637/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java index 6f70bf5..6f653ad 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java @@ -19,20 +19,12 @@ package org.apache.tez.dag.history.ats.acls; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Collection; -import java.util.Collections; import java.util.Random; -import javax.ws.rs.core.MediaType; - import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,26 +38,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; -import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.common.security.DAGAccessControls; +import org.apache.tez.dag.api.HistoryLogLevel; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.runtime.library.processor.SleepProcessor; @@ -76,15 +60,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import com.google.common.collect.Sets; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; - -import org.mockito.Matchers; - import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; public class TestATSHistoryV15 { @@ -103,8 +79,6 @@ public class TestATSHistoryV15 { + TestATSHistoryV15.class.getName() + "-tmpDir"; private static Path atsActivePath; - private static String user; - @BeforeClass public static void setup() throws IOException { try { @@ -144,7 +118,6 @@ public class TestATSHistoryV15 { LOG.info("Failed to start Mini Tez Cluster", e); } } - user = UserGroupInformation.getCurrentUser().getShortUserName(); timelineAddress = mrrTezCluster.getConfig().get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS); if (timelineAddress != null) { @@ -167,7 +140,7 @@ public class TestATSHistoryV15 { } } - @Test (timeout=50000) + @Test(timeout=50000) public void testSimpleDAG() throws Exception { TezClient tezSession = null; ApplicationId applicationId; @@ -211,24 +184,77 @@ public class TestATSHistoryV15 { assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); // Verify HDFS data - int count = verifyATSDataOnHDFS(atsActivePath, 0, applicationId); - Assert.assertTrue("Count is: " + count, count > 0); - + int count = verifyATSDataOnHDFS(atsActivePath, applicationId); + Assert.assertEquals("Count is: " + count, 2, count); } finally { if (tezSession != null) { tezSession.stop(); } } + } + + @Test + public void testATSLogLevelNone() throws Exception { + TezClient tezSession = null; + ApplicationId applicationId; + String viewAcls = "nobody nobody_group"; + try { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( + SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(256, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, + "TEZ_DAG_ID"); + + tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSV15HistoryLoggingService.class.getName()); + Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random + .nextInt(100000)))); + remoteFs.mkdirs(remoteStagingDir); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); + + tezSession = TezClient.create("TezSleepProcessor", tezConf, true); + tezSession.start(); + + applicationId = tezSession.getAppMasterApplicationId(); + dag.setHistoryLogLevel(HistoryLogLevel.NONE); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + + // Verify HDFS data + int count = verifyATSDataOnHDFS(atsActivePath, applicationId); + Assert.assertEquals("Count is: " + count, 1, count); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } } - private int verifyATSDataOnHDFS(Path p, int count, ApplicationId applicationId) throws IOException { + private int verifyATSDataOnHDFS(Path p, ApplicationId applicationId) throws IOException { + int count = 0; RemoteIterator<LocatedFileStatus> iter = remoteFs.listFiles(p, true); while (iter.hasNext()) { LocatedFileStatus f = iter.next(); LOG.info("Found file " + f.toString()); if (f.isDirectory()) { - verifyATSDataOnHDFS(f.getPath(), count, applicationId); + count += verifyATSDataOnHDFS(f.getPath(), applicationId); } else { if (f.getPath().getName().contains( "" + applicationId.getClusterTimestamp() + "_" + applicationId.getId())) { @@ -240,7 +266,7 @@ public class TestATSHistoryV15 { } @Test - public void testGetGroupId() { + public void testGetGroupId() throws Exception { ApplicationId appId = ApplicationId.newInstance(1000l, 1); TezDAGID dagid = TezDAGID.getInstance(appId, 1); for (final HistoryEventType eventType : HistoryEventType.values()) { @@ -290,8 +316,8 @@ public class TestATSHistoryV15 { default: Assert.assertEquals(dagid.toString(), grpId.getTimelineEntityGroupId()); } + service.close(); } } - }
