This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 61a28c7  TEZ-4236: DAGClientServer is not really needed to be started/used in local mode (László Bodor reviewed by Ashutosh Chauhan)
61a28c7 is described below

commit 61a28c75c3885f0a043d304091c2a303dde5b617
Author: László Bodor <[email protected]>
AuthorDate: Thu Jan 28 22:17:32 2021 +0100

    TEZ-4236: DAGClientServer is not really needed to be started/used in local mode (László Bodor reviewed by Ashutosh Chauhan)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
---
 .../org/apache/tez/client/FrameworkClient.java     | 109 ++++++++++++++++
 .../main/java/org/apache/tez/client/TezClient.java | 143 +++++----------------
 .../java/org/apache/tez/client/TezClientUtils.java |  14 +-
 .../org/apache/tez/dag/api/TezConfiguration.java   |  13 ++
 .../apache/tez/dag/api/client/DAGClientImpl.java   |   2 +-
 .../tez/dag/api/client/DAGClientImplLocal.java     |  53 ++++++++
 .../java/org/apache/tez/client/TestTezClient.java  |  55 +++++---
 .../java/org/apache/tez/client/LocalClient.java    |  79 +++++++++++-
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |   8 +-
 .../org/apache/tez/dag/app/LocalDAGAppMaster.java  |  51 ++++++++
 .../tez/dag/app/rm/TaskSchedulerManager.java       |   4 +-
 .../org/apache/tez/dag/app/MockLocalClient.java    |   4 +-
 .../java/org/apache/tez/test/TestLocalMode.java    |  10 +-
 13 files changed, 394 insertions(+), 151 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 7c60ec1..2ec6d28 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -21,20 +21,39 @@ package org.apache.tez.client;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.RPCUtil;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezReflectionException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGClientImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ServiceException;
 
 @Private
 public abstract class FrameworkClient {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(FrameworkClient.class);
 
   public static FrameworkClient createFrameworkClient(TezConfiguration 
tezConf) {
 
@@ -77,4 +96,94 @@ public abstract class FrameworkClient {
 
   public abstract boolean isRunning() throws IOException;
 
+  public TezAppMasterStatus getAMStatus(Configuration conf, ApplicationId 
appId,
+      UserGroupInformation ugi) throws TezException, ServiceException, 
IOException {
+    DAGClientAMProtocolBlockingPB proxy = getProxy(conf, appId, ugi);
+
+    if (proxy == null) {
+      return TezAppMasterStatus.INITIALIZING;
+    }
+    GetAMStatusResponseProto response =
+        proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build());
+    return 
DagTypeConverters.convertTezAppMasterStatusFromProto(response.getStatus());
+  }
+
+  public DAGClient submitDag(DAG dag, SubmitDAGRequestProto request, String 
clientName,
+      ApplicationId sessionAppId, long clientTimeout, UserGroupInformation 
ugi, TezConfiguration tezConf)
+      throws IOException, TezException, DAGSubmissionTimedOut {
+    DAGClientAMProtocolBlockingPB proxy = null;
+    try {
+      proxy = waitForProxy(clientTimeout, tezConf, sessionAppId, ugi);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while trying to create a connection 
to the AM", e);
+    }
+    if (proxy == null) {
+      try {
+        LOG.warn("DAG submission to session timed out, stopping session");
+        stop();
+      } catch (Throwable t) {
+        LOG.info("Got an exception when trying to stop session", t);
+      }
+      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
+          + ", timed out after " + clientTimeout + " seconds");
+    }
+
+    String dagId = null;
+    try {
+      SubmitDAGResponseProto response = proxy.submitDAG(null, request);
+      // the following check is only for testing since the final class
+      // SubmitDAGResponseProto cannot be mocked
+      if (response != null) {
+        dagId = response.getDagId();
+      }
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+
+    LOG.info("Submitted dag to TezSession"
+        + ", sessionName=" + clientName
+        + ", applicationId=" + sessionAppId
+        + ", dagId=" + dagId
+        + ", dagName=" + dag.getName());
+    return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+  }
+
+  protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, 
Configuration conf,
+      ApplicationId sessionAppId, UserGroupInformation ugi)
+      throws IOException, TezException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + (clientTimeout * 1000);
+    DAGClientAMProtocolBlockingPB proxy = null;
+    while (true) {
+      proxy = TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
+      if (proxy != null) {
+        break;
+      }
+      Thread.sleep(100L);
+      if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
+        break;
+      }
+    }
+    return proxy;
+  }
+
+  /**
+   * Shuts down session and returns a boolean=true if a proxy was successfully 
created and through
+   * that proxy a shutdownSession was called.
+   */
+  public boolean shutdownSession(Configuration conf, ApplicationId 
sessionAppId,
+      UserGroupInformation ugi) throws TezException, IOException, 
ServiceException {
+    DAGClientAMProtocolBlockingPB proxy = getProxy(conf, sessionAppId, ugi);
+    if (proxy != null) {
+      ShutdownSessionRequestProto request = 
ShutdownSessionRequestProto.newBuilder().build();
+      proxy.shutdownSession(null, request);
+      return true;
+    }
+    return false;
+  }
+
+  protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf, 
ApplicationId sessionAppId,
+      UserGroupInformation ugi) throws TezException, IOException {
+    return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
+  }
 }
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index fbe3509..da213b8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.JavaOptsChecker;
-import org.apache.tez.common.RPCUtil;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.dag.api.TezConfigurationConstants;
@@ -75,10 +74,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
-import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
-import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
-import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
 import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
@@ -123,7 +119,8 @@ public class TezClient {
   private ApplicationId lastSubmittedAppId;
   @VisibleForTesting
   final AMConfiguration amConfig;
-  private FrameworkClient frameworkClient;
+  @VisibleForTesting
+  FrameworkClient frameworkClient;
   private String diagnostics;
   @VisibleForTesting
   final boolean isSession;
@@ -158,7 +155,7 @@ public class TezClient {
 
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
-        TezConfiguration.TEZ_AM_SESSION_MODE, 
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
+        TezConfiguration.TEZ_AM_SESSION_MODE, 
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
   }
 
   @Private
@@ -549,7 +546,8 @@ public class TezClient {
     try {
       if (proxy == null) {
         try {
-          proxy = waitForProxy();
+          proxy = frameworkClient.waitForProxy(clientTimeout, 
amConfig.getTezConfiguration(),
+              sessionAppId, getUgi());
         } catch (InterruptedException e) {
           LOG.debug("Interrupted while trying to create a connection to the 
AM", e);
         } catch (SessionNotRunning e) {
@@ -629,7 +627,6 @@ public class TezClient {
     
     verifySessionStateForSubmission();
 
-    String dagId = null;
     String callerContextStr = "";
     if (dag.getCallerContext() != null) {
       callerContextStr = ", callerContext=" + 
dag.getCallerContext().contextAsSimpleString();
@@ -678,42 +675,8 @@ public class TezClient {
       }
     }
 
-    DAGClientAMProtocolBlockingPB proxy = null;
-    try {
-      proxy = waitForProxy();
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while trying to create a connection 
to the AM", e);
-    }
-    if (proxy == null) {
-      try {
-        LOG.warn("DAG submission to session timed out, stopping session");
-        stop();
-      } catch (Throwable t) {
-        LOG.info("Got an exception when trying to stop session", t);
-      }
-      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
-          + ", timed out after " + clientTimeout + " seconds");
-    }
-
-    try {
-      SubmitDAGResponseProto response = proxy.submitDAG(null, request);
-      // the following check is only for testing since the final class
-      // SubmitDAGResponseProto cannot be mocked
-      if (response != null) {
-        dagId = response.getDagId();
-      }
-    } catch (ServiceException e) {
-      RPCUtil.unwrapAndThrowException(e);
-    }
-
-    LOG.info("Submitted dag to TezSession"
-        + ", sessionName=" + clientName
-        + ", applicationId=" + sessionAppId
-        + ", dagId=" + dagId
-        + ", dagName=" + dag.getName());
-    return new DAGClientImpl(sessionAppId, dagId,
-        amConfig.getTezConfiguration(),
-        frameworkClient, getUgi());
+    return frameworkClient.submitDag(dag, request, clientName, sessionAppId, 
clientTimeout,
+        getUgi(), amConfig.getTezConfiguration());
   }
 
   private UserGroupInformation getUgi() throws IOException {
@@ -746,39 +709,34 @@ public class TezClient {
         sessionStopped.set(true);
         boolean sessionShutdownSuccessful = false;
         try {
-          DAGClientAMProtocolBlockingPB proxy = getAMProxy(sessionAppId);
-          if (proxy != null) {
-            ShutdownSessionRequestProto request =
-                ShutdownSessionRequestProto.newBuilder().build();
-            proxy.shutdownSession(null, request);
-            sessionShutdownSuccessful = true;
-            boolean asynchronousStop = 
amConfig.getTezConfiguration().getBoolean(
-                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
-                TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
-            if (!asynchronousStop) {
-              LOG.info("Waiting until application is in a final state");
-              long currentTimeMillis = System.currentTimeMillis();
-              long timeKillIssued = currentTimeMillis;
-              long killTimeOut = amConfig.getTezConfiguration().getLong(
-                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
-                  TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
-              ApplicationReport appReport = frameworkClient
-                  .getApplicationReport(sessionAppId);
-              while ((currentTimeMillis < timeKillIssued + killTimeOut)
-                  && 
!isJobInTerminalState(appReport.getYarnApplicationState())) {
-                try {
-                  Thread.sleep(1000L);
-                } catch (InterruptedException ie) {
-                  /** interrupted, just break */
-                  break;
-                }
-                currentTimeMillis = System.currentTimeMillis();
-                appReport = frameworkClient.getApplicationReport(sessionAppId);
+          sessionShutdownSuccessful = frameworkClient
+              .shutdownSession(amConfig.getTezConfiguration(), sessionAppId, 
getUgi());
+          boolean asynchronousStop = amConfig.getTezConfiguration().getBoolean(
+              TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
+              TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
+          if (!asynchronousStop && sessionShutdownSuccessful) {
+            LOG.info("Waiting until application is in a final state");
+            long currentTimeMillis = System.currentTimeMillis();
+            long timeKillIssued = currentTimeMillis;
+            long killTimeOut = amConfig.getTezConfiguration().getLong(
+                TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
+                TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
+            ApplicationReport appReport = frameworkClient
+                .getApplicationReport(sessionAppId);
+            while ((currentTimeMillis < timeKillIssued + killTimeOut)
+                && !isJobInTerminalState(appReport.getYarnApplicationState())) 
{
+              try {
+                Thread.sleep(1000L);
+              } catch (InterruptedException ie) {
+                /** interrupted, just break */
+                break;
               }
+              currentTimeMillis = System.currentTimeMillis();
+              appReport = frameworkClient.getApplicationReport(sessionAppId);
+            }
 
-              if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
-                frameworkClient.killApplication(sessionAppId);
-              }
+            if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
+              frameworkClient.killApplication(sessionAppId);
             }
           }
         } catch (TezException e) {
@@ -873,14 +831,7 @@ public class TezClient {
         return TezAppMasterStatus.SHUTDOWN;
       case RUNNING:
         try {
-          DAGClientAMProtocolBlockingPB proxy = getAMProxy(appId);
-          if (proxy == null) {
-            return TezAppMasterStatus.INITIALIZING;
-          }
-          GetAMStatusResponseProto response = proxy.getAMStatus(null,
-              GetAMStatusRequestProto.newBuilder().build());
-          return DagTypeConverters.convertTezAppMasterStatusFromProto(
-              response.getStatus());
+          return frameworkClient.getAMStatus(amConfig.getTezConfiguration(), 
appId, getUgi());
         } catch (TezException e) {
           LOG.info("Failed to retrieve AM Status via proxy", e);
         } catch (ServiceException e) {
@@ -1059,32 +1010,6 @@ public class TezClient {
     return 
FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration());
   }
 
-  @VisibleForTesting
-  // for testing
-  protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
-      throws TezException, IOException {
-    return TezClientUtils.getAMProxy(
-        frameworkClient, amConfig.getTezConfiguration(), appId, getUgi());
-  }
-
-  private DAGClientAMProtocolBlockingPB waitForProxy()
-      throws IOException, TezException, InterruptedException {
-    long startTime = System.currentTimeMillis();
-    long endTime = startTime + (clientTimeout * 1000);
-    DAGClientAMProtocolBlockingPB proxy = null;
-    while (true) {
-      proxy = getAMProxy(sessionAppId);
-      if (proxy != null) {
-        break;
-      }
-      Thread.sleep(100l);
-      if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
-        break;
-      }
-    }
-    return proxy;
-  }
-
   private void verifySessionStateForSubmission() throws SessionNotRunning {
     Preconditions.checkState(isSession, "Invalid without session mode");
     if (!sessionStarted.get()) {
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8e7ccaf..79069ed 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -912,13 +912,12 @@ public class TezClientUtils {
     return textPath;
   }
 
-  static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient yarnClient,
-      Configuration conf,
-      ApplicationId applicationId, UserGroupInformation ugi) throws 
TezException, IOException {
+  static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient 
frameworkClient,
+      Configuration conf, ApplicationId applicationId, UserGroupInformation 
ugi)
+      throws TezException, IOException {
     ApplicationReport appReport;
     try {
-      appReport = yarnClient.getApplicationReport(
-          applicationId);
+      appReport = frameworkClient.getApplicationReport(applicationId);
 
       if(appReport == null) {
         throw new TezUncheckedException("Could not retrieve application report"
@@ -948,8 +947,9 @@ public class TezClientUtils {
     } catch (YarnException e) {
       throw new TezException(e);
     }
-    return getAMProxy(conf, appReport.getHost(),
-        appReport.getRpcPort(), appReport.getClientToAMToken(), ugi);
+
+    return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(),
+        appReport.getClientToAMToken(), ugi);
   }
 
   @Private
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 179b195..05eb4b2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1749,6 +1749,19 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_LOCAL_MODE_DEFAULT = false;
 
   /**
+  * Boolean value. Enable local mode execution in Tez without using network 
for communicating with
+  * DAGAppMaster. This option only makes sense when {@link #TEZ_LOCAL_MODE} is 
true. When
+  * TEZ_LOCAL_MODE_WITHOUT_NETWORK is turned on, LocalClient will call 
DAGAppMaster's methods
+  * directly.
+  */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type = "boolean")
+  public static final String TEZ_LOCAL_MODE_WITHOUT_NETWORK =
+      TEZ_PREFIX + "local.mode.without.network";
+
+  public static final boolean TEZ_LOCAL_MODE_WITHOUT_NETWORK_DEFAULT = false;
+
+  /**
    * String value. TezLocalCacheManager uses this folder as a root for temp 
and localized files.
    */
   @ConfigurationScope(Scope.VERTEX)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index b54db32..e58863f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -209,7 +209,7 @@ public class DAGClientImpl extends DAGClient {
     }
   }
 
-  private DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> 
statusOptions,
+  protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> 
statusOptions,
       long timeout) throws TezException, IOException {
 
     if (!dagCompleted) {
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
new file mode 100644
index 0000000..a0509cd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api.client;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * A DAGClientImpl which is typically used for 
tez.local.mode.without.network=true.
+ */
+public class DAGClientImplLocal extends DAGClientImpl {
+
+  private BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction;
+
+  public DAGClientImplLocal(ApplicationId appId, String dagId, 
TezConfiguration conf,
+      FrameworkClient frameworkClient, UserGroupInformation ugi,
+      BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction) {
+    super(appId, dagId, conf, frameworkClient, ugi);
+    this.dagStatusFunction = dagStatusFunction;
+  }
+
+  @Override
+  protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> 
statusOptions, long timeout)
+      throws TezException, IOException {
+    return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() : 
statusOptions,
+        timeout);
+  }
+}
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 7316452..67c4a60 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -93,7 +94,6 @@ import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResp
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
 import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
@@ -112,7 +112,6 @@ public class TestTezClient {
       TestTezClient.class.getName()).getAbsoluteFile();
 
   class TezClientForTest extends TezClient {
-    TezYarnClient mockTezYarnClient;
     DAGClientAMProtocolBlockingPB sessionAmProxy;
     YarnClient mockYarnClient;
     ApplicationId mockAppId;
@@ -120,23 +119,13 @@ public class TestTezClient {
     Long prewarmTimeoutMs;
 
     public TezClientForTest(String name, TezConfiguration tezConf,
-        @Nullable Map<String, LocalResource> localResources,
-        @Nullable Credentials credentials) {
+        @Nullable Map<String, LocalResource> localResources, @Nullable 
Credentials credentials) {
       super(name, tezConf, localResources, credentials);
     }
-    
+
     @Override
     protected FrameworkClient createFrameworkClient() {
-      return mockTezYarnClient;
-    }
-    
-    @Override
-    protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
-        throws TezException, IOException {
-      if (!callRealGetSessionAMProxy) {
-        return sessionAmProxy;
-      }
-      return super.getAMProxy(appId);
+      return frameworkClient; // already initialized
     }
 
     public void setPrewarmTimeoutMs(Long prewarmTimeoutMs) {
@@ -148,7 +137,34 @@ public class TestTezClient {
       return prewarmTimeoutMs == null ? super.getPrewarmWaitTimeMs() : 
prewarmTimeoutMs;
     }
   }
-  
+
+  class TezYarnClientForTest extends TezYarnClient {
+    private TezClientForTest client;
+
+    protected TezYarnClientForTest(YarnClient yarnClient, TezClientForTest 
client) {
+      super(yarnClient);
+      this.client = client;
+    }
+
+    @Override
+    protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout, 
Configuration conf,
+        ApplicationId sessionAppId, UserGroupInformation ugi) throws 
TezException, IOException {
+      if (!client.callRealGetSessionAMProxy) {
+        return client.sessionAmProxy;
+      }
+      return super.getProxy(conf, sessionAppId, ugi);
+    }
+
+    @Override
+    protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf, 
ApplicationId sessionAppId,
+        UserGroupInformation ugi) throws TezException, IOException {
+      if (!client.callRealGetSessionAMProxy) {
+        return client.sessionAmProxy;
+      }
+      return super.getProxy(conf, sessionAppId, ugi);
+    }
+  }
+
   TezClientForTest configureAndCreateTezClient() throws YarnException, 
IOException, ServiceException {
     return configureAndCreateTezClient(null);
   }
@@ -179,11 +195,11 @@ public class TestTezClient {
         
.thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.RUNNING).build());
 
     client.sessionAmProxy = sessionAmProxy;
-    client.mockTezYarnClient = new TezYarnClient(yarnClient);
+    client.frameworkClient = new TezYarnClientForTest(yarnClient, client);
     client.mockYarnClient = yarnClient;
     client.mockAppId = appId1;
-    
-    return client;    
+
+    return client;
   }
   
   @Test (timeout = 5000)
@@ -987,7 +1003,6 @@ public class TestTezClient {
     String val = "hostname:2181";
     conf.set("yarn.resourcemanager.zk-address", val);
 
-    ConfigurationProto confProto = null;
     //Test that Exception is not thrown by createFinalConfProtoForApp
     TezClientUtils.createFinalConfProtoForApp(conf, null);
   }
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 5a6bb9a..c76bd6b 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
 
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -46,18 +49,28 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.api.client.DAGClientImpl;
+import org.apache.tez.dag.api.client.DAGClientImplLocal;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import 
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.LocalDAGAppMaster;
 import org.apache.tez.dag.app.dag.DAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
 
 public class LocalClient extends FrameworkClient {
   public static final Logger LOG = LoggerFactory.getLogger(LocalClient.class);
@@ -72,6 +85,8 @@ public class LocalClient extends FrameworkClient {
   private boolean isSession;
   private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
   private volatile Throwable amFailException = null;
+  private boolean isLocalWithoutNetwork;
+
   private static final String localModeDAGSchedulerClassName =
       "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled";
 
@@ -89,6 +104,10 @@ public class LocalClient extends FrameworkClient {
 
     // disable web service for local mode.
     this.conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
+
+    this.isLocalWithoutNetwork =
+        tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK,
+            TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK_DEFAULT);
   }
 
 
@@ -170,7 +189,6 @@ public class LocalClient extends FrameworkClient {
     
report.setYarnApplicationState(convertDAGAppMasterState(dagAppMaster.getState()));
     
report.setFinalApplicationStatus(convertDAGAppMasterStateToFinalYARNState(dagAppMaster.getState()));
 
-
     List<String> diagnostics = dagAppMaster.getDiagnostics();
     if (diagnostics != null) {
       report.setDiagnostics(diagnostics.toString());
@@ -333,7 +351,7 @@ public class LocalClient extends FrameworkClient {
 
           dagAppMaster =
               createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
-                  new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(),
+                  SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(),
                   new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
                   amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
           DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
@@ -372,8 +390,57 @@ public class LocalClient extends FrameworkClient {
         TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
             .getAmPluginDescriptor();
 
-    return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
-        versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
+    return isLocalWithoutNetwork
+      ? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+          SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
+          versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
+      : new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+          SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
+          versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
+  }
+
+  @Override
+  public TezAppMasterStatus getAMStatus(Configuration configuration, ApplicationId appId,
+      UserGroupInformation ugi) throws TezException, ServiceException, IOException {
+    return clientHandler.getTezAppMasterStatus();
+  }
+
+  @Override
+  public DAGClient submitDag(org.apache.tez.dag.api.DAG dag, SubmitDAGRequestProto request,
+      String clientName, ApplicationId sessionAppId, long clientTimeout, UserGroupInformation ugi,
+      TezConfiguration tezConf) throws IOException, TezException, DAGSubmissionTimedOut {
+
+    Map<String, LocalResource> additionalResources = null;
+    if (request.hasAdditionalAmResources()) {
+      additionalResources =
+          DagTypeConverters.convertFromPlanLocalResources(request.getAdditionalAmResources());
+    }
+
+    String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(), additionalResources);
+
+    return isLocalWithoutNetwork
+      ? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this,
+          ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
+            @Override
+            public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long timeout) {
+              try {
+                return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
+              } catch (TezException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          })
+      : new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+  }
+
+  @Override
+  public boolean shutdownSession(Configuration configuration, ApplicationId sessionAppId,
+      UserGroupInformation ugi) throws TezException, IOException, ServiceException {
+    if (isLocalWithoutNetwork) {
+      clientHandler.shutdownAM();
+      return true;
+    } else {
+      return super.shutdownSession(configuration, sessionAppId, ugi);
+    }
   }
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index dbcefe9..395e84a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -508,8 +508,7 @@ public class DAGAppMaster extends AbstractService {
     recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
         TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
 
-    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS);
-    addIfService(clientRpcServer, true);
+    initClientRpcServer();
 
     taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
     addIfService(taskHeartbeatHandler, true);
@@ -647,6 +646,11 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  protected void initClientRpcServer() {
+    clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS);
+    addIfService(clientRpcServer, true);
+  }
+
   @VisibleForTesting
   protected DAGAppMasterShutdownHandler createShutdownHandler() {
     return new DAGAppMasterShutdownHandler();
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java
new file mode 100644
index 0000000..e0c8443
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+
+/**
+ * A DAGAppMaster implementation which is really local in a sense that it doesn't start an RPC
+ * server for handling dag requests. It is typically used by LocalClient, which already has an
+ * embedded DAGAppMaster, but by default, it calls RPC methods. With
+ * tez.local.mode.without.network=true, LocalClient will call the DAGAppMaster's methods directly.
+ */
+public class LocalDAGAppMaster extends DAGAppMaster {
+
+  public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
+      String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession,
+      String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion,
+      Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
+        isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName,
+        pluginDescriptorProto);
+  }
+
+  @Override
+  protected void initClientRpcServer() {
+    // nothing to do, in case of LocalDAGAppMaster clientRpcServer is not supposed to be used by clients
+  }
+
+  public int getRpcPort() {
+    return 0;
+  }
+}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index cc2e163..8e6bfe7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -641,7 +641,9 @@ public class TaskSchedulerManager extends AbstractService implements
   
   @Override
   public synchronized void serviceStart() throws Exception {
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    // clientService is null in case of LocalDAGAppMaster
+    InetSocketAddress serviceAddr = clientService == null ? new InetSocketAddress("127.0.0.1", 0)
+        : clientService.getBindAddress();
     dagAppMaster = appContext.getAppMaster();
     // if web service is enabled then set tracking url. else disable it (value = "").
     // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 5526516..c335547 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -38,7 +38,7 @@ public class MockLocalClient extends LocalClient {
   public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
     this(mockAppLauncherGoFlag, clock, false, false, 1, 1);
   }
-  
+
   public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock,
       boolean initFailFlag, boolean startFailFlag, int concurrency, int containers) {
     this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
@@ -60,7 +60,7 @@ public class MockLocalClient extends LocalClient {
         concurrency, containers);
     return mockApp;
   }
-  
+
   public MockDAGAppMaster getMockApp() {
     return mockApp;
   }
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index e6ef8c9..bdb71ad 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -68,14 +68,16 @@ public class TestLocalMode {
   private static FileSystem remoteFs;
 
   private final boolean useDfs;
+  private final boolean useLocalModeWithoutNetwork;
 
-  @Parameterized.Parameters(name = "useDFS:{0}")
+  @Parameterized.Parameters(name = "useDFS:{0} useLocalModeWithoutNetwork:{1}")
   public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[][]{{false}, {true}});
+    return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}});
   }
 
-  public TestLocalMode(boolean useDfs) {
+  public TestLocalMode(boolean useDfs, boolean useLocalModeWithoutNetwork) {
     this.useDfs = useDfs;
+    this.useLocalModeWithoutNetwork = useLocalModeWithoutNetwork;
   }
 
   @BeforeClass
@@ -105,6 +107,8 @@ public class TestLocalMode {
   private TezConfiguration createConf() {
     TezConfiguration conf = new TezConfiguration();
     conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK, useLocalModeWithoutNetwork);
+
     if (useDfs) {
       conf.set("fs.defaultFS", remoteFs.getUri().toString());
     } else {

Reply via email to