Repository: tez Updated Branches: refs/heads/branch-0.5 8e94986f0 -> 7501320a5
TEZ-2629. LimitExceededException in Tez client when a DAG exceeds the default max counters. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7501320a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7501320a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7501320a Branch: refs/heads/branch-0.5 Commit: 7501320a5b7f85fed1852eecd547247160dc962b Parents: 8e94986 Author: Siddharth Seth <[email protected]> Authored: Thu Aug 20 11:46:08 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 20 11:46:08 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 3 + .../org/apache/tez/common/counters/Limits.java | 10 ++- .../org/apache/tez/client/TestTezClient.java | 68 +++++++++++++++----- 4 files changed, 65 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7501320a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f519ee6..7c6d5a4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option TEZ-2630. TezChild receives IP address instead of FQDN. (hitesh) TEZ-2635. Limit number of attempts being downloaded in unordered fetch. 
http://git-wip-us.apache.org/repos/asf/tez/blob/7501320a/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index fc70b48..36225ab 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.tez.common.counters.Limits; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DAGSubmissionTimedOut; @@ -139,6 +140,8 @@ public class TezClient { tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); this.amConfig = new AMConfiguration(tezConf, localResources, credentials); this.apiVersionInfo = new TezApiVersionInfo(); + Limits.setConfiguration(tezConf); + LOG.info("Tez Client Version: " + apiVersionInfo.toString()); } http://git-wip-us.apache.org/repos/asf/tez/blob/7501320a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java index 38e825a..f88c074 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java @@ -18,6 +18,7 @@ package org.apache.tez.common.counters; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.classification.InterfaceAudience; @@ -41,7 +42,7 @@ public class Limits { return; } if (conf == null) { - conf = new Configuration(); + conf = new TezConfiguration(); } GROUP_NAME_MAX = conf.getInt(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH, @@ -116,4 +117,11 @@ public class Limits { } } + @VisibleForTesting + @InterfaceAudience.Private + public synchronized static void reset() { + conf = null; + initialized = false; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7501320a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 14d4d7f..ee20e87 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.tez.common.counters.LimitExceededException; +import org.apache.tez.common.counters.Limits; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -98,12 +102,19 @@ public class TestTezClient { } } - TezClientForTest configure() throws YarnException, IOException { - return configure(new HashMap<String, LocalResource>(), true); + TezClientForTest configureAndCreateTezClient() 
throws YarnException, IOException { + return configureAndCreateTezClient(null); } - TezClientForTest configure(Map<String, LocalResource> lrs, boolean isSession) throws YarnException, IOException { - TezConfiguration conf = new TezConfiguration(); + TezClientForTest configureAndCreateTezClient(TezConfiguration conf) throws YarnException, IOException { + return configureAndCreateTezClient(new HashMap<String, LocalResource>(), true, conf); + } + + TezClientForTest configureAndCreateTezClient(Map<String, LocalResource> lrs, boolean isSession, + TezConfiguration conf) throws YarnException, IOException { + if (conf == null) { + conf = new TezConfiguration(); + } conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true); conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); TezClientForTest client = new TezClientForTest("test", conf, lrs, null); @@ -138,7 +149,7 @@ public class TestTezClient { lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); - TezClientForTest client = configure(lrs, isSession); + TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null); ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) @@ -244,7 +255,7 @@ public class TestTezClient { @Test (timeout=5000) public void testPreWarm() throws Exception { - TezClientForTest client = configure(); + TezClientForTest client = configureAndCreateTezClient(); client.start(); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) @@ -272,7 +283,8 @@ public class TestTezClient { } public void testMultipleSubmissionsJob(boolean isSession) throws Exception { - TezClientForTest client1 = configure(new HashMap<String, LocalResource>(), isSession); + TezClientForTest client1 = 
configureAndCreateTezClient(new HashMap<String, LocalResource>(), + isSession, null); when(client1.mockYarnClient.getApplicationReport(client1.mockAppId).getYarnApplicationState()) .thenReturn(YarnApplicationState.RUNNING); client1.start(); @@ -292,7 +304,7 @@ public class TestTezClient { // the dag resource will be added to the vertex once client1.submitDAG(dag); - TezClientForTest client2 = configure(); + TezClientForTest client2 = configureAndCreateTezClient(); when(client2.mockYarnClient.getApplicationReport(client2.mockAppId).getYarnApplicationState()) .thenReturn(YarnApplicationState.RUNNING); client2.start(); @@ -307,7 +319,7 @@ public class TestTezClient { @Test(timeout = 5000) public void testWaitTillReady_Interrupt() throws Exception { - final TezClientForTest client = configure(); + final TezClientForTest client = configureAndCreateTezClient(); client.start(); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) @@ -327,13 +339,13 @@ public class TestTezClient { thread.join(250); thread.interrupt(); thread.join(); - Assert.assertThat(exceptionReference.get(),CoreMatchers. 
instanceOf(InterruptedException.class)); + Assert.assertThat(exceptionReference.get(), CoreMatchers.instanceOf(InterruptedException.class)); client.stop(); } @Test(timeout = 5000) public void testWaitTillReadyAppFailed() throws Exception { - final TezClientForTest client = configure(); + final TezClientForTest client = configureAndCreateTezClient(); client.start(); String msg = "Application Test Failed"; when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) @@ -342,7 +354,7 @@ public class TestTezClient { msg); try { client.waitTillReady(); - Assert.fail(); + fail(); } catch (SessionNotRunning e) { Assert.assertTrue(e.getMessage().contains(msg)); } @@ -351,13 +363,13 @@ public class TestTezClient { @Test(timeout = 5000) public void testWaitTillReadyAppFailedNoDiagnostics() throws Exception { - final TezClientForTest client = configure(); + final TezClientForTest client = configureAndCreateTezClient(); client.start(); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) .thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.FAILED); try { client.waitTillReady(); - Assert.fail(); + fail(); } catch (SessionNotRunning e) { Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG)); } @@ -366,7 +378,7 @@ public class TestTezClient { @Test(timeout = 5000) public void testSubmitDAGAppFailed() throws Exception { - final TezClientForTest client = configure(); + final TezClientForTest client = configureAndCreateTezClient(); client.start(); client.callRealGetSessionAMProxy = true; @@ -382,11 +394,35 @@ public class TestTezClient { try { client.submitDAG(dag); - Assert.fail(); + fail(); } catch (SessionNotRunning e) { Assert.assertTrue(e.getMessage().contains(msg)); } client.stop(); } + @Test(timeout = 5000) + public void testTezClientCounterLimits() throws YarnException, IOException { + Limits.reset(); + int defaultCounterLimit = 
TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT; + + int newCounterLimit = defaultCounterLimit + 500; + + TezConfiguration conf = new TezConfiguration(); + conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, newCounterLimit); + + configureAndCreateTezClient(conf); + + TezCounters counters = new TezCounters(); + for (int i = 0 ; i < newCounterLimit ; i++) { + counters.findCounter("GroupName", "TestCounter" + i).setValue(i); + } + + try { + counters.findCounter("GroupName", "TestCounterFail").setValue(1); + fail("Expecting a LimitExceedException - too many counters"); + } catch (LimitExceededException e) { + } + } + }
