Repository: tez Updated Branches: refs/heads/master b78a84a0b -> 53aa66117
TEZ-3077. TezClient.waitTillReady should support timeout. Contributed by Kuhu Shukla. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53aa6611 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53aa6611 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53aa6611 Branch: refs/heads/master Commit: 53aa6611779d03f34ac871d6fdfd640fba57f96d Parents: b78a84a Author: Siddharth Seth <[email protected]> Authored: Mon Apr 18 14:52:08 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Apr 18 14:52:08 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 91 ++++++++++++++++-- .../org/apache/tez/dag/api/SessionNotReady.java | 31 +++++++ .../org/apache/tez/client/TestTezClient.java | 98 +++++++++++++++++++- 4 files changed, 209 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3944df2..831e7d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.4: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3077. TezClient.waitTillReady should support timeout. TEZ-3202. Reduce the memory need for jobs with high number of segments TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs TEZ-3214. 
Tez UI 2: Pagination in All DAGs http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index e6dd474..f359a26 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -22,9 +22,10 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.NumberFormat; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.util.Time; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.common.security.JobTokenSecretManager; @@ -61,6 +63,7 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DAGSubmissionTimedOut; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.PreWarmVertex; +import org.apache.tez.dag.api.SessionNotReady; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -808,25 +811,61 @@ public class TezClient { */ @Unstable public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException { + preWarm(preWarmVertex, 0, TimeUnit.MILLISECONDS); + } + + /** + * API to help pre-allocate 
containers in session mode. In non-session mode + * this is ignored. The pre-allocated containers may be re-used by subsequent + * job DAGs to improve performance. + * The preWarm vertex should be configured and set up exactly + * like the other vertices in the job DAGs so that the pre-allocated + * containers may be re-used by the subsequent DAGs to improve performance. + * The processor for the preWarmVertex may be used to pre-warm the containers + * by pre-loading classes etc. It should be short-running so that pre-warming + * does not block real execution. Users can specify their custom processors or + * use the PreWarmProcessor from the runtime library. + * The parallelism of the preWarmVertex will determine the number of preWarmed + * containers. + * Pre-warming is best-effort and among other factors is limited by the free + * resources on the cluster. If the AM is not READY within the specified + * timeout, SessionNotReady is thrown and no pre-warm DAG is submitted. + * @param preWarmVertex the vertex describing the containers to pre-warm + * @param timeout max time to wait for the AM; 0 means wait indefinitely + * @param unit the time unit of the timeout argument + * @throws TezException e.g. SessionNotReady if the AM is not READY in time + * @throws IOException + */ + @Unstable + public synchronized void preWarm(PreWarmVertex preWarmVertex, + long timeout, TimeUnit unit) + throws TezException, IOException { if (!isSession) { - // do nothing for non session mode.
This is there to let the code // work correctly in both modes - LOG.warn("preWarm is not supported in non-session mode, please use session-mode of TezClient"); + LOG.warn("preWarm is not supported in non-session mode," + + "please use session-mode of TezClient"); return; } - + verifySessionStateForSubmission(); DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_" + preWarmDAGCounter++); dag.addVertex(preWarmVertex); + boolean isReady; try { - waitTillReady(); + isReady = waitTillReady(timeout, unit); } catch (InterruptedException e) { - throw new IOException("Interrupted while waiting for AM to become available", e); + throw new IOException("Interrupted while waiting for AM to become " + + "available", e); + } + if(isReady) { + submitDAG(dag); + } else { + throw new SessionNotReady("Tez AM not ready, could not submit DAG"); } - submitDAG(dag); } @@ -841,12 +880,34 @@ public class TezClient { */ @Evolving public synchronized void waitTillReady() throws IOException, TezException, InterruptedException { + waitTillReady(0, TimeUnit.MILLISECONDS); + } + + /** + * Wait till the DAG is ready to be submitted. + * In non-session mode this is a no-op since the application can be + * immediately submitted. + * In session mode, this waits for the session host to be ready to accept + * a DAG and returns false if it is not ready within the specified timeout. + * @param timeout max time to wait; a value of 0 waits indefinitely + * @param unit the time unit of the timeout argument + * @return true if the AM is READY or not in session mode, false on timeout.
+ * @throws IOException + * @throws TezException + * @throws InterruptedException + */ + @Evolving + public synchronized boolean waitTillReady(long timeout, TimeUnit unit) + throws IOException, TezException, InterruptedException { + timeout = unit.toMillis(timeout); if (!isSession) { // nothing to wait for in non-session mode - return; + return true; } verifySessionStateForSubmission(); + long startTime = Time.monotonicNow(); + long timeLimit = startTime + timeout; while (true) { TezAppMasterStatus status = getAppMasterStatus(); if (status.equals(TezAppMasterStatus.SHUTDOWN)) { @@ -854,9 +915,19 @@ public class TezClient { + ((diagnostics != null) ? diagnostics : NO_CLUSTER_DIAGNOSTICS_MSG)); } if (status.equals(TezAppMasterStatus.READY)) { - return; + return true; + } + if (timeout == 0) { + Thread.sleep(SLEEP_FOR_READY); + continue; + } + long now = Time.monotonicNow(); + if (timeLimit > now) { + long sleepTime = Math.min(SLEEP_FOR_READY, timeLimit - now); + Thread.sleep(sleepTime); + } else { + return false; } - Thread.sleep(SLEEP_FOR_READY); } } http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java new file mode 100644 index 0000000..8c06a4d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +/** + * Exception thrown when the Tez Session is not ready + */ +public class SessionNotReady extends TezException { + + private static final long serialVersionUID = -287996170505550317L; + + public SessionNotReady(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 583fb79..42b762c 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -37,9 +39,11 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -49,6 +53,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -66,6 +71,7 @@ import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.SessionNotReady; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConfigurationConstants; @@ -362,7 +368,95 @@ public class TestTezClient { client.stop(); } - + + @Test (timeout=30000) + public void testPreWarmWithTimeout() throws Exception { + long startTime = 0 , endTime = 0; + TezClientForTest client = configureAndCreateTezClient(); + final TezClientForTest spyClient = spy(client); + doCallRealMethod().when(spyClient).start(); + doCallRealMethod().when(spyClient).stop(); + spyClient.start(); + + when( + spyClient.mockYarnClient.getApplicationReport( + spyClient.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.RUNNING); + when( + spyClient.sessionAmProxy.getAMStatus((RpcController) any(), + (GetAMStatusRequestProto) any())) + .thenReturn( + GetAMStatusResponseProto.newBuilder().setStatus( + TezAppMasterStatusProto.INITIALIZING).build()); + PreWarmVertex vertex = + PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1)); + int timeout = 5000; + try { + startTime = Time.monotonicNow(); + spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS); + fail("PreWarm should have encountered an Exception!"); + } catch (SessionNotReady te) { + endTime 
= Time.monotonicNow(); + assertTrue("Time taken is not as expected", + (endTime - startTime) > timeout); + verify(spyClient, times(0)).submitDAG(any(DAG.class)); + Assert.assertTrue("Unexpected Exception message", + te.getMessage().contains("Tez AM not ready")); + + } + + when( + spyClient.sessionAmProxy.getAMStatus((RpcController) any(), + (GetAMStatusRequestProto) any())) + .thenReturn( + GetAMStatusResponseProto.newBuilder().setStatus( + TezAppMasterStatusProto.READY).build()); + try { + startTime = Time.monotonicNow(); + spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS); + endTime = Time.monotonicNow(); + assertTrue("Time taken is not as expected", + (endTime - startTime) <= timeout); + verify(spyClient, times(1)).submitDAG(any(DAG.class)); + } catch (TezException te) { + fail("PreWarm should have succeeded!"); + } + Thread amStateThread = new Thread() { + @Override + public void run() { + CountDownLatch latch = new CountDownLatch(1); + try { + when( + spyClient.sessionAmProxy.getAMStatus((RpcController) any(), + (GetAMStatusRequestProto) any())) + .thenReturn( + GetAMStatusResponseProto.newBuilder().setStatus( + TezAppMasterStatusProto.INITIALIZING).build()); + latch.await(1000, TimeUnit.MILLISECONDS); + when( + spyClient.sessionAmProxy.getAMStatus((RpcController) any(), + (GetAMStatusRequestProto) any())) + .thenReturn( + GetAMStatusResponseProto.newBuilder().setStatus( + TezAppMasterStatusProto.READY).build()); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ServiceException e) { + e.printStackTrace(); + } + } + }; + amStateThread.start(); + startTime = Time.monotonicNow(); + spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS); + endTime = Time.monotonicNow(); + assertTrue("Time taken is not as expected", + (endTime - startTime) <= timeout); + verify(spyClient, times(2)).submitDAG(any(DAG.class)); + spyClient.stop(); + client.stop(); + } + @Test (timeout = 10000) public void testMultipleSubmissions() throws Exception { 
testMultipleSubmissionsJob(false);
