Repository: tez Updated Branches: refs/heads/master 89d47c325 -> d67faeb37
TEZ-3551: FrameworkClient created twice causing minor delay (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d67faeb3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d67faeb3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d67faeb3 Branch: refs/heads/master Commit: d67faeb37c8d2600449431d4ab5dac1bfcd7b1fa Parents: 89d47c3 Author: Rajesh Balamohan <[email protected]> Authored: Tue Jan 10 08:49:59 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Jan 10 08:49:59 2017 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/client/FrameworkClient.java | 2 + .../java/org/apache/tez/client/TezClient.java | 18 +++++++-- .../org/apache/tez/client/TezYarnClient.java | 9 +++++ .../tez/dag/api/client/DAGClientImpl.java | 23 +++++------ .../dag/api/client/rpc/DAGClientRPCImpl.java | 13 ++++++- .../tez/dag/api/client/rpc/TestDAGClient.java | 23 +++++++---- .../java/org/apache/tez/client/LocalClient.java | 5 +++ .../org/apache/tez/test/TestFaultTolerance.java | 40 ++++++++++++++++++-- 9 files changed, 107 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6aa66f3..0fe0c88 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3551. FrameworkClient created twice causing minor delay. TEZ-3566. Avoid caching fs isntances in TokenCache after a point. TEZ-3568. Update SecurityUtils configuration to pick user provided configuration. TEZ-3561. Fix wrong tez tarball name in install.md. http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java index cb20f49..b3e084c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java @@ -78,4 +78,6 @@ public abstract class FrameworkClient { public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException; + public abstract boolean isRunning() throws IOException; + } http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 29e7a8b..f4e9f10 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -547,7 +547,6 @@ public class TezClient { } } - TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration()); Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials); DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker); @@ -613,7 +612,9 @@ public class TezClient { + ", dagId=" + dagId + ", dagName=" + dag.getName()); return new DAGClientImpl(sessionAppId, dagId, - dagClientConf, frameworkClient); + amConfig.getTezConfiguration(), + amConfig.getYarnConfiguration(), + frameworkClient); } /** @@ -1030,7 +1031,8 @@ public class TezClient { } // wait for dag in non-session mode to start running, so that we can start to getDAGStatus waitNonSessionTillReady(); - return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient); + return getDAGClient(appId, amConfig.getTezConfiguration(), amConfig.getYarnConfiguration(), + frameworkClient); } private ApplicationId createApplication() throws TezException, IOException { @@ -1052,11 +1054,19 @@ public class TezClient { return cachedTezJarResources; } + @Private + static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, YarnConfiguration + yarnConf, FrameworkClient frameworkClient) + throws IOException, TezException { + return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, + yarnConf, frameworkClient); + } + @Private // Used only for MapReduce compatibility code static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, FrameworkClient frameworkClient) throws IOException, TezException { - return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient); + return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), frameworkClient); } // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java index 3ac82ac..2a0c79a 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java @@ -38,6 +38,8 @@ public class TezYarnClient extends FrameworkClient { private final YarnClient yarnClient; + private volatile boolean isRunning; + protected TezYarnClient(YarnClient yarnClient) { this.yarnClient = yarnClient; } @@ -50,10 +52,12 @@ public class TezYarnClient extends FrameworkClient { @Override public void start() { yarnClient.start(); + isRunning = true; } @Override public void stop() { + isRunning = false; yarnClient.stop(); } @@ -98,4 +102,9 @@ public class TezYarnClient extends FrameworkClient { } return report; } + + @Override + public boolean isRunning() throws IOException { + return isRunning; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index af67ee8..4820b6e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -76,27 +76,28 @@ public class DAGClientImpl extends DAGClient { VertexStatus.State.ERROR); private long statusPollInterval; private long diagnoticsWaitTimeout; + private boolean cleanupFrameworkClient; public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf, - @Nullable FrameworkClient frameworkClient) { + YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient) { this.appId = appId; this.dagId = dagId; this.conf = conf; - if (frameworkClient != null && - conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) { + if (frameworkClient != null) { this.frameworkClient = frameworkClient; } else { this.frameworkClient = FrameworkClient.createFrameworkClient(conf); - this.frameworkClient.init(conf, new YarnConfiguration(conf)); + this.frameworkClient.init(conf, yarnConf); this.frameworkClient.start(); + cleanupFrameworkClient = true; } isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") - .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") && - conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, - TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) && - conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, - TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) && - DAGClientTimelineImpl.isSupported(); + .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") && + conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) && + conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) && + DAGClientTimelineImpl.isSupported(); realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient); statusPollInterval = conf.getLong( @@ -341,7 +342,7 @@ public class DAGClientImpl extends DAGClient { @Override public void close() throws IOException { realClient.close(); - if (frameworkClient != null) { + if (frameworkClient != null && cleanupFrameworkClient) { frameworkClient.stop(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index ff48755..9eb9807 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -227,8 +227,15 @@ public class DAGClientRPCImpl extends DAGClientInternal { ApplicationReport getAppReport() throws IOException, TezException, ApplicationNotFoundException { + FrameworkClient client = null; try { - ApplicationReport appReport = frameworkClient.getApplicationReport(appId); + ApplicationReport appReport = null; + if (!frameworkClient.isRunning()) { + client = FrameworkClient.createFrameworkClient(conf); + appReport = client.getApplicationReport(appId); + } else { + appReport = frameworkClient.getApplicationReport(appId); + } if (LOG.isDebugEnabled()) { LOG.debug("App: " + appId + " in state: " + appReport.getYarnApplicationState()); @@ -238,6 +245,10 @@ public class DAGClientRPCImpl extends DAGClientInternal { throw e; } catch (YarnException e) { throw new TezException(e); + } finally { + if (client != null) { + client.stop(); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java index 674781e..e979f8b 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java @@ -34,6 +34,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.client.FrameworkClient; import org.apache.tez.dag.api.TezConfiguration; @@ -200,8 +201,10 @@ public class TestDAGClient { .thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithoutCounters).build()); when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher()))) .thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build()); - - dagClient = new DAGClientImpl(mockAppId, dagIdStr, new TezConfiguration(), null); + + TezConfiguration tezConf = new TezConfiguration(); + YarnConfiguration yarnConf = new YarnConfiguration(tezConf); + dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf, yarnConf, null); DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient(); realClient.appReport = mockAppReport; realClient.proxy = mockProxy; @@ -335,8 +338,10 @@ public class TestDAGClient { TezConfiguration tezConf = new TezConfiguration(); tezConf.setLong(TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, 800l); + YarnConfiguration yarnConf = new YarnConfiguration(tezConf); - DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null); + DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, + yarnConf,null); DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null); dagClient.setRealClient(dagClientRpc); @@ -417,12 +422,14 @@ public class TestDAGClient { String loggingClass, boolean amHistoryLoggingEnabled, boolean dagHistoryLoggingEnabled) { TezConfiguration tezConf = new TezConfiguration(); + YarnConfiguration yarnConf = new YarnConfiguration(tezConf); tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, loggingClass); tezConf.setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, amHistoryLoggingEnabled); tezConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, dagHistoryLoggingEnabled); - DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf, null); + DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf, + yarnConf,null); assertEquals(expected, dagClient.getIsATSEnabled()); } @@ -466,10 +473,10 @@ public class TestDAGClient { private DAGStatus rmDagStatus; int numGetStatusViaRmInvocations = 0; - public DAGClientImplForTest(ApplicationId appId, String dagId, - TezConfiguration conf, - @Nullable FrameworkClient frameworkClient) { - super(appId, dagId, conf, frameworkClient); + public DAGClientImplForTest(ApplicationId appId, String dagId, TezConfiguration conf, + YarnConfiguration yarnConf, + @Nullable FrameworkClient frameworkClient) { + super(appId, dagId, conf, yarnConf, frameworkClient); } private void setRealClient(DAGClientRPCImplForTest dagClientRpcImplForTest) { http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 7c65c07..db7fc2c 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -147,6 +147,11 @@ public class LocalClient extends FrameworkClient { } @Override + public boolean isRunning() { + return true; + } + + @Override public ApplicationReport getApplicationReport(ApplicationId appId) { ApplicationReport report = Records.newRecord(ApplicationReport.class); report.setApplicationId(appId); http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index b2a5d17..08bac0d 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -72,7 +72,8 @@ public class TestFaultTolerance { protected static MiniDFSCluster dfsCluster; private static TezClient tezSession = null; - + private static TezConfiguration tezConf; + @BeforeClass public static void setup() throws Exception { LOG.info("Starting mini clusters"); @@ -97,7 +98,7 @@ public class TestFaultTolerance { .valueOf(new Random().nextInt(100000)))); TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir); - TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf = new TezConfiguration(miniTezCluster.getConfig()); tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString()); tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); @@ -160,7 +161,40 @@ public class TestFaultTolerance { Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics)); } } - + + @Test (timeout=600000) + public void testSessionStopped() throws Exception { + Configuration testConf = new Configuration(false); + + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0); + + // verify value at v2 task1 + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1"); + + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4); + DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf); + tezSession.waitTillReady(); + + DAGClient dagClient = tezSession.submitDAG(dag); + dagClient.waitForCompletion(); + // kill the session now + tezSession.stop(); + + // Check if killing DAG does not throw any exception + dagClient.tryKillDAG(); + + // restart the session for rest of the tests + tezSession = TezClient.create("TestFaultTolerance", tezConf, true); + tezSession.start(); + } + @Test (timeout=60000) public void testBasicSuccessScatterGather() throws Exception { DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", null);
