This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new af3a969  TEZ-4156: Fix Tez to reuse IPC connections (Rajesh Balamohan, reviewed by Siddharth Seth, László Bodor, Jonathan Turner Eagles)
af3a969 is described below

commit af3a9692ee4a4689464b82db3a78c70293631dce
Author: Rajesh Balamohan <[email protected]>
AuthorDate: Wed Apr 29 03:43:49 2020 +0530

    TEZ-4156: Fix Tez to reuse IPC connections (Rajesh Balamohan, reviewed by Siddharth Seth, László Bodor, Jonathan Turner Eagles)
---
 .../main/java/org/apache/tez/client/TezClient.java | 24 ++++++++++++++++------
 .../java/org/apache/tez/client/TezClientUtils.java |  9 ++++----
 .../apache/tez/dag/api/client/DAGClientImpl.java   |  6 ++++--
 .../tez/dag/api/client/rpc/DAGClientRPCImpl.java   |  8 ++++++--
 .../tez/dag/api/client/rpc/TestDAGClient.java      | 16 ++++++++-------
 5 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 60c0e5e..4fb37dc 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.JavaOptsChecker;
 import org.apache.tez.common.RPCUtil;
 import org.apache.tez.common.TezCommonUtils;
@@ -154,6 +155,8 @@ public class TezClient {
 
   private ScheduledExecutorService amKeepAliveService;
 
+  private final Map<String, UserGroupInformation> ugiMap;
+
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
         TezConfiguration.TEZ_AM_SESSION_MODE, 
TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
@@ -197,6 +200,7 @@ public class TezClient {
       LOG.warn("The host name of the client the tez application was submitted 
from was unable to be retrieved", e);
     }
 
+    this.ugiMap = new HashMap<>();
     this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
     this.apiVersionInfo = new TezApiVersionInfo();
     this.servicePluginsDescriptor = servicePluginsDescriptor;
@@ -713,7 +717,13 @@ public class TezClient {
     return new DAGClientImpl(sessionAppId, dagId,
         amConfig.getTezConfiguration(),
         amConfig.getYarnConfiguration(),
-        frameworkClient);
+        frameworkClient, getUgi());
+  }
+
+  private UserGroupInformation getUgi() throws IOException {
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    return ugiMap.computeIfAbsent(userName,
+        v -> UserGroupInformation.createRemoteUser(userName));
   }
 
   @VisibleForTesting
@@ -1058,7 +1068,7 @@ public class TezClient {
   protected DAGClientAMProtocolBlockingPB getAMProxy(ApplicationId appId)
       throws TezException, IOException {
     return TezClientUtils.getAMProxy(
-        frameworkClient, amConfig.getYarnConfiguration(), appId);
+        frameworkClient, amConfig.getYarnConfiguration(), appId, getUgi());
   }
 
   private DAGClientAMProtocolBlockingPB waitForProxy()
@@ -1137,7 +1147,7 @@ public class TezClient {
     // wait for dag in non-session mode to start running, so that we can start 
to getDAGStatus
     waitNonSessionTillReady();
     return getDAGClient(appId, amConfig.getTezConfiguration(), 
amConfig.getYarnConfiguration(),
-        frameworkClient);
+        frameworkClient, getUgi());
   }
 
   private ApplicationId createApplication() throws TezException, IOException {
@@ -1161,17 +1171,19 @@ public class TezClient {
 
   @Private
   static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, 
YarnConfiguration
-      yarnConf, FrameworkClient frameworkClient)
+      yarnConf, FrameworkClient frameworkClient, UserGroupInformation ugi)
       throws IOException, TezException {
     return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf,
-        yarnConf, frameworkClient);
+        yarnConf, frameworkClient, ugi);
   }
 
   @Private // Used only for MapReduce compatibility code
   static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
                                 FrameworkClient frameworkClient)
       throws IOException, TezException {
-    return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), 
frameworkClient);
+    UserGroupInformation ugi =
+        
UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName());
+    return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), 
frameworkClient, ugi);
   }
 
   // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8850ca7..d34d31e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -919,7 +919,7 @@ public class TezClientUtils {
 
   static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient yarnClient,
       Configuration conf,
-      ApplicationId applicationId) throws TezException, IOException {
+      ApplicationId applicationId, UserGroupInformation ugi) throws 
TezException, IOException {
     ApplicationReport appReport;
     try {
       appReport = yarnClient.getApplicationReport(
@@ -954,16 +954,15 @@ public class TezClientUtils {
       throw new TezException(e);
     }
     return getAMProxy(conf, appReport.getHost(),
-        appReport.getRpcPort(), appReport.getClientToAMToken());
+        appReport.getRpcPort(), appReport.getClientToAMToken(), ugi);
   }
 
   @Private
   public static DAGClientAMProtocolBlockingPB getAMProxy(final Configuration 
conf, String amHost,
-      int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken) 
throws IOException {
+      int amRpcPort, org.apache.hadoop.yarn.api.records.Token clientToAMToken,
+      UserGroupInformation userUgi) throws IOException {
 
     final InetSocketAddress serviceAddr = 
NetUtils.createSocketAddrForHost(amHost, amRpcPort);
-    UserGroupInformation userUgi = 
UserGroupInformation.createRemoteUser(UserGroupInformation
-        .getCurrentUser().getUserName());
     if (clientToAMToken != null) {
       Token<ClientToAMTokenIdentifier> token = 
ConverterUtils.convertFromYarn(clientToAMToken,
           serviceAddr);
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 52f66e3..0b899fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.Preconditions;
 
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@@ -79,7 +80,8 @@ public class DAGClientImpl extends DAGClient {
   private boolean cleanupFrameworkClient;
 
   public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration 
conf,
-      YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient) {
+      YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient,
+      UserGroupInformation ugi) {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
@@ -99,7 +101,7 @@ public class DAGClientImpl extends DAGClient {
             TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
         DAGClientTimelineImpl.isSupported();
 
-    realClient = new DAGClientRPCImpl(appId, dagId, conf, 
this.frameworkClient);
+    realClient = new DAGClientRPCImpl(appId, dagId, conf, 
this.frameworkClient, ugi);
     statusPollInterval = conf.getLong(
         TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
         TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 02935df..c54058b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.tez.common.RPCUtil;
 import org.apache.tez.dag.api.SessionNotRunning;
@@ -68,13 +69,16 @@ public class DAGClientRPCImpl extends DAGClientInternal {
   @VisibleForTesting
   DAGClientAMProtocolBlockingPB proxy = null;
 
+  private UserGroupInformation ugi;
+
   public DAGClientRPCImpl(ApplicationId appId, String dagId,
-      TezConfiguration conf, @Nullable FrameworkClient frameworkClient) {
+      TezConfiguration conf, @Nullable FrameworkClient frameworkClient, 
UserGroupInformation ugi) {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
     this.frameworkClient = frameworkClient;
     appReport = null;
+    this.ugi = ugi;
   }
 
   @Override
@@ -286,7 +290,7 @@ public class DAGClientRPCImpl extends DAGClientInternal {
     }
 
     proxy = TezClientUtils.getAMProxy(conf, appReport.getHost(), 
appReport.getRpcPort(),
-        appReport.getClientToAMToken());
+        appReport.getClientToAMToken(), ugi);
     return true;
   }
 
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 70ee1d4..57087cb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Set;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -204,7 +205,8 @@ public class TestDAGClient {
 
     TezConfiguration tezConf = new TezConfiguration();
     YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
-    dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf,  yarnConf, 
null);
+    dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf,  yarnConf, 
null,
+        UserGroupInformation.getCurrentUser());
     DAGClientRPCImpl realClient = 
(DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
     realClient.appReport = mockAppReport;
     realClient.proxy = mockProxy;
@@ -410,7 +412,7 @@ public class TestDAGClient {
   }
 
   @Test(timeout = 5000)
-  public void testDagClientTimelineEnabledCondition() {
+  public void testDagClientTimelineEnabledCondition() throws IOException {
     String historyLoggingClass = 
"org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
 
     testAtsEnabled(mockAppId, dagIdStr, false, "", true, true);
@@ -422,7 +424,7 @@ public class TestDAGClient {
 
   private static void testAtsEnabled(ApplicationId appId, String dagIdStr, 
boolean expected,
                                      String loggingClass, boolean 
amHistoryLoggingEnabled,
-                                     boolean dagHistoryLoggingEnabled) {
+                                     boolean dagHistoryLoggingEnabled) throws 
IOException {
     TezConfiguration tezConf = new TezConfiguration();
     YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
 
@@ -441,8 +443,8 @@ public class TestDAGClient {
 
     public DAGClientRPCImplForTest(ApplicationId appId, String dagId,
                                    TezConfiguration conf,
-                                   @Nullable FrameworkClient frameworkClient) {
-      super(appId, dagId, conf, frameworkClient);
+                                   @Nullable FrameworkClient frameworkClient) 
throws IOException {
+      super(appId, dagId, conf, frameworkClient, 
UserGroupInformation.getCurrentUser());
     }
 
     void setAMProxy(DAGClientAMProtocolBlockingPB proxy) {
@@ -477,8 +479,8 @@ public class TestDAGClient {
 
     public DAGClientImplForTest(ApplicationId appId, String dagId, 
TezConfiguration conf,
         YarnConfiguration yarnConf,
-        @Nullable FrameworkClient frameworkClient) {
-      super(appId, dagId, conf, yarnConf, frameworkClient);
+        @Nullable FrameworkClient frameworkClient) throws IOException {
+      super(appId, dagId, conf, yarnConf, frameworkClient, 
UserGroupInformation.getCurrentUser());
     }
 
     private void setRealClient(DAGClientRPCImplForTest 
dagClientRpcImplForTest) {

Reply via email to