This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0bde6ba3 [#1109] fix(tez): Fix the user of remote storage. (#1128)
0bde6ba3 is described below
commit 0bde6ba30f59060e52a0b857c96538780c05db7b
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Aug 10 13:30:33 2023 +0800
[#1109] fix(tez): Fix the user of remote storage. (#1128)
### What changes were proposed in this pull request?
The Tez application should use the login UGI to communicate with the shuffle server,
rather than the applicationId UGI.
### Why are the changes needed?
Fix: #1109
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in a real cluster.
---
.../tez/dag/app/TezRemoteShuffleManager.java | 91 +++++++++++++---------
1 file changed, 54 insertions(+), 37 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 4b4c6eca..d1998eeb 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -19,6 +19,7 @@ package org.apache.tez.dag.app;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.GetShuffleServerRequest;
@@ -76,6 +78,7 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical;
private ShuffleWriteClient rssClient;
private String appId;
+ private UserGroupInformation requestUgi;
private RemoteStorageInfo remoteStorage;
public TezRemoteShuffleManager(
@@ -84,13 +87,15 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
Configuration conf,
String appId,
ShuffleWriteClient rssClient,
- RemoteStorageInfo remoteStorage) {
+ RemoteStorageInfo remoteStorage)
+ throws IOException {
this.tokenIdentifier = tokenIdentifier;
this.sessionToken = sessionToken;
this.conf = conf;
this.appId = appId;
this.rssClient = rssClient;
this.tezRemoteShuffleUmbilical = new
TezRemoteShuffleUmbilicalProtocolImpl();
+ this.requestUgi = UserGroupInformation.getCurrentUser();
this.remoteStorage = remoteStorage;
}
@@ -197,42 +202,54 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
try {
shuffleAssignmentsInfo =
RetryUtils.retry(
- () -> {
- ShuffleAssignmentsInfo shuffleAssignments =
- rssClient.getShuffleAssignments(
- appId,
- shuffleId,
- partitionNum,
- 1,
- Sets.newHashSet(assignmentTags),
- requiredAssignmentShuffleServersNum,
- -1);
-
- Map<ShuffleServerInfo, List<PartitionRange>>
serverToPartitionRanges =
- shuffleAssignments.getServerToPartitionRanges();
-
- if (serverToPartitionRanges == null ||
serverToPartitionRanges.isEmpty()) {
- return null;
- }
- LOG.info("Start to register shuffle");
- long start = System.currentTimeMillis();
- serverToPartitionRanges
- .entrySet()
- .forEach(
- entry ->
- rssClient.registerShuffle(
- entry.getKey(),
- appId,
- shuffleId,
- entry.getValue(),
- remoteStorage,
- ShuffleDataDistributionType.NORMAL,
- RssTezConfig.toRssConf(conf)
-
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
- LOG.info(
- "Finish register shuffle with " +
(System.currentTimeMillis() - start) + " ms");
- return shuffleAssignments;
- },
+ // When communicate with TezRemoteShuffleUmbilicalProtocol, tez
use applicationId
+ // as ugi name. In security hdfs cluster, if we communicate with
shuffle server with
+ // applicationId ugi, the user of remote storage will be
application_xxx_xx
+ // As we know, the max id of hadoop user is 16777215. So we
should use execute ugi.
+ () ->
+ requestUgi.doAs(
+ new PrivilegedExceptionAction<ShuffleAssignmentsInfo>() {
+ @Override
+ public ShuffleAssignmentsInfo run() throws Exception {
+ ShuffleAssignmentsInfo shuffleAssignments =
+ rssClient.getShuffleAssignments(
+ appId,
+ shuffleId,
+ partitionNum,
+ 1,
+ Sets.newHashSet(assignmentTags),
+ requiredAssignmentShuffleServersNum,
+ -1);
+
+ Map<ShuffleServerInfo, List<PartitionRange>>
serverToPartitionRanges =
+ shuffleAssignments.getServerToPartitionRanges();
+
+ if (serverToPartitionRanges == null
+ || serverToPartitionRanges.isEmpty()) {
+ return null;
+ }
+ LOG.info("Start to register shuffle");
+ long start = System.currentTimeMillis();
+ serverToPartitionRanges
+ .entrySet()
+ .forEach(
+ entry ->
+ rssClient.registerShuffle(
+ entry.getKey(),
+ appId,
+ shuffleId,
+ entry.getValue(),
+ remoteStorage,
+ ShuffleDataDistributionType.NORMAL,
+ RssTezConfig.toRssConf(conf)
+
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
+ LOG.info(
+ "Finish register shuffle with "
+ + (System.currentTimeMillis() - start)
+ + " ms");
+ return shuffleAssignments;
+ }
+ }),
retryInterval,
retryTimes);
} catch (Throwable throwable) {