This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bde6ba3 [#1109] fix(tez): Fix the user of remote storage. (#1128)
0bde6ba3 is described below

commit 0bde6ba30f59060e52a0b857c96538780c05db7b
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Aug 10 13:30:33 2023 +0800

    [#1109] fix(tez): Fix the user of remote storage. (#1128)
    
    ### What changes were proposed in this pull request?
    
    The Tez application should use the login UGI, not the applicationId UGI,
    to communicate with the shuffle server.
    
    ### Why are the changes needed?
    
    Fix: #1109
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested in a real cluster.
---
 .../tez/dag/app/TezRemoteShuffleManager.java       | 91 +++++++++++++---------
 1 file changed, 54 insertions(+), 37 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 4b4c6eca..d1998eeb 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -19,6 +19,7 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.GetShuffleServerRequest;
@@ -76,6 +78,7 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
   private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical;
   private ShuffleWriteClient rssClient;
   private String appId;
+  private UserGroupInformation requestUgi;
   private RemoteStorageInfo remoteStorage;
 
   public TezRemoteShuffleManager(
@@ -84,13 +87,15 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
       Configuration conf,
       String appId,
       ShuffleWriteClient rssClient,
-      RemoteStorageInfo remoteStorage) {
+      RemoteStorageInfo remoteStorage)
+      throws IOException {
     this.tokenIdentifier = tokenIdentifier;
     this.sessionToken = sessionToken;
     this.conf = conf;
     this.appId = appId;
     this.rssClient = rssClient;
     this.tezRemoteShuffleUmbilical = new 
TezRemoteShuffleUmbilicalProtocolImpl();
+    this.requestUgi = UserGroupInformation.getCurrentUser();
     this.remoteStorage = remoteStorage;
   }
 
@@ -197,42 +202,54 @@ public class TezRemoteShuffleManager implements ServicePluginLifecycle {
     try {
       shuffleAssignmentsInfo =
           RetryUtils.retry(
-              () -> {
-                ShuffleAssignmentsInfo shuffleAssignments =
-                    rssClient.getShuffleAssignments(
-                        appId,
-                        shuffleId,
-                        partitionNum,
-                        1,
-                        Sets.newHashSet(assignmentTags),
-                        requiredAssignmentShuffleServersNum,
-                        -1);
-
-                Map<ShuffleServerInfo, List<PartitionRange>> 
serverToPartitionRanges =
-                    shuffleAssignments.getServerToPartitionRanges();
-
-                if (serverToPartitionRanges == null || 
serverToPartitionRanges.isEmpty()) {
-                  return null;
-                }
-                LOG.info("Start to register shuffle");
-                long start = System.currentTimeMillis();
-                serverToPartitionRanges
-                    .entrySet()
-                    .forEach(
-                        entry ->
-                            rssClient.registerShuffle(
-                                entry.getKey(),
-                                appId,
-                                shuffleId,
-                                entry.getValue(),
-                                remoteStorage,
-                                ShuffleDataDistributionType.NORMAL,
-                                RssTezConfig.toRssConf(conf)
-                                    
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
-                LOG.info(
-                    "Finish register shuffle with " + 
(System.currentTimeMillis() - start) + " ms");
-                return shuffleAssignments;
-              },
+              // When communicating via TezRemoteShuffleUmbilicalProtocol, Tez uses applicationId
+              // as the UGI name. In a secure HDFS cluster, if we talk to shuffle servers with
+              // the applicationId UGI, the remote storage user becomes application_xxx_xx.
+              // The max Hadoop user id is 16777215, so we should use the executing user's UGI.
+              () ->
+                  requestUgi.doAs(
+                      new PrivilegedExceptionAction<ShuffleAssignmentsInfo>() {
+                        @Override
+                        public ShuffleAssignmentsInfo run() throws Exception {
+                          ShuffleAssignmentsInfo shuffleAssignments =
+                              rssClient.getShuffleAssignments(
+                                  appId,
+                                  shuffleId,
+                                  partitionNum,
+                                  1,
+                                  Sets.newHashSet(assignmentTags),
+                                  requiredAssignmentShuffleServersNum,
+                                  -1);
+
+                          Map<ShuffleServerInfo, List<PartitionRange>> 
serverToPartitionRanges =
+                              shuffleAssignments.getServerToPartitionRanges();
+
+                          if (serverToPartitionRanges == null
+                              || serverToPartitionRanges.isEmpty()) {
+                            return null;
+                          }
+                          LOG.info("Start to register shuffle");
+                          long start = System.currentTimeMillis();
+                          serverToPartitionRanges
+                              .entrySet()
+                              .forEach(
+                                  entry ->
+                                      rssClient.registerShuffle(
+                                          entry.getKey(),
+                                          appId,
+                                          shuffleId,
+                                          entry.getValue(),
+                                          remoteStorage,
+                                          ShuffleDataDistributionType.NORMAL,
+                                          RssTezConfig.toRssConf(conf)
+                                              
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
+                          LOG.info(
+                              "Finish register shuffle with "
+                                  + (System.currentTimeMillis() - start)
+                                  + " ms");
+                          return shuffleAssignments;
+                        }
+                      }),
               retryInterval,
               retryTimes);
     } catch (Throwable throwable) {

Reply via email to