Repository: tez Updated Branches: refs/heads/branch-0.9 364ae4faf -> c369ba659
TEZ-3943. TezClient leaks DAGClient for prewarm (Sergey Shelukhin via jlowe) (cherry picked from commit cf0302c429b1b24b75371374e6676376decc34b0) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c369ba65 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c369ba65 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c369ba65 Branch: refs/heads/branch-0.9 Commit: c369ba659c56a8fe7c32d4972cb99590400644b8 Parents: 364ae4f Author: Jason Lowe <[email protected]> Authored: Tue May 29 14:23:03 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Tue May 29 14:26:24 2018 -0500 ---------------------------------------------------------------------- .../java/org/apache/tez/client/TezClient.java | 24 ++++++++++++++++++-- .../org/apache/tez/client/TestTezClient.java | 21 ++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c369ba65/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index d2c1af4..9dd4a69 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -142,7 +142,7 @@ public class TezClient { @VisibleForTesting final ServicePluginsDescriptor servicePluginsDescriptor; private JavaOptsChecker javaOptsChecker = null; - + private DAGClient prewarmDagClient = null; private int preWarmDAGCounter = 0; /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */ @@ -591,6 +591,25 @@ public class TezClient { } } + private void closePrewarmDagClient() { + if (prewarmDagClient == null) { + return; + } + try { + prewarmDagClient.tryKillDAG(); + LOG.info("Waiting for prewarm DAG to shut down"); + prewarmDagClient.waitForCompletion(); + } catch (Exception ex) { + LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex); + } + try { + prewarmDagClient.close(); + } catch (Exception e) { + LOG.warn("Failed to close prewarm DagClient " + prewarmDagClient, e); + } + prewarmDagClient = null; + } + private DAGClient submitDAGSession(DAG dag) throws TezException, IOException { Preconditions.checkState(isSession == true, "submitDAG with additional resources applies to only session mode. " + @@ -693,6 +712,7 @@ public class TezClient { * @throws IOException */ public synchronized void stop() throws TezException, IOException { + closePrewarmDagClient(); try { if (amKeepAliveService != null) { amKeepAliveService.shutdownNow(); @@ -925,7 +945,7 @@ public class TezClient { "available", e); } if(isReady) { - submitDAG(dag); + prewarmDagClient = submitDAG(dag); } else { throw new SessionNotReady("Tez AM not ready, could not submit DAG"); } http://git-wip-us.apache.org/repos/asf/tez/blob/c369ba65/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 2c04061..e959a55 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -38,6 +38,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.times; import static org.mockito.Mockito.atLeast; @@ -87,10 +88,15 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto; +import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto; +import org.apache.tez.dag.api.records.DAGProtos.ProgressProto; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -405,7 +411,7 @@ public class TestTezClient { client.start(); when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) - .thenReturn(YarnApplicationState.RUNNING); + .thenReturn(YarnApplicationState.RUNNING); when( client.sessionAmProxy.getAMStatus((RpcController) any(), (GetAMStatusRequestProto) any())) @@ -419,9 +425,21 @@ public class TestTezClient { SubmitDAGRequestProto proto = captor1.getValue(); assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)); + setClientToReportStoppedDags(client); client.stop(); } + private void setClientToReportStoppedDags(TezClientForTest client) throws Exception { + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.FINISHED); + when(client.sessionAmProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class))) + .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(DAGStatusProto.newBuilder() + .addDiagnostics("Diagnostics_0").setState(DAGStatusStateProto.DAG_SUCCEEDED) + .setDAGProgress(ProgressProto.newBuilder() + .setFailedTaskCount(0).setKilledTaskCount(0).setRunningTaskCount(0) + .setSucceededTaskCount(1).setTotalTaskCount(1).build()).build()).build()); + } + @Test (timeout=30000) public void testPreWarmWithTimeout() throws Exception { long startTime = 0 , endTime = 0; @@ -506,6 +524,7 @@ public class TestTezClient { assertTrue("Time taken is not as expected", (endTime - startTime) <= timeout); verify(spyClient, times(2)).submitDAG(any(DAG.class)); + setClientToReportStoppedDags(client); spyClient.stop(); client.stop(); }
