Repository: tez Updated Branches: refs/heads/TEZ-2003 7f91bd878 -> 7522e60b5 (forced update)
TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. (Chang Li via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a4f6dbce Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a4f6dbce Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a4f6dbce Branch: refs/heads/TEZ-2003 Commit: a4f6dbcecec8c084f10a0879a2960f140879618f Parents: 765adfb Author: Hitesh Shah <[email protected]> Authored: Thu Mar 26 17:20:10 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Mar 26 17:20:10 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../ats/acls/ATSHistoryACLPolicyManager.java | 24 ++++++++-- .../ats/acls/TestATSHistoryWithACLs.java | 16 +++++++ .../logging/ats/ATSHistoryLoggingService.java | 28 +++++++++-- .../ats/TestATSHistoryLoggingService.java | 50 ++++++++++++++++++++ 5 files changed, 112 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1f07257..bd7f337 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -91,6 +91,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false. TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018 TEZ-2064. SessionNotRunning Exception not thrown is all cases TEZ-2189. Tez UI live AM tracking url only works for localhost addresses http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java index 4b4487b..23d3558 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java @@ -38,6 +38,7 @@ import org.apache.tez.common.security.ACLManager; import org.apache.tez.common.security.ACLType; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -51,6 +52,8 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { Configuration conf; String user; final static String DOMAIN_ID_PREFIX = "Tez_ATS_"; + private static final String atsHistoryLoggingServiceClassName = + "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; private void initializeTimelineClient() { if (this.conf == null) { @@ -60,9 +63,20 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { this.timelineClient.stop(); this.timelineClient = null; } - this.timelineClient = TimelineClient.createTimelineClient(); - this.timelineClient.init(this.conf); - this.timelineClient.start(); + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + this.timelineClient = TimelineClient.createTimelineClient(); + this.timelineClient.init(this.conf); + this.timelineClient.start(); + } else { + this.timelineClient = null; + if (conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") + .equals(atsHistoryLoggingServiceClassName)) { + LOG.warn(atsHistoryLoggingServiceClassName + + " is disabled due to Timeline Service being disabled, " + + YarnConfiguration.TIMELINE_SERVICE_ENABLED + " set to false"); + } + } try { this.user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException e) { @@ -110,7 +124,9 @@ public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager { timelineDomain.setWriters(user); try { - timelineClient.putDomain(timelineDomain); + if (timelineClient != null) { + timelineClient.putDomain(timelineDomain); + } } catch (Exception e) { LOG.warn("Could not post timeline domain", e); } http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 30c6dda..c03ce27 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.security.DAGAccessControls; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -56,6 +57,7 @@ import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; import org.apache.tez.tests.MiniTezClusterWithTimeline; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -295,6 +297,20 @@ public class TestATSHistoryWithACLs { verifyEntityDomains(applicationId, false); } + private static final String atsHistoryACLManagerClassName = + "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; + @Test (timeout=50000) + public void testTimelineServiceDisabled() throws Exception { + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,false); + ATSHistoryACLPolicyManager historyACLPolicyManager = ReflectionUtils.createClazzInstance( + atsHistoryACLManagerClassName); + historyACLPolicyManager.setConf(tezConf); + Assert.assertNull(historyACLPolicyManager.timelineClient); + } + private void verifyEntityDomains(ApplicationId applicationId, boolean sameDomain) { assertNotNull(timelineAddress); http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index 11f38b9..b2ac1bb 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelineP import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.security.HistoryACLPolicyManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; @@ -72,6 +73,8 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { private long maxPollingTimeMillis; private String sessionDomainId; + private static final String atsHistoryLoggingServiceClassName = + "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; private static final String atsHistoryACLManagerClassName = "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager"; private HistoryACLPolicyManager historyACLPolicyManager; @@ -83,8 +86,19 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceInit(Configuration conf) throws Exception { LOG.info("Initializing ATSService"); - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + } else { + this.timelineClient = null; + if (conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") + .equals(atsHistoryLoggingServiceClassName)) { + LOG.warn(atsHistoryLoggingServiceClassName + + " is disabled due to Timeline Service being disabled, " + + YarnConfiguration.TIMELINE_SERVICE_ENABLED + " set to false"); + } + } maxTimeToWaitOnShutdown = conf.getLong( TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); @@ -118,6 +132,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceStart() { + if (timelineClient == null) { + return; + } LOG.info("Starting ATSService"); timelineClient.start(); @@ -169,6 +186,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { @Override public void serviceStop() { + if (timelineClient == null) { + return; + } LOG.info("Stopping ATSService" + ", eventQueueBacklog=" + eventQueue.size()); stopped.set(true); @@ -233,7 +253,9 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { public void handle(DAGHistoryEvent event) { - eventQueue.add(event); + if (timelineClient != null) { + eventQueue.add(event); + } } private boolean isValidEvent(DAGHistoryEvent event) { http://git-wip-us.apache.org/repos/asf/tez/blob/a4f6dbce/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java index 7d1abbd..464864e 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.history.DAGHistoryEvent; @@ -142,4 +143,53 @@ public class TestATSHistoryLoggingService { Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter); } + @Test(timeout=20000) + public void testTimelineServiceDisable() throws Exception { + ATSHistoryLoggingService atsHistoryLoggingService1; + AppContext appContext1; + appContext1 = mock(AppContext.class); + atsHistoryLoggingService1 = new ATSHistoryLoggingService(); + + atsHistoryLoggingService1.setAppContext(appContext); + atsHistoryLoggingService1.timelineClient = mock(TimelineClient.class); + when(atsHistoryLoggingService1.timelineClient.putEntities( + Matchers.<TimelineEntity[]>anyVararg())).thenAnswer( + new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ++atsInvokeCounter; + atsEntitiesCounter += invocation.getArguments().length; + try { + Thread.sleep(500l); + } catch (InterruptedException e) { + // do nothing + } + return null; + } + }); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + atsHistoryLoggingService1.init(conf); + atsHistoryLoggingService1.start(); + TezDAGID tezDAGID = TezDAGID.getInstance( + ApplicationId.newInstance(100l, 1), 1); + DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID, + new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1")); + for (int i = 0; i < 100; ++i) { + atsHistoryLoggingService1.handle(historyEvent); + } + + try { + Thread.sleep(1000l); + } catch (InterruptedException e) { + // Do nothing + } + LOG.info("ATS entitiesSent=" + atsEntitiesCounter + + ", timelineInvocations=" + atsInvokeCounter); + Assert.assertEquals(atsInvokeCounter, 0); + Assert.assertEquals(atsEntitiesCounter, 0); + Assert.assertNull(atsHistoryLoggingService1.timelineClient); + + } }
