This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new af3a969 TEZ-4156: Fix Tez to reuse IPC connections (Rajesh Balamohan,
reviewed by Siddharth Seth, László Bodor, Jonathan Turner Eagles)
af3a969 is described below
commit af3a9692ee4a4689464b82db3a78c70293631dce
Author: Rajesh Balamohan <[email protected]>
AuthorDate: Wed Apr 29 03:43:49 2020 +0530
TEZ-4156: Fix Tez to reuse IPC connections (Rajesh Balamohan, reviewed by
Siddharth Seth, László Bodor, Jonathan Turner Eagles)
---
.../main/java/org/apache/tez/client/TezClient.java | 24 ++++++++++++++++------
.../java/org/apache/tez/client/TezClientUtils.java | 9 ++++----
.../apache/tez/dag/api/client/DAGClientImpl.java | 6 ++++--
.../tez/dag/api/client/rpc/DAGClientRPCImpl.java | 8 ++++++--
.../tez/dag/api/client/rpc/TestDAGClient.java | 16 ++++++++-------
5 files changed, 41 insertions(+), 22 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 60c0e5e..4fb37dc 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.common.RPCUtil;
import org.apache.tez.common.TezCommonUtils;
@@ -154,6 +155,8 @@ public class TezClient {
private ScheduledExecutorService amKeepAliveService;
+ private final Map<String, UserGroupInformation> ugiMap;
+
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
TezConfiguration.TEZ_AM_SESSION_MODE,
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
@@ -197,6 +200,7 @@ public class TezClient {
LOG.warn("The host name of the client the tez application was submitted from was unable to be retrieved", e);
}
+ this.ugiMap = new HashMap<>();
this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
this.apiVersionInfo = new TezApiVersionInfo();
this.servicePluginsDescriptor = servicePluginsDescriptor;
@@ -713,7 +717,13 @@ public class TezClient {
return new DAGClientImpl(sessionAppId, dagId,
amConfig.getTezConfiguration(),
amConfig.getYarnConfiguration(),
- frameworkClient);
+ frameworkClient, getUgi());
+ }
+
+ private UserGroupInformation getUgi() throws IOException {
+ String userName = UserGroupInformation.getCurrentUser().getUserName();
+ return ugiMap.computeIfAbsent(userName,
+ v -> UserGroupInformation.createRemoteUser(userName));
}
@VisibleForTesting
@@ -1058,7 +1068,7 @@ public class TezClient {
protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
throws TezException, IOException {
return TezClientUtils.getAMProxy(
- frameworkClient, amConfig.getYarnConfiguration(), appId);
+ frameworkClient, amConfig.getYarnConfiguration(), appId, getUgi());
}
private DAGClientAMProtocolBlockingPB waitForProxy()
@@ -1137,7 +1147,7 @@ public class TezClient {
// wait for dag in non-session mode to start running, so that we can start to getDAGStatus
waitNonSessionTillReady();
return getDAGClient(appId, amConfig.getTezConfiguration(),
amConfig.getYarnConfiguration(),
- frameworkClient);
+ frameworkClient, getUgi());
}
private ApplicationId createApplication() throws TezException, IOException {
@@ -1161,17 +1171,19 @@ public class TezClient {
@Private
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
YarnConfiguration
- yarnConf, FrameworkClient frameworkClient)
+ yarnConf, FrameworkClient frameworkClient, UserGroupInformation ugi)
throws IOException, TezException {
return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf,
- yarnConf, frameworkClient);
+ yarnConf, frameworkClient, ugi);
}
@Private // Used only for MapReduce compatibility code
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient)
throws IOException, TezException {
- return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), frameworkClient);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName());
+ return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), frameworkClient, ugi);
}
// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8850ca7..d34d31e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -919,7 +919,7 @@ public class TezClientUtils {
static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient yarnClient,
Configuration conf,
- ApplicationId applicationId) throws TezException, IOException {
+ ApplicationId applicationId, UserGroupInformation ugi) throws TezException, IOException {
ApplicationReport appReport;
try {
appReport = yarnClient.getApplicationReport(
@@ -954,16 +954,15 @@ public class TezClientUtils {
throw new TezException(e);
}
return getAMProxy(conf, appReport.getHost(),
- appReport.getRpcPort(), appReport.getClientToAMToken());
+ appReport.getRpcPort(), appReport.getClientToAMToken(), ugi);
}
@Private
public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration
conf, String amHost,
- int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken) throws IOException {
+ int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken,
+ UserGroupInformation userUgi) throws IOException {
final InetSocketAddress serviceAddr =
NetUtils.createSocketAddrForHost(amHost, amRpcPort);
- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(UserGroupInformation
- .getCurrentUser().getUserName());
if (clientToAMToken != null) {
Token<ClientToAMTokenIdentifier> token =
ConverterUtils.convertFromYarn(clientToAMToken,
serviceAddr);
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 52f66e3..0b899fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@@ -79,7 +80,8 @@ public class DAGClientImpl extends DAGClient {
private boolean cleanupFrameworkClient;
public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration
conf,
- YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient) {
+ YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient,
+ UserGroupInformation ugi) {
this.appId = appId;
this.dagId = dagId;
this.conf = conf;
@@ -99,7 +101,7 @@ public class DAGClientImpl extends DAGClient {
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
DAGClientTimelineImpl.isSupported();
- realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
+ realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient, ugi);
statusPollInterval = conf.getLong(
TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 02935df..c54058b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,6 +23,7 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.tez.common.RPCUtil;
import org.apache.tez.dag.api.SessionNotRunning;
@@ -68,13 +69,16 @@ public class DAGClientRPCImpl extends DAGClientInternal {
@VisibleForTesting
DAGClientAMProtocolBlockingPB proxy = null;
+ private UserGroupInformation ugi;
+
public DAGClientRPCImpl(ApplicationId appId, String dagId,
- TezConfiguration conf, @Nullable FrameworkClient frameworkClient) {
+ TezConfiguration conf, @Nullable FrameworkClient frameworkClient, UserGroupInformation ugi) {
this.appId = appId;
this.dagId = dagId;
this.conf = conf;
this.frameworkClient = frameworkClient;
appReport = null;
+ this.ugi = ugi;
}
@Override
@@ -286,7 +290,7 @@ public class DAGClientRPCImpl extends DAGClientInternal {
}
proxy = TezClientUtils.getAMProxy(conf, appReport.getHost(),
appReport.getRpcPort(),
- appReport.getClientToAMToken());
+ appReport.getClientToAMToken(), ugi);
return true;
}
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 70ee1d4..57087cb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -204,7 +205,8 @@ public class TestDAGClient {
TezConfiguration tezConf = new TezConfiguration();
YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
- dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf, yarnConf, null);
+ dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf, yarnConf, null,
+ UserGroupInformation.getCurrentUser());
DAGClientRPCImpl realClient =
(DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
realClient.appReport = mockAppReport;
realClient.proxy = mockProxy;
@@ -410,7 +412,7 @@ public class TestDAGClient {
}
@Test(timeout = 5000)
- public void testDagClientTimelineEnabledCondition() {
+ public void testDagClientTimelineEnabledCondition() throws IOException {
String historyLoggingClass =
"org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
testAtsEnabled(mockAppId, dagIdStr, false, "", true, true);
@@ -422,7 +424,7 @@ public class TestDAGClient {
private static void testAtsEnabled(ApplicationId appId, String dagIdStr,
boolean expected,
String loggingClass, boolean
amHistoryLoggingEnabled,
- boolean dagHistoryLoggingEnabled) {
+ boolean dagHistoryLoggingEnabled) throws IOException {
TezConfiguration tezConf = new TezConfiguration();
YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
@@ -441,8 +443,8 @@ public class TestDAGClient {
public DAGClientRPCImplForTest(ApplicationId appId, String dagId,
TezConfiguration conf,
- @Nullable FrameworkClient frameworkClient) {
- super(appId, dagId, conf, frameworkClient);
+ @Nullable FrameworkClient frameworkClient) throws IOException {
+ super(appId, dagId, conf, frameworkClient, UserGroupInformation.getCurrentUser());
}
void setAMProxy(DAGClientAMProtocolBlockingPB proxy) {
@@ -477,8 +479,8 @@ public class TestDAGClient {
public DAGClientImplForTest(ApplicationId appId, String dagId,
TezConfiguration conf,
YarnConfiguration yarnConf,
- @Nullable FrameworkClient frameworkClient) {
- super(appId, dagId, conf, yarnConf, frameworkClient);
+ @Nullable FrameworkClient frameworkClient) throws IOException {
+ super(appId, dagId, conf, yarnConf, frameworkClient, UserGroupInformation.getCurrentUser());
}
private void setRealClient(DAGClientRPCImplForTest
dagClientRpcImplForTest) {