Repository: tez
Updated Branches:
  refs/heads/master f6624c152 -> cf0302c42


TEZ-3943. TezClient leaks DAGClient for prewarm (Sergey Shelukhin via jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cf0302c4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cf0302c4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cf0302c4

Branch: refs/heads/master
Commit: cf0302c429b1b24b75371374e6676376decc34b0
Parents: f6624c1
Author: Jason Lowe <[email protected]>
Authored: Tue May 29 14:23:03 2018 -0500
Committer: Jason Lowe <[email protected]>
Committed: Tue May 29 14:23:03 2018 -0500

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   | 24 ++++++++++++++++++--
 .../org/apache/tez/client/TestTezClient.java    | 21 ++++++++++++++++-
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cf0302c4/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index d2c1af4..9dd4a69 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -142,7 +142,7 @@ public class TezClient {
   @VisibleForTesting
   final ServicePluginsDescriptor servicePluginsDescriptor;
   private JavaOptsChecker javaOptsChecker = null;
-
+  private DAGClient prewarmDagClient = null;
   private int preWarmDAGCounter = 0;
 
   /* max submitDAG request size through IPC; beyond this we transfer them in 
the same way we transfer local resource */
@@ -591,6 +591,25 @@ public class TezClient {
     }
   }
 
+  /**
+   * Shuts down the prewarm DAG, if one was submitted, and closes its
+   * {@link DAGClient}.
+   * <p>
+   * This is best effort: the DAG is asked to die, we wait for it to complete,
+   * and the client is closed. Failures in either step are logged and never
+   * propagated. The field is cleared at the end, so a repeated call returns
+   * immediately.
+   */
+  private void closePrewarmDagClient() {
+    if (prewarmDagClient == null) {
+      return;
+    }
+    try {
+      prewarmDagClient.tryKillDAG();
+      LOG.info("Waiting for prewarm DAG to shut down");
+      prewarmDagClient.waitForCompletion();
+    } catch (Exception ex) {
+      if (ex instanceof InterruptedException) {
+        // Catching Exception would otherwise swallow the interrupt; restore
+        // the flag so the caller of stop() can still observe it.
+        Thread.currentThread().interrupt();
+      }
+      LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex);
+    }
+    // Close the client even if the kill/wait above failed.
+    try {
+      prewarmDagClient.close();
+    } catch (Exception e) {
+      LOG.warn("Failed to close prewarm DagClient " + prewarmDagClient, e);
+    }
+    prewarmDagClient = null;
+  }
+  
   private DAGClient submitDAGSession(DAG dag) throws TezException, IOException 
{
     Preconditions.checkState(isSession == true, 
         "submitDAG with additional resources applies to only session mode. " + 
@@ -693,6 +712,7 @@ public class TezClient {
    * @throws IOException
    */
   public synchronized void stop() throws TezException, IOException {
+    closePrewarmDagClient();
     try {
       if (amKeepAliveService != null) {
         amKeepAliveService.shutdownNow();
@@ -925,7 +945,7 @@ public class TezClient {
           "available", e);
     }
     if(isReady) {
-      submitDAG(dag);
+      prewarmDagClient = submitDAG(dag);
     } else {
       throw new SessionNotReady("Tez AM not ready, could not submit DAG");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/cf0302c4/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java 
b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 2c04061..e959a55 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -38,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.atLeast;
@@ -87,10 +88,15 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -405,7 +411,7 @@ public class TestTezClient {
     client.start();
 
     
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
-    .thenReturn(YarnApplicationState.RUNNING);
+        .thenReturn(YarnApplicationState.RUNNING);
     
     when(
         client.sessionAmProxy.getAMStatus((RpcController) any(), 
(GetAMStatusRequestProto) any()))
@@ -419,9 +425,21 @@ public class TestTezClient {
     SubmitDAGRequestProto proto = captor1.getValue();
     
assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX));
 
+    setClientToReportStoppedDags(client);
     client.stop();
   }
 
+  /**
+   * Re-stubs the mocks held by the given test client so that the YARN
+   * application state is reported as FINISHED and {@code getDAGStatus}
+   * calls made with a null controller return a DAG_SUCCEEDED status with
+   * one of one tasks succeeded.
+   *
+   * @param client test client whose {@code mockYarnClient} and
+   *               {@code sessionAmProxy} mocks are stubbed
+   */
+  private void setClientToReportStoppedDags(TezClientForTest client) throws Exception {
+    // Build the canned response bottom-up instead of as one nested expression.
+    ProgressProto finishedProgress = ProgressProto.newBuilder()
+        .setFailedTaskCount(0).setKilledTaskCount(0).setRunningTaskCount(0)
+        .setSucceededTaskCount(1).setTotalTaskCount(1)
+        .build();
+    DAGStatusProto succeededStatus = DAGStatusProto.newBuilder()
+        .addDiagnostics("Diagnostics_0")
+        .setState(DAGStatusStateProto.DAG_SUCCEEDED)
+        .setDAGProgress(finishedProgress)
+        .build();
+    GetDAGStatusResponseProto succeededResponse =
+        GetDAGStatusResponseProto.newBuilder().setDagStatus(succeededStatus).build();
+
+    when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.FINISHED);
+    when(client.sessionAmProxy.getDAGStatus(isNull(RpcController.class),
+        any(GetDAGStatusRequestProto.class)))
+        .thenReturn(succeededResponse);
+  }
+
   @Test (timeout=30000)
   public void testPreWarmWithTimeout() throws Exception {
     long startTime = 0 , endTime = 0;
@@ -506,6 +524,7 @@ public class TestTezClient {
     assertTrue("Time taken is not as expected",
         (endTime - startTime) <= timeout);
     verify(spyClient, times(2)).submitDAG(any(DAG.class));
+    setClientToReportStoppedDags(client);
     spyClient.stop();
     client.stop();
   }

Reply via email to