This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 61a28c7 TEZ-4236: DAGClientServer is not really needed to be
started/used in local mode (László Bodor reviewed by Ashutosh Chauhan)
61a28c7 is described below
commit 61a28c75c3885f0a043d304091c2a303dde5b617
Author: László Bodor <[email protected]>
AuthorDate: Thu Jan 28 22:17:32 2021 +0100
TEZ-4236: DAGClientServer is not really needed to be started/used in local
mode (László Bodor reviewed by Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <[email protected]>
---
.../org/apache/tez/client/FrameworkClient.java | 109 ++++++++++++++++
.../main/java/org/apache/tez/client/TezClient.java | 143 +++++----------------
.../java/org/apache/tez/client/TezClientUtils.java | 14 +-
.../org/apache/tez/dag/api/TezConfiguration.java | 13 ++
.../apache/tez/dag/api/client/DAGClientImpl.java | 2 +-
.../tez/dag/api/client/DAGClientImplLocal.java | 53 ++++++++
.../java/org/apache/tez/client/TestTezClient.java | 55 +++++---
.../java/org/apache/tez/client/LocalClient.java | 79 +++++++++++-
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 8 +-
.../org/apache/tez/dag/app/LocalDAGAppMaster.java | 51 ++++++++
.../tez/dag/app/rm/TaskSchedulerManager.java | 4 +-
.../org/apache/tez/dag/app/MockLocalClient.java | 4 +-
.../java/org/apache/tez/test/TestLocalMode.java | 10 +-
13 files changed, 394 insertions(+), 151 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index 7c60ec1..2ec6d28 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -21,20 +21,39 @@ package org.apache.tez.client;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.RPCUtil;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGClientImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ServiceException;
@Private
public abstract class FrameworkClient {
+ protected static final Logger LOG =
LoggerFactory.getLogger(FrameworkClient.class);
public static FrameworkClient createFrameworkClient(TezConfiguration
tezConf) {
@@ -77,4 +96,94 @@ public abstract class FrameworkClient {
public abstract boolean isRunning() throws IOException;
+ public TezAppMasterStatus getAMStatus(Configuration conf, ApplicationId
appId,
+ UserGroupInformation ugi) throws TezException, ServiceException,
IOException {
+ DAGClientAMProtocolBlockingPB proxy = getProxy(conf, appId, ugi);
+
+ if (proxy == null) {
+ return TezAppMasterStatus.INITIALIZING;
+ }
+ GetAMStatusResponseProto response =
+ proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build());
+ return
DagTypeConverters.convertTezAppMasterStatusFromProto(response.getStatus());
+ }
+
+ public DAGClient submitDag(DAG dag, SubmitDAGRequestProto request, String
clientName,
+ ApplicationId sessionAppId, long clientTimeout, UserGroupInformation
ugi, TezConfiguration tezConf)
+ throws IOException, TezException, DAGSubmissionTimedOut {
+ DAGClientAMProtocolBlockingPB proxy = null;
+ try {
+ proxy = waitForProxy(clientTimeout, tezConf, sessionAppId, ugi);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while trying to create a connection
to the AM", e);
+ }
+ if (proxy == null) {
+ try {
+ LOG.warn("DAG submission to session timed out, stopping session");
+ stop();
+ } catch (Throwable t) {
+ LOG.info("Got an exception when trying to stop session", t);
+ }
+ throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
+ + ", timed out after " + clientTimeout + " seconds");
+ }
+
+ String dagId = null;
+ try {
+ SubmitDAGResponseProto response = proxy.submitDAG(null, request);
+ // the following check is only for testing since the final class
+ // SubmitDAGResponseProto cannot be mocked
+ if (response != null) {
+ dagId = response.getDagId();
+ }
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ }
+
+ LOG.info("Submitted dag to TezSession"
+ + ", sessionName=" + clientName
+ + ", applicationId=" + sessionAppId
+ + ", dagId=" + dagId
+ + ", dagName=" + dag.getName());
+ return new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+ }
+
+ protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout,
Configuration conf,
+ ApplicationId sessionAppId, UserGroupInformation ugi)
+ throws IOException, TezException, InterruptedException {
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + (clientTimeout * 1000);
+ DAGClientAMProtocolBlockingPB proxy = null;
+ while (true) {
+ proxy = TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
+ if (proxy != null) {
+ break;
+ }
+ Thread.sleep(100L);
+ if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
+ break;
+ }
+ }
+ return proxy;
+ }
+
+ /**
+ * Shuts down session and returns a boolean=true if a proxy was successfully
created and through
+ * that proxy a shutdownSession was called.
+ */
+ public boolean shutdownSession(Configuration conf, ApplicationId
sessionAppId,
+ UserGroupInformation ugi) throws TezException, IOException,
ServiceException {
+ DAGClientAMProtocolBlockingPB proxy = getProxy(conf, sessionAppId, ugi);
+ if (proxy != null) {
+ ShutdownSessionRequestProto request =
ShutdownSessionRequestProto.newBuilder().build();
+ proxy.shutdownSession(null, request);
+ return true;
+ }
+ return false;
+ }
+
+ protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf,
ApplicationId sessionAppId,
+ UserGroupInformation ugi) throws TezException, IOException {
+ return TezClientUtils.getAMProxy(this, conf, sessionAppId, ugi);
+ }
}
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index fbe3509..da213b8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.JavaOptsChecker;
-import org.apache.tez.common.RPCUtil;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.dag.api.TezConfigurationConstants;
@@ -75,10 +74,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
-import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
-import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
-import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -123,7 +119,8 @@ public class TezClient {
private ApplicationId lastSubmittedAppId;
@VisibleForTesting
final AMConfiguration amConfig;
- private FrameworkClient frameworkClient;
+ @VisibleForTesting
+ FrameworkClient frameworkClient;
private String diagnostics;
@VisibleForTesting
final boolean isSession;
@@ -158,7 +155,7 @@ public class TezClient {
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
- TezConfiguration.TEZ_AM_SESSION_MODE,
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
+ TezConfiguration.TEZ_AM_SESSION_MODE,
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
}
@Private
@@ -549,7 +546,8 @@ public class TezClient {
try {
if (proxy == null) {
try {
- proxy = waitForProxy();
+ proxy = frameworkClient.waitForProxy(clientTimeout,
amConfig.getTezConfiguration(),
+ sessionAppId, getUgi());
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to create a connection to the
AM", e);
} catch (SessionNotRunning e) {
@@ -629,7 +627,6 @@ public class TezClient {
verifySessionStateForSubmission();
- String dagId = null;
String callerContextStr = "";
if (dag.getCallerContext() != null) {
callerContextStr = ", callerContext=" +
dag.getCallerContext().contextAsSimpleString();
@@ -678,42 +675,8 @@ public class TezClient {
}
}
- DAGClientAMProtocolBlockingPB proxy = null;
- try {
- proxy = waitForProxy();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while trying to create a connection
to the AM", e);
- }
- if (proxy == null) {
- try {
- LOG.warn("DAG submission to session timed out, stopping session");
- stop();
- } catch (Throwable t) {
- LOG.info("Got an exception when trying to stop session", t);
- }
- throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
- + ", timed out after " + clientTimeout + " seconds");
- }
-
- try {
- SubmitDAGResponseProto response = proxy.submitDAG(null, request);
- // the following check is only for testing since the final class
- // SubmitDAGResponseProto cannot be mocked
- if (response != null) {
- dagId = response.getDagId();
- }
- } catch (ServiceException e) {
- RPCUtil.unwrapAndThrowException(e);
- }
-
- LOG.info("Submitted dag to TezSession"
- + ", sessionName=" + clientName
- + ", applicationId=" + sessionAppId
- + ", dagId=" + dagId
- + ", dagName=" + dag.getName());
- return new DAGClientImpl(sessionAppId, dagId,
- amConfig.getTezConfiguration(),
- frameworkClient, getUgi());
+ return frameworkClient.submitDag(dag, request, clientName, sessionAppId,
clientTimeout,
+ getUgi(), amConfig.getTezConfiguration());
}
private UserGroupInformation getUgi() throws IOException {
@@ -746,39 +709,34 @@ public class TezClient {
sessionStopped.set(true);
boolean sessionShutdownSuccessful = false;
try {
- DAGClientAMProtocolBlockingPB proxy = getAMProxy(sessionAppId);
- if (proxy != null) {
- ShutdownSessionRequestProto request =
- ShutdownSessionRequestProto.newBuilder().build();
- proxy.shutdownSession(null, request);
- sessionShutdownSuccessful = true;
- boolean asynchronousStop =
amConfig.getTezConfiguration().getBoolean(
- TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
- TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
- if (!asynchronousStop) {
- LOG.info("Waiting until application is in a final state");
- long currentTimeMillis = System.currentTimeMillis();
- long timeKillIssued = currentTimeMillis;
- long killTimeOut = amConfig.getTezConfiguration().getLong(
- TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
- TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
- ApplicationReport appReport = frameworkClient
- .getApplicationReport(sessionAppId);
- while ((currentTimeMillis < timeKillIssued + killTimeOut)
- &&
!isJobInTerminalState(appReport.getYarnApplicationState())) {
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException ie) {
- /** interrupted, just break */
- break;
- }
- currentTimeMillis = System.currentTimeMillis();
- appReport = frameworkClient.getApplicationReport(sessionAppId);
+ sessionShutdownSuccessful = frameworkClient
+ .shutdownSession(amConfig.getTezConfiguration(), sessionAppId,
getUgi());
+ boolean asynchronousStop = amConfig.getTezConfiguration().getBoolean(
+ TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP,
+ TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT);
+ if (!asynchronousStop && sessionShutdownSuccessful) {
+ LOG.info("Waiting until application is in a final state");
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeKillIssued = currentTimeMillis;
+ long killTimeOut = amConfig.getTezConfiguration().getLong(
+ TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS,
+ TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT);
+ ApplicationReport appReport = frameworkClient
+ .getApplicationReport(sessionAppId);
+ while ((currentTimeMillis < timeKillIssued + killTimeOut)
+ && !isJobInTerminalState(appReport.getYarnApplicationState()))
{
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ie) {
+ /** interrupted, just break */
+ break;
}
+ currentTimeMillis = System.currentTimeMillis();
+ appReport = frameworkClient.getApplicationReport(sessionAppId);
+ }
- if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
- frameworkClient.killApplication(sessionAppId);
- }
+ if (!isJobInTerminalState(appReport.getYarnApplicationState())) {
+ frameworkClient.killApplication(sessionAppId);
}
}
} catch (TezException e) {
@@ -873,14 +831,7 @@ public class TezClient {
return TezAppMasterStatus.SHUTDOWN;
case RUNNING:
try {
- DAGClientAMProtocolBlockingPB proxy = getAMProxy(appId);
- if (proxy == null) {
- return TezAppMasterStatus.INITIALIZING;
- }
- GetAMStatusResponseProto response = proxy.getAMStatus(null,
- GetAMStatusRequestProto.newBuilder().build());
- return DagTypeConverters.convertTezAppMasterStatusFromProto(
- response.getStatus());
+ return frameworkClient.getAMStatus(amConfig.getTezConfiguration(),
appId, getUgi());
} catch (TezException e) {
LOG.info("Failed to retrieve AM Status via proxy", e);
} catch (ServiceException e) {
@@ -1059,32 +1010,6 @@ public class TezClient {
return
FrameworkClient.createFrameworkClient(amConfig.getTezConfiguration());
}
- @VisibleForTesting
- // for testing
- protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
- throws TezException, IOException {
- return TezClientUtils.getAMProxy(
- frameworkClient, amConfig.getTezConfiguration(), appId, getUgi());
- }
-
- private DAGClientAMProtocolBlockingPB waitForProxy()
- throws IOException, TezException, InterruptedException {
- long startTime = System.currentTimeMillis();
- long endTime = startTime + (clientTimeout * 1000);
- DAGClientAMProtocolBlockingPB proxy = null;
- while (true) {
- proxy = getAMProxy(sessionAppId);
- if (proxy != null) {
- break;
- }
- Thread.sleep(100l);
- if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
- break;
- }
- }
- return proxy;
- }
-
private void verifySessionStateForSubmission() throws SessionNotRunning {
Preconditions.checkState(isSession, "Invalid without session mode");
if (!sessionStarted.get()) {
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8e7ccaf..79069ed 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -912,13 +912,12 @@ public class TezClientUtils {
return textPath;
}
- static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient yarnClient,
- Configuration conf,
- ApplicationId applicationId, UserGroupInformation ugi) throws
TezException, IOException {
+ static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient
frameworkClient,
+ Configuration conf, ApplicationId applicationId, UserGroupInformation
ugi)
+ throws TezException, IOException {
ApplicationReport appReport;
try {
- appReport = yarnClient.getApplicationReport(
- applicationId);
+ appReport = frameworkClient.getApplicationReport(applicationId);
if(appReport == null) {
throw new TezUncheckedException("Could not retrieve application report"
@@ -948,8 +947,9 @@ public class TezClientUtils {
} catch (YarnException e) {
throw new TezException(e);
}
- return getAMProxy(conf, appReport.getHost(),
- appReport.getRpcPort(), appReport.getClientToAMToken(), ugi);
+
+ return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort(),
+ appReport.getClientToAMToken(), ugi);
}
@Private
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 179b195..05eb4b2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1749,6 +1749,19 @@ public class TezConfiguration extends Configuration {
public static final boolean TEZ_LOCAL_MODE_DEFAULT = false;
/**
+ * Boolean value. Enable local mode execution in Tez without using network
for communicating with
+ * DAGAppMaster. This option only makes sense when {@link #TEZ_LOCAL_MODE} is
true. When
+ * TEZ_LOCAL_MODE_WITHOUT_NETWORK is turned on, LocalClient will call
DAGAppMaster's methods
+ * directly.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type = "boolean")
+ public static final String TEZ_LOCAL_MODE_WITHOUT_NETWORK =
+ TEZ_PREFIX + "local.mode.without.network";
+
+ public static final boolean TEZ_LOCAL_MODE_WITHOUT_NETWORK_DEFAULT = false;
+
+ /**
* String value. TezLocalCacheManager uses this folder as a root for temp
and localized files.
*/
@ConfigurationScope(Scope.VERTEX)
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index b54db32..e58863f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -209,7 +209,7 @@ public class DAGClientImpl extends DAGClient {
}
}
- private DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts>
statusOptions,
+ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts>
statusOptions,
long timeout) throws TezException, IOException {
if (!dagCompleted) {
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
new file mode 100644
index 0000000..a0509cd
--- /dev/null
+++
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImplLocal.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api.client;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * A DAGClientImpl which is typically used for
tez.local.mode.without.network=true.
+ */
+public class DAGClientImplLocal extends DAGClientImpl {
+
+ private BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction;
+
+ public DAGClientImplLocal(ApplicationId appId, String dagId,
TezConfiguration conf,
+ FrameworkClient frameworkClient, UserGroupInformation ugi,
+ BiFunction<Set<StatusGetOpts>, Long, DAGStatus> dagStatusFunction) {
+ super(appId, dagId, conf, frameworkClient, ugi);
+ this.dagStatusFunction = dagStatusFunction;
+ }
+
+ @Override
+ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts>
statusOptions, long timeout)
+ throws TezException, IOException {
+ return dagStatusFunction.apply(statusOptions == null ? new HashSet<>() :
statusOptions,
+ timeout);
+ }
+}
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 7316452..67c4a60 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -93,7 +94,6 @@ import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResp
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
@@ -112,7 +112,6 @@ public class TestTezClient {
TestTezClient.class.getName()).getAbsoluteFile();
class TezClientForTest extends TezClient {
- TezYarnClient mockTezYarnClient;
DAGClientAMProtocolBlockingPB sessionAmProxy;
YarnClient mockYarnClient;
ApplicationId mockAppId;
@@ -120,23 +119,13 @@ public class TestTezClient {
Long prewarmTimeoutMs;
public TezClientForTest(String name, TezConfiguration tezConf,
- @Nullable Map<String, LocalResource> localResources,
- @Nullable Credentials credentials) {
+ @Nullable Map<String, LocalResource> localResources, @Nullable
Credentials credentials) {
super(name, tezConf, localResources, credentials);
}
-
+
@Override
protected FrameworkClient createFrameworkClient() {
- return mockTezYarnClient;
- }
-
- @Override
- protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
- throws TezException, IOException {
- if (!callRealGetSessionAMProxy) {
- return sessionAmProxy;
- }
- return super.getAMProxy(appId);
+ return frameworkClient; // already initialized
}
public void setPrewarmTimeoutMs(Long prewarmTimeoutMs) {
@@ -148,7 +137,34 @@ public class TestTezClient {
return prewarmTimeoutMs == null ? super.getPrewarmWaitTimeMs() :
prewarmTimeoutMs;
}
}
-
+
+ class TezYarnClientForTest extends TezYarnClient {
+ private TezClientForTest client;
+
+ protected TezYarnClientForTest(YarnClient yarnClient, TezClientForTest
client) {
+ super(yarnClient);
+ this.client = client;
+ }
+
+ @Override
+ protected DAGClientAMProtocolBlockingPB waitForProxy(long clientTimeout,
Configuration conf,
+ ApplicationId sessionAppId, UserGroupInformation ugi) throws
TezException, IOException {
+ if (!client.callRealGetSessionAMProxy) {
+ return client.sessionAmProxy;
+ }
+ return super.getProxy(conf, sessionAppId, ugi);
+ }
+
+ @Override
+ protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf,
ApplicationId sessionAppId,
+ UserGroupInformation ugi) throws TezException, IOException {
+ if (!client.callRealGetSessionAMProxy) {
+ return client.sessionAmProxy;
+ }
+ return super.getProxy(conf, sessionAppId, ugi);
+ }
+ }
+
TezClientForTest configureAndCreateTezClient() throws YarnException,
IOException, ServiceException {
return configureAndCreateTezClient(null);
}
@@ -179,11 +195,11 @@ public class TestTezClient {
.thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.RUNNING).build());
client.sessionAmProxy = sessionAmProxy;
- client.mockTezYarnClient = new TezYarnClient(yarnClient);
+ client.frameworkClient = new TezYarnClientForTest(yarnClient, client);
client.mockYarnClient = yarnClient;
client.mockAppId = appId1;
-
- return client;
+
+ return client;
}
@Test (timeout = 5000)
@@ -987,7 +1003,6 @@ public class TestTezClient {
String val = "hostname:2181";
conf.set("yarn.resourcemanager.zk-address", val);
- ConfigurationProto confProto = null;
//Test that Exception is not thrown by createFinalConfProtoForApp
TezClientUtils.createFinalConfProtoForApp(conf, null);
}
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 5a6bb9a..c76bd6b 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -46,18 +49,28 @@ import
org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.api.client.DAGClientImpl;
+import org.apache.tez.dag.api.client.DAGClientImplLocal;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import
org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.LocalDAGAppMaster;
import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
public class LocalClient extends FrameworkClient {
public static final Logger LOG = LoggerFactory.getLogger(LocalClient.class);
@@ -72,6 +85,8 @@ public class LocalClient extends FrameworkClient {
private boolean isSession;
private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
private volatile Throwable amFailException = null;
+ private boolean isLocalWithoutNetwork;
+
private static final String localModeDAGSchedulerClassName =
"org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled";
@@ -89,6 +104,10 @@ public class LocalClient extends FrameworkClient {
// disable web service for local mode.
this.conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
+
+ this.isLocalWithoutNetwork =
+ tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK,
+ TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK_DEFAULT);
}
@@ -170,7 +189,6 @@ public class LocalClient extends FrameworkClient {
report.setYarnApplicationState(convertDAGAppMasterState(dagAppMaster.getState()));
report.setFinalApplicationStatus(convertDAGAppMasterStateToFinalYARNState(dagAppMaster.getState()));
-
List<String> diagnostics = dagAppMaster.getDiagnostics();
if (diagnostics != null) {
report.setDiagnostics(diagnostics.toString());
@@ -333,7 +351,7 @@ public class LocalClient extends FrameworkClient {
dagAppMaster =
createDAGAppMaster(applicationAttemptId, cId, currentHost,
nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime, isSession,
userDir.toUri().getPath(),
+ SystemClock.getInstance(), appSubmitTime, isSession,
userDir.toUri().getPath(),
new String[] {localDir.toUri().getPath()}, new String[]
{logDir.toUri().getPath()},
amCredentials,
UserGroupInformation.getCurrentUser().getShortUserName());
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
@@ -372,8 +390,57 @@ public class LocalClient extends FrameworkClient {
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
.getAmPluginDescriptor();
- return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort,
nmHttpPort,
- new SystemClock(), appSubmitTime, isSession, userDir, localDirs,
logDirs,
- versionInfo.getVersion(), credentials, jobUserName,
amPluginDescriptorProto);
+ return isLocalWithoutNetwork
+ ? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort,
nmHttpPort,
+ SystemClock.getInstance(), appSubmitTime, isSession, userDir,
localDirs, logDirs,
+ versionInfo.getVersion(), credentials, jobUserName,
amPluginDescriptorProto)
+ : new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort,
nmHttpPort,
+ SystemClock.getInstance(), appSubmitTime, isSession, userDir,
localDirs, logDirs,
+ versionInfo.getVersion(), credentials, jobUserName,
amPluginDescriptorProto);
+ }
+
+ @Override
+ public TezAppMasterStatus getAMStatus(Configuration configuration,
ApplicationId appId,
+ UserGroupInformation ugi) throws TezException, ServiceException,
IOException {
+ return clientHandler.getTezAppMasterStatus();
+ }
+
+ @Override
+ public DAGClient submitDag(org.apache.tez.dag.api.DAG dag,
SubmitDAGRequestProto request,
+ String clientName, ApplicationId sessionAppId, long clientTimeout,
UserGroupInformation ugi,
+ TezConfiguration tezConf) throws IOException, TezException,
DAGSubmissionTimedOut {
+
+ Map<String, LocalResource> additionalResources = null;
+ if (request.hasAdditionalAmResources()) {
+ additionalResources =
+
DagTypeConverters.convertFromPlanLocalResources(request.getAdditionalAmResources());
+ }
+
+ String dagId = dagAppMaster.submitDAGToAppMaster(request.getDAGPlan(),
additionalResources);
+
+ return isLocalWithoutNetwork
+ ? new DAGClientImplLocal(sessionAppId, dagId, tezConf, this,
+ ugi, new BiFunction<Set<StatusGetOpts>, Long, DAGStatus>() {
+ @Override
+ public DAGStatus apply(Set<StatusGetOpts> statusOpts, Long
timeout) {
+ try {
+ return clientHandler.getDAGStatus(dagId, statusOpts, timeout);
+ } catch (TezException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ })
+ : new DAGClientImpl(sessionAppId, dagId, tezConf, this, ugi);
+ }
+
+ @Override
+ public boolean shutdownSession(Configuration configuration, ApplicationId
sessionAppId,
+ UserGroupInformation ugi) throws TezException, IOException,
ServiceException {
+ if (isLocalWithoutNetwork) {
+ clientHandler.shutdownAM();
+ return true;
+ } else {
+ return super.shutdownSession(configuration, sessionAppId, ugi);
+ }
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index dbcefe9..395e84a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -508,8 +508,7 @@ public class DAGAppMaster extends AbstractService {
recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
- clientRpcServer = new DAGClientServer(clientHandler, appAttemptID,
recoveryFS);
- addIfService(clientRpcServer, true);
+ initClientRpcServer();
taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
addIfService(taskHeartbeatHandler, true);
@@ -647,6 +646,11 @@ public class DAGAppMaster extends AbstractService {
}
}
+ protected void initClientRpcServer() {
+ clientRpcServer = new DAGClientServer(clientHandler, appAttemptID,
recoveryFS);
+ addIfService(clientRpcServer, true);
+ }
+
@VisibleForTesting
protected DAGAppMasterShutdownHandler createShutdownHandler() {
return new DAGAppMasterShutdownHandler();
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java
new file mode 100644
index 0000000..e0c8443
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+
+/**
+ * A DAGAppMaster implementation which is really local in a sense that it
doesn't start an RPC
+ * server for handling dag requests. It is typically used by LocalClient,
which already has an
+ * embedded DAGAppMaster, but by default, it calls RPC methods. With
+ * tez.local.mode.without.network=true, LocalClient will call the
DAGAppMaster's methods directly.
+ */
+public class LocalDAGAppMaster extends DAGAppMaster {
+
+ public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId,
+ String nmHost, int nmPort, int nmHttpPort, Clock clock, long
appSubmitTime, boolean isSession,
+ String workingDirectory, String[] localDirs, String[] logDirs, String
clientVersion,
+ Credentials credentials, String jobUserName, AMPluginDescriptorProto
pluginDescriptorProto) {
+ super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
clock, appSubmitTime,
+ isSession, workingDirectory, localDirs, logDirs, clientVersion,
credentials, jobUserName,
+ pluginDescriptorProto);
+ }
+
+ @Override
+ protected void initClientRpcServer() {
+ // nothing to do, in case of LocalDAGAppMaster clientRpcServer is not
supposed to be used by clients
+ }
+
+ public int getRpcPort() {
+ return 0;
+ }
+}
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index cc2e163..8e6bfe7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -641,7 +641,9 @@ public class TaskSchedulerManager extends AbstractService
implements
@Override
public synchronized void serviceStart() throws Exception {
- InetSocketAddress serviceAddr = clientService.getBindAddress();
+ // clientService is null in case of LocalDAGAppMaster
+ InetSocketAddress serviceAddr = clientService == null ? new
InetSocketAddress("127.0.0.1", 0)
+ : clientService.getBindAddress();
dagAppMaster = appContext.getAppMaster();
// if web service is enabled then set tracking url. else disable it (value
= "").
// the actual url set on the rm web ui will be the proxy url set by
WebAppProxyServlet, which
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 5526516..c335547 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -38,7 +38,7 @@ public class MockLocalClient extends LocalClient {
public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
this(mockAppLauncherGoFlag, clock, false, false, 1, 1);
}
-
+
public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock,
boolean initFailFlag, boolean startFailFlag, int concurrency, int
containers) {
this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
@@ -60,7 +60,7 @@ public class MockLocalClient extends LocalClient {
concurrency, containers);
return mockApp;
}
-
+
public MockDAGAppMaster getMockApp() {
return mockApp;
}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index e6ef8c9..bdb71ad 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -68,14 +68,16 @@ public class TestLocalMode {
private static FileSystem remoteFs;
private final boolean useDfs;
+ private final boolean useLocalModeWithoutNetwork;
- @Parameterized.Parameters(name = "useDFS:{0}")
+ @Parameterized.Parameters(name = "useDFS:{0} useLocalModeWithoutNetwork:{1}")
public static Collection<Object[]> params() {
- return Arrays.asList(new Object[][]{{false}, {true}});
+ return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false,
true}, {true, true}});
}
- public TestLocalMode(boolean useDfs) {
+ public TestLocalMode(boolean useDfs, boolean useLocalModeWithoutNetwork) {
this.useDfs = useDfs;
+ this.useLocalModeWithoutNetwork = useLocalModeWithoutNetwork;
}
@BeforeClass
@@ -105,6 +107,8 @@ public class TestLocalMode {
private TezConfiguration createConf() {
TezConfiguration conf = new TezConfiguration();
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK,
useLocalModeWithoutNetwork);
+
if (useDfs) {
conf.set("fs.defaultFS", remoteFs.getUri().toString());
} else {