Repository: tez
Updated Branches:
  refs/heads/master 77efbc313 -> 596c90a77


TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/596c90a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/596c90a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/596c90a7

Branch: refs/heads/master
Commit: 596c90a777beed4cdf7fb2fcf08e92a003d03c18
Parents: 77efbc3
Author: Jonathan Eagles <[email protected]>
Authored: Fri Aug 28 10:51:37 2015 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Aug 28 10:51:37 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/client/TezClient.java   | 38 ++++++++++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java    | 19 ++++++++++
 .../org/apache/tez/client/TestTezClient.java    | 38 ++++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  7 +---
 5 files changed, 97 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1eb7f6..09e51d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -151,6 +151,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687. ATS History shutdown happens before the min-held containers are released

http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 312ddcd..e39cf4f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -565,11 +566,43 @@ public class TezClient {
                 ShutdownSessionRequestProto.newBuilder().build();
             proxy.shutdownSession(null, request);
             sessionShutdownSuccessful = true;
+            boolean asynchronousStop = 
amConfig.getTezConfiguration().getBoolean(
+                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
+                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
+            if (!asynchronousStop) {
+              LOG.info("Waiting until application is in a final state");
+              long currentTimeMillis = System.currentTimeMillis();
+              long timeKillIssued = currentTimeMillis;
+              long killTimeOut = amConfig.getTezConfiguration().getLong(
+                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
+                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
+              ApplicationReport appReport = frameworkClient
+                  .getApplicationReport(sessionAppId);
+              while ((currentTimeMillis < timeKillIssued + killTimeOut)
+                  && 
!isJobInTerminalState(appReport.getYarnApplicationState())) {
+                try {
+                  Thread.sleep(1000L);
+                } catch (InterruptedException ie) {
+                  /** interrupted, just break */
+                  break;
+                }
+                currentTimeMillis = System.currentTimeMillis();
+                appReport = frameworkClient.getApplicationReport(sessionAppId);
+              }
+
+              if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
+                frameworkClient.killApplication(sessionAppId);
+              }
+            }
           }
         } catch (TezException e) {
           LOG.info("Failed to shutdown Tez Session via proxy", e);
         } catch (ServiceException e) {
           LOG.info("Failed to shutdown Tez Session via proxy", e);
+        } catch (ApplicationNotFoundException e) {
+          LOG.info("Failed to kill nonexistent application " + sessionAppId, 
e);
+        } catch (YarnException e) {
+          throw new TezException(e);
         }
         if (!sessionShutdownSuccessful) {
           LOG.info("Could not connect to AM, killing session via YARN"
@@ -590,6 +623,11 @@ public class TezClient {
       }
     }
   }
+  private boolean isJobInTerminalState(YarnApplicationState 
yarnApplicationState) {
+    return (yarnApplicationState == YarnApplicationState.FINISHED
+        || yarnApplicationState == YarnApplicationState.FAILED
+        || yarnApplicationState == YarnApplicationState.KILLED);
+  }
 
   /**
    * Get the name of the client

http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index bb404ee..54d490b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1286,4 +1286,23 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "java.opts.checker.enabled";
   public static final boolean TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT = 
true;
 
+  /**
+   * Long value. Time interval, in milliseconds, for the client to wait during a client-requested
+   * AM shutdown before issuing a hard kill to the RM for this application. Only used when
+   * {@link #TEZ_CLIENT_ASYNCHRONOUS_STOP} is false. Expert level setting.
+   */
+  @ConfigurationScope(Scope.CLIENT)
+  public static final String TEZ_CLIENT_HARD_KILL_TIMEOUT_MS = TEZ_PREFIX + 
"client.timeout-ms";
+
+  public static final long TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT = 30 * 
1000L;
+
+  /**
+   * Boolean value. Backwards compatibility setting. When set to false, TezClient stop becomes a
+   * synchronous call that waits until the AM is in a final state (issuing a hard kill after
+   * {@link #TEZ_CLIENT_HARD_KILL_TIMEOUT_MS}) before returning to the user. Expert level setting.
+   */
+  @ConfigurationScope(Scope.CLIENT)
+  public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + 
"client.asynchronous-stop";
+
+  public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 2c3cb36..55c4c0f 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -36,6 +36,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -80,6 +81,7 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.RpcController;
 
 public class TestTezClient {
+  static final long HARD_KILL_TIMEOUT = 1500L;
 
   class TezClientForTest extends TezClient {
     TezYarnClient mockTezYarnClient;
@@ -125,6 +127,8 @@ public class TestTezClient {
     }
     conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
+    conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false);
+    conf.setLong(TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, 
HARD_KILL_TIMEOUT);
     TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
 
     ApplicationId appId1 = ApplicationId.newInstance(0, 1);
@@ -524,4 +528,38 @@ public class TestTezClient {
     client.start();
   }
 
+  @Test(timeout = 5000)
+  public void testStopRetriesUntilTerminalState() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false);
+    final TezClientForTest client = configureAndCreateTezClient(conf);
+    client.start();
+    
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        
.thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.KILLED);
+    try {
+      client.stop();
+    } catch (Exception e) {
+      Assert.fail("Expected ApplicationNotFoundException");
+    }
+    verify(client.mockYarnClient, 
atLeast(2)).getApplicationReport(client.mockAppId);
+  }
+
+  @Test(timeout = 5000)
+  public void testStopRetriesUntilTimeout() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false);
+    final TezClientForTest client = configureAndCreateTezClient(conf);
+    client.start();
+    
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.NEW);
+    long start = System.currentTimeMillis();
+    try {
+      client.stop();
+    } catch (Exception e) {
+      Assert.fail("Stop should complete without exception: " + e);
+    }
+    long end = System.currentTimeMillis();
+    verify(client.mockYarnClient, 
atLeast(2)).getApplicationReport(client.mockAppId);
+    Assert.assertTrue("Stop ended before timeout", end - start > 
HARD_KILL_TIMEOUT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 34e7c2a..613046b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1220,12 +1220,7 @@ public class DAGAppMaster extends AbstractService {
         //send a DAG_KILL message
         LOG.info("Sending a kill event to the current DAG"
             + ", dagId=" + currentDAG.getID());
-        try {
-          logDAGKillRequestEvent(currentDAG.getID(), true);
-        } catch (IOException e) {
-          throw new TezException(e);
-        }
-        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+        tryKillDAG(currentDAG);
       } else {
         LOG.info("No current running DAG, shutting down the AM");
         if (isSession && !state.equals(DAGAppMasterState.ERROR)) {

Reply via email to