This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a6a9e3b [#1081] fix(tez): shuffle can not read the data which is 
flushed to hdfs (#1118)
3a6a9e3b is described below

commit 3a6a9e3b4d1585208c34161d086c267ece62fb0f
Author: zhengchenyu <[email protected]>
AuthorDate: Wed Aug 9 18:58:38 2023 +0800

    [#1081] fix(tez): shuffle can not read the data which is flushed to hdfs 
(#1118)
    
    ### What changes were proposed in this pull request?
    
    Apply remote storage configuration.
    
    ### Why are the changes needed?
    
    The reduce side does not load the remote storage path. If shuffle data has 
been flushed to remote storage, the reduce side cannot read it.
    
    Fix: #1081
    
    ### How was this patch tested?
    
    Tested in a cluster and with unit tests (UT).
---
 client-tez/pom.xml                                 |  11 ++
 .../java/org/apache/tez/common/RssTezConfig.java   |   2 +
 .../org/apache/tez/dag/app/RssDAGAppMaster.java    |  51 ++++--
 .../tez/dag/app/TezRemoteShuffleManager.java       |  21 +--
 .../common/shuffle/impl/RssTezFetcherTask.java     |  20 ++-
 .../orderedgrouped/RssShuffleScheduler.java        |  20 ++-
 .../apache/tez/dag/app/RssDAGAppMasterTest.java    | 197 +++++++++++++++++++++
 .../tez/dag/app/TezRemoteShuffleManagerTest.java   |   4 +-
 pom.xml                                            |   6 +
 9 files changed, 293 insertions(+), 39 deletions(-)

diff --git a/client-tez/pom.xml b/client-tez/pom.xml
index efa3141a..916d30f6 100644
--- a/client-tez/pom.xml
+++ b/client-tez/pom.xml
@@ -105,6 +105,17 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-client</artifactId>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
 
     <build>
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index 52f8cd36..cffaf7b9 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -174,6 +174,8 @@ public class RssTezConfig {
 
   public static final String RSS_REMOTE_STORAGE_PATH =
       TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+  public static final String RSS_REMOTE_STORAGE_CONF =
+      TEZ_RSS_CONFIG_PREFIX + "rss.remote.storage.conf";
 
   // Whether enable test mode for the MR Client
   public static final String RSS_TEST_MODE_ENABLE =
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 8249cde9..9c7e6652 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -76,6 +76,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -177,25 +179,29 @@ public class RssDAGAppMaster extends DAGAppMaster {
    */
   public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, 
Configuration conf)
       throws Exception {
-
-    ShuffleWriteClient client = RssTezUtils.createShuffleClient(conf);
-    appMaster.setShuffleWriteClient(client);
+    ShuffleWriteClient client = appMaster.getShuffleWriteClient();
+    if (client == null) {
+      client = RssTezUtils.createShuffleClient(conf);
+      appMaster.setShuffleWriteClient(client);
+    }
 
     String coordinators = conf.get(RssTezConfig.RSS_COORDINATOR_QUORUM);
     LOG.info("Registering coordinators {}", coordinators);
-    client.registerCoordinators(coordinators);
+    appMaster.getShuffleWriteClient().registerCoordinators(coordinators);
 
     String strAppAttemptId = appMaster.getAttemptID().toString();
     long heartbeatInterval =
         conf.getLong(
             RssTezConfig.RSS_HEARTBEAT_INTERVAL, 
RssTezConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
     long heartbeatTimeout = conf.getLong(RssTezConfig.RSS_HEARTBEAT_TIMEOUT, 
heartbeatInterval / 2);
-    client.registerApplicationInfo(strAppAttemptId, heartbeatTimeout, "user");
+    appMaster
+        .getShuffleWriteClient()
+        .registerApplicationInfo(strAppAttemptId, heartbeatTimeout, "user");
 
     appMaster.heartBeatExecutorService.scheduleAtFixedRate(
         () -> {
           try {
-            client.sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
+            
appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId, 
heartbeatTimeout);
             LOG.debug("Finish send heartbeat to coordinator and servers");
           } catch (Exception e) {
             LOG.warn("Fail to send heartbeat to coordinator and servers", e);
@@ -218,8 +224,32 @@ public class RssDAGAppMaster extends DAGAppMaster {
                   RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
     }
 
-    Configuration shuffleManagerConf = new Configuration(conf);
-    RssTezUtils.applyDynamicClientConf(shuffleManagerConf, 
appMaster.getClusterClientConf());
+    Configuration mergedConf = new Configuration(conf);
+    RssTezUtils.applyDynamicClientConf(mergedConf, 
appMaster.getClusterClientConf());
+
+    // get remote storage from coordinator if necessary
+    RemoteStorageInfo defaultRemoteStorage =
+        new RemoteStorageInfo(
+            mergedConf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, ""),
+            mergedConf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF, ""));
+    String storageType =
+        mergedConf.get(RssTezConfig.RSS_STORAGE_TYPE, 
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
+    boolean testMode = 
mergedConf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, false);
+    ClientUtils.validateTestModeConf(testMode, storageType);
+    RemoteStorageInfo remoteStorage =
+        ClientUtils.fetchRemoteStorage(
+            appMaster.getAppID().toString(),
+            defaultRemoteStorage,
+            dynamicConfEnabled,
+            storageType,
+            client);
+    // set the remote storage with actual value
+    appMaster
+        .getClusterClientConf()
+        .put(RssTezConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
+    appMaster
+        .getClusterClientConf()
+        .put(RssTezConfig.RSS_REMOTE_STORAGE_CONF, 
remoteStorage.getConfString());
 
     Token<JobTokenIdentifier> sessionToken =
         TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
@@ -227,9 +257,10 @@ public class RssDAGAppMaster extends DAGAppMaster {
         new TezRemoteShuffleManager(
             appMaster.getAppID().toString(),
             sessionToken,
-            shuffleManagerConf,
+            mergedConf,
             strAppAttemptId,
-            client));
+            client,
+            remoteStorage));
     appMaster.getTezRemoteShuffleManager().initialize();
     appMaster.getTezRemoteShuffleManager().start();
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 4060b552..4b4c6eca 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -77,19 +76,22 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
   private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical;
   private ShuffleWriteClient rssClient;
   private String appId;
+  private RemoteStorageInfo remoteStorage;
 
   public TezRemoteShuffleManager(
       String tokenIdentifier,
       Token<JobTokenIdentifier> sessionToken,
       Configuration conf,
       String appId,
-      ShuffleWriteClient rssClient) {
+      ShuffleWriteClient rssClient,
+      RemoteStorageInfo remoteStorage) {
     this.tokenIdentifier = tokenIdentifier;
     this.sessionToken = sessionToken;
     this.conf = conf;
     this.appId = appId;
     this.rssClient = rssClient;
     this.tezRemoteShuffleUmbilical = new 
TezRemoteShuffleUmbilicalProtocolImpl();
+    this.remoteStorage = remoteStorage;
   }
 
   @Override
@@ -192,21 +194,6 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
     }
     assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
 
-    // get remote storage from coordinator if necessary
-    boolean dynamicConfEnabled =
-        conf.getBoolean(
-            RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
-            RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
-    RemoteStorageInfo defaultRemoteStorage =
-        new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, 
""));
-    String storageType =
-        conf.get(RssTezConfig.RSS_STORAGE_TYPE, 
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
-    boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, 
false);
-    ClientUtils.validateTestModeConf(testMode, storageType);
-    RemoteStorageInfo remoteStorage =
-        ClientUtils.fetchRemoteStorage(
-            appId, defaultRemoteStorage, dynamicConfEnabled, storageType, 
rssClient);
-
     try {
       shuffleAssignmentsInfo =
           RetryUtils.retry(
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 7e57060f..39b60d37 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -25,7 +25,6 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.RssTezConfig;
@@ -43,6 +42,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.UnitConverter;
 
@@ -68,6 +68,7 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
 
   private String storageType;
   private String basePath;
+  private RemoteStorageInfo remoteStorageInfo;
   private final int readBufferSize;
   private final int partitionNumPerRange;
   private final int partitionNum;
@@ -117,6 +118,9 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
     this.storageType =
         conf.get(RssTezConfig.RSS_STORAGE_TYPE, 
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
     LOG.info("RssTezFetcherTask storageType:{}", storageType);
+    this.basePath = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH);
+    String remoteStorageConf = 
this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
+    this.remoteStorageInfo = new RemoteStorageInfo(basePath, 
remoteStorageConf);
 
     String readBufferSize =
         conf.get(
@@ -171,7 +175,7 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
     if (!taskIdBitmap.isEmpty()) {
       LOG.info(
           "In reduce: " + reduceId + ", Rss Tez client starts to fetch blocks 
from RSS server");
-      JobConf readerJobConf = getRemoteConf();
+      Configuration hadoopConf = getRemoteConf();
       LOG.info("RssTezFetcherTask storageType:{}", storageType);
       boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
       CreateShuffleReadClientRequest request =
@@ -185,7 +189,7 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
               blockIdBitmap,
               taskIdBitmap,
               new ArrayList<>(serverInfoSet),
-              readerJobConf,
+              hadoopConf,
               expectedTaskIdsBitmapFilterEnable,
               RssTezConfig.toRssConf(this.conf));
       ShuffleReadClient shuffleReadClient =
@@ -209,8 +213,14 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
 
   public void shutdown() {}
 
-  private JobConf getRemoteConf() {
-    return new JobConf(conf);
+  private Configuration getRemoteConf() {
+    Configuration remoteConf = new Configuration(conf);
+    if (!remoteStorageInfo.isEmpty()) {
+      for (Map.Entry<String, String> entry : 
remoteStorageInfo.getConfItems().entrySet()) {
+        remoteConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return remoteConf;
   }
 
   public int getPartitionId() {
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index e6ae3ff2..89146ae9 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -61,7 +61,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.InputContextUtils;
@@ -97,6 +96,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.UnitConverter;
@@ -280,6 +280,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
   private final int readBufferSize;
   private final int partitionNumPerRange;
   private String basePath;
+  private RemoteStorageInfo remoteStorageInfo;
   private int indexReadLimit;
 
   RssShuffleScheduler(
@@ -535,6 +536,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
         conf.getInt(
             RssTezConfig.RSS_PARTITION_NUM_PER_RANGE,
             RssTezConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
+    this.basePath = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH);
+    String remoteStorageConf = 
this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
+    this.remoteStorageInfo = new RemoteStorageInfo(basePath, 
remoteStorageConf);
 
     LOG.info(
         "RSSShuffleScheduler running for sourceVertex: "
@@ -1785,8 +1789,14 @@ class RssShuffleScheduler extends ShuffleScheduler {
     return true;
   }
 
-  private JobConf getRemoteConf() {
-    return new JobConf(conf);
+  private Configuration getRemoteConf() {
+    Configuration remoteConf = new Configuration(conf);
+    if (!remoteStorageInfo.isEmpty()) {
+      for (Map.Entry<String, String> entry : 
remoteStorageInfo.getConfItems().entrySet()) {
+        remoteConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return remoteConf;
   }
 
   private synchronized void waitAndNotifyProgress() throws 
InterruptedException {
@@ -1836,7 +1846,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
           "In reduce: "
               + inputContext.getTaskVertexName()
               + ", Rss Tez client starts to fetch blocks from RSS server");
-      JobConf readerJobConf = getRemoteConf();
+      Configuration hadoopConf = getRemoteConf();
 
       int partitionNum = partitionToServers.size();
       boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() 
> 1;
@@ -1852,7 +1862,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
               blockIdBitmap,
               taskIdBitmap,
               shuffleServerInfoList,
-              readerJobConf,
+              hadoopConf,
               new TezIdHelper(),
               expectedTaskIdsBitmapFilterEnable,
               RssTezConfig.toRssConf(conf));
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java 
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 55371a30..759580a9 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
@@ -25,17 +26,29 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
@@ -45,6 +58,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.records.DAGProtos;
@@ -72,6 +87,9 @@ import 
org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
@@ -87,6 +105,12 @@ import static org.mockito.Mockito.when;
 
 public class RssDAGAppMasterTest {
 
+  private static final File TEST_DIR =
+      new File(
+              System.getProperty("test.build.data", 
System.getProperty("java.io.tmpdir")),
+              RssDAGAppMasterTest.class.getSimpleName())
+          .getAbsoluteFile();
+
   @Test
   public void testHookAfterDagInited() throws Exception {
     // 1 Init and mock some basic module
@@ -348,4 +372,177 @@ public class RssDAGAppMasterTest {
       return null;
     }
   }
+
+  @Test
+  public void testFetchRemoteStorageFromDynamicConf() throws Exception {
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
+    TezConfiguration conf = new TezConfiguration();
+
+    Credentials amCreds = new Credentials();
+    JobTokenSecretManager jtsm = new JobTokenSecretManager();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(appId.toString()));
+    Token<JobTokenIdentifier> sessionToken = new 
Token<JobTokenIdentifier>(identifier, jtsm);
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, amCreds);
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FSDataOutputStream sessionJarsPBOutStream =
+        TezCommonUtils.createFileForAM(
+            fs, new Path(TEST_DIR.toString(), 
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+    
DAGProtos.PlanLocalResourcesProto.getDefaultInstance().writeDelimitedTo(sessionJarsPBOutStream);
+    sessionJarsPBOutStream.close();
+
+    RssDAGAppMaster appMaster =
+        new RssDAGAppMaster(
+            appAttemptId,
+            ContainerId.newInstance(appAttemptId, 1),
+            "127.0.0.1",
+            0,
+            0,
+            new SystemClock(),
+            1,
+            true,
+            TEST_DIR.toString(),
+            new String[] {TEST_DIR.toString()},
+            new String[] {TEST_DIR.toString()},
+            new TezApiVersionInfo().getVersion(),
+            amCreds,
+            "someuser",
+            null);
+    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1));
+    appMaster.init(conf);
+
+    Configuration mergedConf = new Configuration(false);
+    RssTezUtils.applyDynamicClientConf(mergedConf, 
appMaster.getClusterClientConf());
+    Assertions.assertEquals(4, mergedConf.size());
+    Assertions.assertEquals("hdfs://ns1/rss/", 
mergedConf.get("tez.rss.remote.storage.path"));
+    Assertions.assertEquals(
+        "key1=value1,key2=value2", 
mergedConf.get("tez.rss.remote.storage.conf"));
+    Assertions.assertEquals("MEMORY_LOCALFILE_HDFS", 
mergedConf.get("tez.rss.storage.type"));
+    Assertions.assertEquals("testvalue", 
mergedConf.get("tez.rss.test.config"));
+  }
+
+  @Test
+  public void testFetchRemoteStorageFromCoordinator() throws Exception {
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId = 
ApplicationAttemptId.newInstance(appId, 1);
+    TezConfiguration conf = new TezConfiguration();
+
+    Credentials amCreds = new Credentials();
+    JobTokenSecretManager jtsm = new JobTokenSecretManager();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(appId.toString()));
+    Token<JobTokenIdentifier> sessionToken = new 
Token<JobTokenIdentifier>(identifier, jtsm);
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, amCreds);
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FSDataOutputStream sessionJarsPBOutStream =
+        TezCommonUtils.createFileForAM(
+            fs, new Path(TEST_DIR.toString(), 
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+    
DAGProtos.PlanLocalResourcesProto.getDefaultInstance().writeDelimitedTo(sessionJarsPBOutStream);
+    sessionJarsPBOutStream.close();
+
+    RssDAGAppMaster appMaster =
+        new RssDAGAppMaster(
+            appAttemptId,
+            ContainerId.newInstance(appAttemptId, 1),
+            "127.0.0.1",
+            0,
+            0,
+            new SystemClock(),
+            1,
+            true,
+            TEST_DIR.toString(),
+            new String[] {TEST_DIR.toString()},
+            new String[] {TEST_DIR.toString()},
+            new TezApiVersionInfo().getVersion(),
+            amCreds,
+            "someuser",
+            null);
+    appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2));
+    appMaster.init(conf);
+
+    Configuration mergedConf = new Configuration(false);
+    RssTezUtils.applyDynamicClientConf(mergedConf, 
appMaster.getClusterClientConf());
+    Assertions.assertEquals(4, mergedConf.size());
+    Assertions.assertEquals("hdfs://ns2/rss/", 
mergedConf.get("tez.rss.remote.storage.path"));
+    Assertions.assertEquals(
+        "key11=value11,key22=value22", 
mergedConf.get("tez.rss.remote.storage.conf"));
+    Assertions.assertEquals("MEMORY_LOCALFILE_HDFS", 
mergedConf.get("tez.rss.storage.type"));
+    Assertions.assertEquals("testvalue", 
mergedConf.get("tez.rss.test.config"));
+  }
+
+  static class FakedShuffleWriteClient extends ShuffleWriteClientImpl {
+
+    /*
+     * Mode 1: rss.remote.storage.path and rss.remote.storage.conf is set by 
dynamic config,
+     *         appMaster will use this as default remote storage path.
+     * Mode 2: rss.remote.storage.path and rss.remote.storage.conf is not set 
by dynamic config,
+     *         appMaster will fetch remote storage conf from coordinator.
+     * */
+    private int mode;
+
+    FakedShuffleWriteClient(int mode) {
+      this("GRPC", 1, 1, 10, 1, 1, 1, false, 1, 1, 1, 1);
+      this.mode = mode;
+    }
+
+    private FakedShuffleWriteClient(
+        String clientType,
+        int retryMax,
+        long retryIntervalMax,
+        int heartBeatThreadNum,
+        int replica,
+        int replicaWrite,
+        int replicaRead,
+        boolean replicaSkipEnabled,
+        int dataTransferPoolSize,
+        int dataCommitPoolSize,
+        int unregisterThreadPoolSize,
+        int unregisterRequestTimeSec) {
+      super(
+          clientType,
+          retryMax,
+          retryIntervalMax,
+          heartBeatThreadNum,
+          replica,
+          replicaWrite,
+          replicaRead,
+          replicaSkipEnabled,
+          dataTransferPoolSize,
+          dataCommitPoolSize,
+          unregisterThreadPoolSize,
+          unregisterRequestTimeSec);
+    }
+
+    @Override
+    public void registerCoordinators(String coordinators) {}
+
+    @Override
+    public Map<String, String> fetchClientConf(int timeoutMs) {
+      Map<String, String> clientConf = new HashMap();
+      if (mode == 1) {
+        clientConf.put("rss.remote.storage.path", "hdfs://ns1/rss/");
+        clientConf.put("rss.remote.storage.conf", "key1=value1,key2=value2");
+        clientConf.put("rss.storage.type", "MEMORY_LOCALFILE_HDFS");
+        clientConf.put("rss.test.config", "testvalue");
+      } else if (mode == 2) {
+        clientConf.put("rss.storage.type", "MEMORY_LOCALFILE_HDFS");
+        clientConf.put("rss.test.config", "testvalue");
+      } else {
+        throw new RssException("Wrong test mode.");
+      }
+      return clientConf;
+    }
+
+    @Override
+    public RemoteStorageInfo fetchRemoteStorage(String appId) {
+      if (mode == 2) {
+        return new RemoteStorageInfo("hdfs://ns2/rss/", 
"key11=value11,key22=value22");
+      } else {
+        throw new RssException("Wrong test mode.");
+      }
+    }
+  }
 }
diff --git 
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
index c9721893..16dc7b14 100644
--- 
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
@@ -116,7 +116,7 @@ public class TezRemoteShuffleManagerTest {
       secretManager.addTokenForJob(tokenIdentifier, sessionToken);
       TezRemoteShuffleManager tezRemoteShuffleManager =
           new TezRemoteShuffleManager(
-              appId.toString(), sessionToken, conf, appId.toString(), client);
+              appId.toString(), sessionToken, conf, appId.toString(), client, 
null);
       tezRemoteShuffleManager.initialize();
       tezRemoteShuffleManager.start();
 
@@ -224,7 +224,7 @@ public class TezRemoteShuffleManagerTest {
       secretManager.addTokenForJob(tokenIdentifier, sessionToken);
       TezRemoteShuffleManager tezRemoteShuffleManager =
           new TezRemoteShuffleManager(
-              appId.toString(), sessionToken, conf, appId.toString(), client);
+              appId.toString(), sessionToken, conf, appId.toString(), client, 
null);
       tezRemoteShuffleManager.initialize();
       tezRemoteShuffleManager.start();
 
diff --git a/pom.xml b/pom.xml
index 2ac91097..16cdad06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1977,6 +1977,12 @@
             <artifactId>hadoop-yarn-api</artifactId>
             <version>${hadoop.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+          </dependency>
           <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-server-common</artifactId>

Reply via email to