Repository: tez
Updated Branches:
  refs/heads/master c34e46c73 -> 82d73b380


TEZ-3892: getClient API for TezClient (Eric Wohlstadter via Gopal V)

Signed-off-by: Gopal V <gop...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82d73b38
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82d73b38
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82d73b38

Branch: refs/heads/master
Commit: 82d73b380881ef8e7d6e6c963289c4f479bbea59
Parents: c34e46c
Author: Eric Wohlstadter <ewohlstad...@hortonworks.com>
Authored: Wed Mar 7 15:50:45 2018 -0800
Committer: Gopal V <gop...@apache.org>
Committed: Wed Mar 7 15:50:45 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   | 230 +++++++++++++------
 .../org/apache/tez/client/TezClientUtils.java   |  21 +-
 .../org/apache/tez/client/TestTezClient.java    |  64 +++++-
 .../org/apache/tez/examples/TezExampleBase.java |  26 ++-
 4 files changed, 252 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 65ce0fb..d2c1af4 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -109,6 +109,9 @@ import com.google.protobuf.ServiceException;
 public class TezClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(TezClient.class);
+
+  private static final String appIdStrPrefix = "application";
+  private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_';
   
   @VisibleForTesting
   static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics 
found.";
@@ -377,40 +380,14 @@ public class TezClient {
    */
   public synchronized void start() throws TezException, IOException {
     amConfig.setYarnConfiguration(new 
YarnConfiguration(amConfig.getTezConfiguration()));
-
-    frameworkClient = createFrameworkClient();
-    frameworkClient.init(amConfig.getTezConfiguration(), 
amConfig.getYarnConfiguration());
-    frameworkClient.start();
-
-    if (this.amConfig.getTezConfiguration().getBoolean(
-        TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
-        TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) {
-      String javaOptsCheckerClassName = 
this.amConfig.getTezConfiguration().get(
-          TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS, "");
-      if (!javaOptsCheckerClassName.isEmpty()) {
-        try {
-          javaOptsChecker = 
ReflectionUtils.createClazzInstance(javaOptsCheckerClassName);
-        } catch (Exception e) {
-          LOG.warn("Failed to initialize configured Java Opts Checker"
-              + " (" + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS
-              + ") , checkerClass=" + javaOptsCheckerClassName
-              + ". Disabling checker.", e);
-          javaOptsChecker = null;
-        }
-      } else {
-        javaOptsChecker = new JavaOptsChecker();
-      }
-
-    }
-
+    startFrameworkClient();
+    setupJavaOptsChecker();
 
     if (isSession) {
       LOG.info("Session mode. Starting session.");
       TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
           amConfig.getTezConfiguration());
   
-      Map<String, LocalResource> tezJarResources = 
getTezJarResources(sessionCredentials);
-  
       clientTimeout = amConfig.getTezConfiguration().getInt(
           TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
           TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
@@ -420,23 +397,7 @@ public class TezClient {
           sessionAppId = createApplication();
         }
   
-        // Add session token for shuffle
-        TezClientUtils.createSessionToken(sessionAppId.toString(),
-            jobTokenSecretManager, sessionCredentials);
-  
-        ApplicationSubmissionContext appContext =
-            TezClientUtils.createApplicationSubmissionContext(
-                sessionAppId,
-                null, clientName, amConfig,
-                tezJarResources, sessionCredentials, usingTezArchiveDeploy, 
apiVersionInfo,
-                servicePluginsDescriptor, javaOptsChecker);
-  
-        // Set Tez Sessions to not retry on AM crashes if recovery is disabled
-        if (!amConfig.getTezConfiguration().getBoolean(
-            TezConfiguration.DAG_RECOVERY_ENABLED,
-            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
-          appContext.setMaxAppAttempts(1);
-        }  
+        ApplicationSubmissionContext appContext = setupApplicationContext();
         frameworkClient.submitApplication(appContext);
         ApplicationReport appReport = 
frameworkClient.getApplicationReport(sessionAppId);
         LOG.info("The url to track the Tez Session: " + 
appReport.getTrackingUrl());
@@ -445,31 +406,136 @@ public class TezClient {
         throw new TezException(e);
       }
 
-      long amClientKeepAliveTimeoutIntervalMillis =
-          
TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration());
-      // Poll at minimum of 1 second interval
-      long pollPeriod = TezCommonUtils.
-          
getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(),
-              amClientKeepAliveTimeoutIntervalMillis, 10);
-
-      boolean isLocal = amConfig.getTezConfiguration().getBoolean(
-          TezConfiguration.TEZ_LOCAL_MODE, 
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
-      if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) {
-        amKeepAliveService = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder()
-                .setDaemon(true).setNameFormat("AMKeepAliveThread 
#%d").build());
-        amKeepAliveService.scheduleWithFixedDelay(new Runnable() {
-
-          private DAGClientAMProtocolBlockingPB proxy;
-
-          @Override
-          public void run() {
-            proxy = sendAMHeartbeat(proxy);
-          }
-        }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS);
+      startClientHeartbeat();
+      this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
+    }
+  }
+
+  public synchronized TezClient getClient(String appIdStr) throws IOException, 
TezException {
+    return getClient(appIdfromString(appIdStr));
+  }
+
+  /**
+   * Alternative to start() that explicitly sets sessionAppId and doesn't 
start a new AM.
+   * The caller of getClient is responsible for initializing the new TezClient 
with a
+   * Configuration compatible with the existing AM. It is expected the caller 
has cached the
+   * original Configuration (e.g. in Zookeeper).
+   *
+   * In contrast to "start", no resources are localized. It is the 
responsibility of the caller to
+   * ensure that existing localized resources and staging dirs are still valid.
+   *
+   * @param appId
+   * @return 'this' just as a convenience for fluent style chaining
+   */
+  public synchronized TezClient getClient(ApplicationId appId) throws 
TezException, IOException {
+    sessionAppId = appId;
+    amConfig.setYarnConfiguration(new 
YarnConfiguration(amConfig.getTezConfiguration()));
+    startFrameworkClient();
+    setupJavaOptsChecker();
+
+    if (!isSession) {
+      String msg = "Must be in session mode to bind TezClient to existing AM";
+      LOG.error(msg);
+      throw new IllegalStateException(msg);
+    }
+
+    LOG.info("Session mode. Reconnecting to session: " + 
sessionAppId.toString());
+
+    clientTimeout = amConfig.getTezConfiguration().getInt(
+            TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
+            TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
+
+    try {
+      setupApplicationContext();
+      ApplicationReport appReport = 
frameworkClient.getApplicationReport(sessionAppId);
+      LOG.info("The url to track the Tez Session: " + 
appReport.getTrackingUrl());
+      sessionStarted.set(true);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+
+    startClientHeartbeat();
+    this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
+    return this;
+  }
+
+  private void startFrameworkClient() {
+    frameworkClient = createFrameworkClient();
+    frameworkClient.init(amConfig.getTezConfiguration(), 
amConfig.getYarnConfiguration());
+    frameworkClient.start();
+  }
+
+  private ApplicationSubmissionContext setupApplicationContext() throws 
IOException, YarnException {
+    TezClientUtils.processTezLocalCredentialsFile(sessionCredentials,
+            amConfig.getTezConfiguration());
+
+    Map<String, LocalResource> tezJarResources = 
getTezJarResources(sessionCredentials);
+    // Add session token for shuffle
+    TezClientUtils.createSessionToken(sessionAppId.toString(),
+            jobTokenSecretManager, sessionCredentials);
+
+    ApplicationSubmissionContext appContext =
+            TezClientUtils.createApplicationSubmissionContext(
+                    sessionAppId,
+                    null, clientName, amConfig,
+                    tezJarResources, sessionCredentials, 
usingTezArchiveDeploy, apiVersionInfo,
+                    servicePluginsDescriptor, javaOptsChecker);
+
+    // Set Tez Sessions to not retry on AM crashes if recovery is disabled
+    if (!amConfig.getTezConfiguration().getBoolean(
+            TezConfiguration.DAG_RECOVERY_ENABLED,
+            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+      appContext.setMaxAppAttempts(1);
+    }
+    return appContext;
+  }
+
+  private void setupJavaOptsChecker() {
+    if (this.amConfig.getTezConfiguration().getBoolean(
+            TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED,
+            TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT)) {
+      String javaOptsCheckerClassName = 
this.amConfig.getTezConfiguration().get(
+              TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS, "");
+      if (!javaOptsCheckerClassName.isEmpty()) {
+        try {
+          javaOptsChecker = 
ReflectionUtils.createClazzInstance(javaOptsCheckerClassName);
+        } catch (Exception e) {
+          LOG.warn("Failed to initialize configured Java Opts Checker"
+                  + " (" + TezConfiguration.TEZ_CLIENT_JAVA_OPTS_CHECKER_CLASS
+                  + ") , checkerClass=" + javaOptsCheckerClassName
+                  + ". Disabling checker.", e);
+          javaOptsChecker = null;
+        }
+      } else {
+        javaOptsChecker = new JavaOptsChecker();
       }
 
-      this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
+    }
+  }
+
+  private void startClientHeartbeat() {
+    long amClientKeepAliveTimeoutIntervalMillis =
+            
TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration());
+    // Poll at minimum of 1 second interval
+    long pollPeriod = TezCommonUtils.
+            
getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(),
+                    amClientKeepAliveTimeoutIntervalMillis, 10);
+
+    boolean isLocal = amConfig.getTezConfiguration().getBoolean(
+            TezConfiguration.TEZ_LOCAL_MODE, 
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+    if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) {
+      amKeepAliveService = Executors.newSingleThreadScheduledExecutor(
+              new ThreadFactoryBuilder()
+                      .setDaemon(true).setNameFormat("AMKeepAliveThread 
#%d").build());
+      amKeepAliveService.scheduleWithFixedDelay(new Runnable() {
+
+        private DAGClientAMProtocolBlockingPB proxy;
+
+        @Override
+        public void run() {
+          proxy = sendAMHeartbeat(proxy);
+        }
+      }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS);
     }
   }
 
@@ -1211,4 +1277,32 @@ public class TezClient {
           servicePluginsDescriptor);
     }
   }
+
+  //Copied this helper method from 
+  //org.apache.hadoop.yarn.api.records.ApplicationId in Hadoop 2.8+
+  //to simplify implementation on 2.7.x
+  @Public
+  @Unstable
+  public static ApplicationId appIdfromString(String appIdStr) {
+    if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) {
+      throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+              + appIdStr + ". The valid ApplicationId should start with prefix 
"
+              + appIdStrPrefix);
+    }
+    try {
+      int pos1 = APPLICATION_ID_PREFIX.length() - 1;
+      int pos2 = appIdStr.indexOf('_', pos1 + 1);
+      if (pos2 < 0) {
+        throw new IllegalArgumentException("Invalid ApplicationId: "
+                + appIdStr);
+      }
+      long rmId = Long.parseLong(appIdStr.substring(pos1 + 1, pos2));
+      int appId = Integer.parseInt(appIdStr.substring(pos2 + 1));
+      ApplicationId applicationId = ApplicationId.newInstance(rmId, appId);
+      return applicationId;
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid ApplicationId: "
+              + appIdStr, n);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index f9316e5..caf610d 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -575,16 +575,19 @@ public class TezClientUtils {
     }
 
     // emit conf as PB file
-    ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
-        servicePluginsDescriptor);
+    // don't overwrite existing conf, needed for TezClient.getClient() so 
existing containers have stable resource fingerprints
+    if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) {
+      ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
+              servicePluginsDescriptor);
 
-    FSDataOutputStream amConfPBOutBinaryStream = null;
-    try {
-      amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, 
binaryConfPath);
-      finalConfProto.writeTo(amConfPBOutBinaryStream);
-    } finally {
-      if(amConfPBOutBinaryStream != null){
-        amConfPBOutBinaryStream.close();
+      FSDataOutputStream amConfPBOutBinaryStream = null;
+      try {
+        amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, 
binaryConfPath);
+        finalConfProto.writeTo(amConfPBOutBinaryStream);
+      } finally {
+        if (amConfPBOutBinaryStream != null) {
+          amConfPBOutBinaryStream.close();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java 
b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 89310df..0cbef76 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -168,12 +168,12 @@ public class TestTezClient {
   
   @Test (timeout = 5000)
   public void testTezclientApp() throws Exception {
-    testTezClient(false);
+    testTezClient(false, true);
   }
   
   @Test (timeout = 5000)
   public void testTezclientSession() throws Exception {
-    testTezClient(true);
+    testTezClient(true, true);
   }
 
   @Test (timeout = 5000)
@@ -238,8 +238,51 @@ public class TestTezClient {
       assertTrue(request.hasAdditionalAmResources());
     }
   }
+
+  @Test (timeout = 5000)
+  public void testGetClient() throws Exception {
+    /* BEGIN first TezClient usage without calling stop() */
+    TezClientForTest client = testTezClient(true, false);
+    /* END first TezClient usage without calling stop() */
+
+    /* BEGIN reuse of AM from new TezClient */
+    ArgumentCaptor<ApplicationSubmissionContext> captor = 
ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
+    
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+            .thenReturn(YarnApplicationState.RUNNING);
+
+    //Reuse existing appId from first TezClient
+    ApplicationId existingAppId = client.mockAppId;
+    TezClientForTest client2 = configureAndCreateTezClient(null, true,
+            client.amConfig.getTezConfiguration());
+    String mockLR1Name = "LR1";
+    Map<String, LocalResource> lrDAG = Collections.singletonMap(mockLR1Name, 
LocalResource
+            .newInstance(URL.newInstance("file", "localhost", 0, "/test1"), 
LocalResourceType.FILE,
+                    LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
+            Resource.newInstance(1, 1));
+    DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
+
+    //Bind TezClient to existing app and submit a dag
+    DAGClient dagClient = client2.getClient(existingAppId).submitDAG(dag);
+
+    
assertTrue(dagClient.getExecutionContext().contains(existingAppId.toString()));
+    assertEquals(dagClient.getSessionIdentifierString(), 
existingAppId.toString());
+
+    // Validate request for new AM is not submitted to RM */
+    verify(client2.mockYarnClient, 
times(0)).submitApplication(captor.capture());
+
+    // Validate dag submission from second TezClient as normal */
+    verify(client2.sessionAmProxy, times(1)).submitDAG((RpcController)any(), 
(SubmitDAGRequestProto) any());
+
+    // Validate stop from new TezClient as normal */
+    client2.stop();
+    verify(client2.sessionAmProxy, times(1)).shutdownSession((RpcController) 
any(),
+            (ShutdownSessionRequestProto) any());
+    verify(client2.mockYarnClient, times(1)).stop();
+    /* END reuse of AM from new TezClient */
+  }
   
-  public void testTezClient(boolean isSession) throws Exception {
+  public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) 
throws Exception {
     Map<String, LocalResource> lrs = Maps.newHashMap();
     String lrName1 = "LR1";
     lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", 
"localhost", 0, "/test"),
@@ -343,13 +386,16 @@ public class TestTezClient {
       assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName2));
     }
-    
-    client.stop();
-    if (isSession) {
-      verify(client.sessionAmProxy, times(1)).shutdownSession((RpcController) 
any(),
-          (ShutdownSessionRequestProto) any());
+
+    if(shouldStop) {
+      client.stop();
+      if (isSession) {
+        verify(client.sessionAmProxy, 
times(1)).shutdownSession((RpcController) any(),
+                (ShutdownSessionRequestProto) any());
+      }
+      verify(client.mockYarnClient, times(1)).stop();
     }
-    verify(client.mockYarnClient, times(1)).stop();
+    return client;
   }
 
   @Test (timeout=5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/82d73b38/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git 
a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java 
b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index a3c0224..6b626b1 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -63,11 +63,16 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
   protected static final String LOCAL_MODE = "local";
   protected static final String COUNTER_LOG = "counter";
   protected static final String GENERATE_SPLIT_IN_CLIENT = 
"generateSplitInClient";
+  protected static final String LEAVE_AM_RUNNING = "leaveAmRunning";
+  protected static final String RECONNECT_APP_ID = "reconnectAppId";
+
 
   private boolean disableSplitGrouping = false;
   private boolean isLocalMode = false;
   private boolean isCountersLog = false;
   private boolean generateSplitInClient = false;
+  private boolean leaveAmRunning = false;
+  private String reconnectAppId;
   private HadoopShim hadoopShim;
 
   protected boolean isCountersLog() {
@@ -88,6 +93,8 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
     options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split 
grouping");
     options.addOption(COUNTER_LOG, false , "print counter log");
     options.addOption(GENERATE_SPLIT_IN_CLIENT, false, "whether generate split 
in client");
+    options.addOption(LEAVE_AM_RUNNING, false, "whether client should stop 
session");
+    options.addOption(RECONNECT_APP_ID, true, "appId for client reconnect");
     return options;
   }
 
@@ -108,6 +115,12 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
     if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
       generateSplitInClient = true;
     }
+    if (optionParser.getCommandLine().hasOption(LEAVE_AM_RUNNING)) {
+      leaveAmRunning = true;
+    }
+    if (optionParser.getCommandLine().hasOption(RECONNECT_APP_ID)) {
+        reconnectAppId = 
optionParser.getCommandLine().getOptionValue(RECONNECT_APP_ID);
+    }
     hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
 
     return _execute(otherArgs, null, null);
@@ -231,15 +244,20 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
     try {
       return runJob(otherArgs, tezConf, tezClientInternal);
     } finally {
-      if (ownTezClient && tezClientInternal != null) {
+      if (ownTezClient && tezClientInternal != null && !leaveAmRunning) {
         tezClientInternal.stop();
       }
     }
   }
 
   private TezClient createTezClient(TezConfiguration tezConf) throws 
IOException, TezException {
-    TezClient tezClient = TezClient.create(getClass().getSimpleName(), 
tezConf);
-    tezClient.start();
+    TezClient tezClient = TezClient.create("TezExampleApplication", tezConf);
+    if(reconnectAppId != null) {
+      ApplicationId appId = TezClient.appIdfromString(reconnectAppId);
+      tezClient.getClient(appId);
+    } else {
+      tezClient.start();
+    }
     return tezClient;
   }
 
@@ -265,6 +283,8 @@ public abstract class TezExampleBase extends Configured 
implements Tool {
         + " enable split grouping without this option.");
     ps.println("-" + COUNTER_LOG + "\t\t to print counters information");
     ps.println("-" + GENERATE_SPLIT_IN_CLIENT + "\t\tgenerate input split in 
client");
+    ps.println("-" + LEAVE_AM_RUNNING + "\t\twhether client should stop 
session");
+    ps.println("-" + RECONNECT_APP_ID + "\t\tappId for client reconnect");
     ps.println();
     ps.println("The Tez example extra options usage syntax is ");
     ps.println("example_name [extra_options] [example_parameters]");

Reply via email to