Repository: tez Updated Branches: refs/heads/master c259daedf -> 141881cee
TEZ-1278. TezClient#waitTillReady() should not swallow interrupts. Contributed by Johannes Zillmann. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/141881ce Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/141881ce Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/141881ce Branch: refs/heads/master Commit: 141881ceeb1c0765f166b7691e473839e196288f Parents: c259dae Author: Siddharth Seth <[email protected]> Authored: Tue Jul 29 11:09:21 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Jul 29 11:09:21 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 10 ++--- .../org/apache/tez/client/TestTezClient.java | 43 ++++++++++++++++++++ .../mapreduce/examples/OrderedWordCount.java | 2 +- 4 files changed, 48 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/141881ce/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fef7b8d..341dd6a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -36,6 +36,7 @@ INCOMPATIBLE CHANGES (bikas)" TEZ-1300. Change deploy mechanism for Tez to be based on a tarball which includes Hadoop libs. + TEZ-1278. TezClient#waitTillReady() should not swallow interrupts Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/141881ce/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index be6cf13..454aad6 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -573,9 +573,10 @@ public class TezClient { * In session mode, this waits for the session host to be ready to accept a DAG * @throws IOException * @throws TezException + * @throws InterruptedException */ @InterfaceStability.Evolving - public void waitTillReady() throws IOException, TezException { + public void waitTillReady() throws IOException, TezException, InterruptedException { if (!isSession) { // nothing to wait for in non-session mode return; @@ -589,12 +590,7 @@ public class TezClient { if (status.equals(TezAppMasterStatus.READY)) { return; } - try { - Thread.sleep(SLEEP_FOR_READY); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted", e); - continue; - } + Thread.sleep(SLEEP_FOR_READY); } } http://git-wip-us.apache.org/repos/asf/tez/blob/141881ce/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 0454fda..7b00c28 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -20,6 +20,7 @@ package org.apache.tez.client; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -49,6 +51,7 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; +import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -205,4 +208,44 @@ public class TestTezClient { } verify(yarnClient, times(1)).stop(); } + + @Test(timeout = 5000) + public void testWaitTillReady_Interrupt() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true); + conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); + Map<String, LocalResource> lrs = Maps.newHashMap(); + final TezClientForTest client = new TezClientForTest("test", conf, lrs, null); + + ApplicationId appId1 = ApplicationId.newInstance(0, 1); + YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS); + when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1); + ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class); + + DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class, RETURNS_DEEP_STUBS); + + client.sessionAmProxy = sessionAmProxy; + client.mockYarnClient = new TezYarnClient(yarnClient); + + client.start(); + + when(yarnClient.getApplicationReport(appId1).getYarnApplicationState()).thenReturn(YarnApplicationState.NEW); + final AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>(); + Thread thread = new Thread() { + @Override + public void run() { + try { + client.waitTillReady(); + } catch (Exception e) { + exceptionReference.set(e); + } + }; + }; + thread.start(); + thread.join(250); + thread.interrupt(); + thread.join(); + Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class)); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/141881ce/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index f66e60f..37de9b5 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -506,7 +506,7 @@ public class OrderedWordCount extends Configured implements Tool { } private static void waitForTezSessionReady(TezClient tezSession) - throws IOException, TezException { + throws IOException, TezException, InterruptedException { tezSession.waitTillReady(); }
