Repository: tez
Updated Branches:
  refs/heads/branch-0.7 32de2b079 -> fb8fbd594


TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes (jeagles)

(cherry picked from commit 596c90a777beed4cdf7fb2fcf08e92a003d03c18)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fb8fbd59
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fb8fbd59
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fb8fbd59

Branch: refs/heads/branch-0.7
Commit: fb8fbd594975dee39cb375a36058907d4fa54eb5
Parents: 32de2b0
Author: Jonathan Eagles <[email protected]>
Authored: Fri Aug 28 10:51:37 2015 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Aug 28 11:06:51 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/client/TezClient.java   | 38 ++++++++++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java    | 20 +++++++++++
 .../org/apache/tez/client/TestTezClient.java    | 37 +++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  7 +---
 5 files changed, 97 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index de31984..4bc5e7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released

http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 45837fe..3a5302c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -506,11 +507,43 @@ public class TezClient {
                 ShutdownSessionRequestProto.newBuilder().build();
             proxy.shutdownSession(null, request);
             sessionShutdownSuccessful = true;
+            boolean asynchronousStop = 
amConfig.getTezConfiguration().getBoolean(
+                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
+                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
+            if (!asynchronousStop) {
+              LOG.info("Waiting until application is in a final state");
+              long currentTimeMillis = System.currentTimeMillis();
+              long timeKillIssued = currentTimeMillis;
+              long killTimeOut = amConfig.getTezConfiguration().getLong(
+                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
+                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
+              ApplicationReport appReport = frameworkClient
+                  .getApplicationReport(sessionAppId);
+              while ((currentTimeMillis < timeKillIssued + killTimeOut)
+                  && 
!isJobInTerminalState(appReport.getYarnApplicationState())) {
+                try {
+                  Thread.sleep(1000L);
+                } catch (InterruptedException ie) {
+                  /** interrupted, just break */
+                  break;
+                }
+                currentTimeMillis = System.currentTimeMillis();
+                appReport = frameworkClient.getApplicationReport(sessionAppId);
+              }
+
+              if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
+                frameworkClient.killApplication(sessionAppId);
+              }
+            }
           }
         } catch (TezException e) {
           LOG.info("Failed to shutdown Tez Session via proxy", e);
         } catch (ServiceException e) {
           LOG.info("Failed to shutdown Tez Session via proxy", e);
+        } catch (ApplicationNotFoundException e) {
+          LOG.info("Failed to kill nonexistent application " + sessionAppId, 
e);
+        } catch (YarnException e) {
+          throw new TezException(e);
         }
         if (!sessionShutdownSuccessful) {
           LOG.info("Could not connect to AM, killing session via YARN"
@@ -531,6 +564,11 @@ public class TezClient {
       }
     }
   }
+  private boolean isJobInTerminalState(YarnApplicationState 
yarnApplicationState) {
+    return (yarnApplicationState == YarnApplicationState.FINISHED
+        || yarnApplicationState == YarnApplicationState.FAILED
+        || yarnApplicationState == YarnApplicationState.KILLED);
+  }
 
   /**
    * Get the name of the client

http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 2114793..aa781bc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1258,4 +1258,24 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "client.diagnostics.wait.timeout-ms";
   @Private
   public static final long TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS_DEFAULT = 
3*1000;
+
+  /**
+   * Long value. Time interval, in milliseconds, for the client to wait during a client-requested
+   * AM shutdown (only when {@link #TEZ_CLIENT_ASYNCHRONOUS_STOP} is false) before issuing a
+   * hard kill to the RM for this application. Expert level setting.
+   */
+  @ConfigurationScope(Scope.CLIENT)
+  public static final String TEZ_CLIENT_HARD_KILL_TIMEOUT_MS = TEZ_PREFIX + 
"client.timeout-ms";
+
+  public static final long TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT = 30 * 
1000L;
+
+  /**
+   * Boolean value. Backwards compatibility setting. When set to false, TezClient.stop() is a
+   * synchronous call that waits until the AM reaches a final state (killing it via the RM once
+   * {@link #TEZ_CLIENT_HARD_KILL_TIMEOUT_MS} elapses) before returning. Expert level setting.
+   */
+  @ConfigurationScope(Scope.CLIENT)
+  public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + 
"client.asynchronous-stop";
+
+  public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java 
b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 92d3cd2..f1f23e1 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -31,6 +31,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -74,6 +75,7 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.RpcController;
 
 public class TestTezClient {
+  static final long HARD_KILL_TIMEOUT = 1500L;
 
   class TezClientForTest extends TezClient {
     TezYarnClient mockTezYarnClient;
@@ -119,6 +121,7 @@ public class TestTezClient {
     }
     conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
+    conf.setLong(TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, 
HARD_KILL_TIMEOUT);
     TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
 
     ApplicationId appId1 = ApplicationId.newInstance(0, 1);
@@ -429,4 +432,38 @@ public class TestTezClient {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testStopRetriesUntilTerminalState() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false);
+    final TezClientForTest client = configureAndCreateTezClient(conf);
+    client.start();
+    
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        
.thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.KILLED);
+    try {
+      client.stop();
+    } catch (Exception e) {
+      Assert.fail("Expected ApplicationNotFoundException");
+    }
+    verify(client.mockYarnClient, 
atLeast(2)).getApplicationReport(client.mockAppId);
+  }
+
+  @Test(timeout = 5000)
+  public void testStopRetriesUntilTimeout() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false);
+    final TezClientForTest client = configureAndCreateTezClient(conf);
+    client.start();
+    
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.NEW);
+    long start = System.currentTimeMillis();
+    try {
+      client.stop();
+    } catch (Exception e) {
+      Assert.fail("Stop should complete without exception: " + e);
+    }
+    long end = System.currentTimeMillis();
+    verify(client.mockYarnClient, 
atLeast(2)).getApplicationReport(client.mockAppId);
+    Assert.assertTrue("Stop ended before timeout", end - start > 
HARD_KILL_TIMEOUT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 691ee02..9bf0819 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1183,12 +1183,7 @@ public class DAGAppMaster extends AbstractService {
         //send a DAG_KILL message
         LOG.info("Sending a kill event to the current DAG"
             + ", dagId=" + currentDAG.getID());
-        try {
-          logDAGKillRequestEvent(currentDAG.getID(), true);
-        } catch (IOException e) {
-          throw new TezException(e);
-        }
-        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+        tryKillDAG(currentDAG);
       } else {
         LOG.info("No current running DAG, shutting down the AM");
         if (isSession && !state.equals(DAGAppMasterState.ERROR)) {

Reply via email to