This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3a6a9e3b [#1081] fix(tez): shuffle can not read the data which is
flushed to hdfs (#1118)
3a6a9e3b is described below
commit 3a6a9e3b4d1585208c34161d086c267ece62fb0f
Author: zhengchenyu <[email protected]>
AuthorDate: Wed Aug 9 18:58:38 2023 +0800
[#1081] fix(tez): shuffle can not read the data which is flushed to hdfs
(#1118)
### What changes were proposed in this pull request?
Apply remote storage configuration.
### Why are the changes needed?
The reduce stage does not load the remote storage path. If shuffle data has been
flushed to remote storage, the reducer cannot read it.
Fix: #1081
### How was this patch tested?
Tested in a cluster and with unit tests (UT).
---
client-tez/pom.xml | 11 ++
.../java/org/apache/tez/common/RssTezConfig.java | 2 +
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 51 ++++--
.../tez/dag/app/TezRemoteShuffleManager.java | 21 +--
.../common/shuffle/impl/RssTezFetcherTask.java | 20 ++-
.../orderedgrouped/RssShuffleScheduler.java | 20 ++-
.../apache/tez/dag/app/RssDAGAppMasterTest.java | 197 +++++++++++++++++++++
.../tez/dag/app/TezRemoteShuffleManagerTest.java | 4 +-
pom.xml | 6 +
9 files changed, 293 insertions(+), 39 deletions(-)
diff --git a/client-tez/pom.xml b/client-tez/pom.xml
index efa3141a..916d30f6 100644
--- a/client-tez/pom.xml
+++ b/client-tez/pom.xml
@@ -105,6 +105,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index 52f8cd36..cffaf7b9 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -174,6 +174,8 @@ public class RssTezConfig {
public static final String RSS_REMOTE_STORAGE_PATH =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;
+ public static final String RSS_REMOTE_STORAGE_CONF =
+ TEZ_RSS_CONFIG_PREFIX + "rss.remote.storage.conf";
// Whether enable test mode for the MR Client
public static final String RSS_TEST_MODE_ENABLE =
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 8249cde9..9c7e6652 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -76,6 +76,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.util.ClientUtils;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -177,25 +179,29 @@ public class RssDAGAppMaster extends DAGAppMaster {
*/
public static void initAndStartRSSClient(final RssDAGAppMaster appMaster,
Configuration conf)
throws Exception {
-
- ShuffleWriteClient client = RssTezUtils.createShuffleClient(conf);
- appMaster.setShuffleWriteClient(client);
+ ShuffleWriteClient client = appMaster.getShuffleWriteClient();
+ if (client == null) {
+ client = RssTezUtils.createShuffleClient(conf);
+ appMaster.setShuffleWriteClient(client);
+ }
String coordinators = conf.get(RssTezConfig.RSS_COORDINATOR_QUORUM);
LOG.info("Registering coordinators {}", coordinators);
- client.registerCoordinators(coordinators);
+ appMaster.getShuffleWriteClient().registerCoordinators(coordinators);
String strAppAttemptId = appMaster.getAttemptID().toString();
long heartbeatInterval =
conf.getLong(
RssTezConfig.RSS_HEARTBEAT_INTERVAL,
RssTezConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout = conf.getLong(RssTezConfig.RSS_HEARTBEAT_TIMEOUT,
heartbeatInterval / 2);
- client.registerApplicationInfo(strAppAttemptId, heartbeatTimeout, "user");
+ appMaster
+ .getShuffleWriteClient()
+ .registerApplicationInfo(strAppAttemptId, heartbeatTimeout, "user");
appMaster.heartBeatExecutorService.scheduleAtFixedRate(
() -> {
try {
- client.sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
+
appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId,
heartbeatTimeout);
LOG.debug("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
@@ -218,8 +224,32 @@ public class RssDAGAppMaster extends DAGAppMaster {
RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
}
- Configuration shuffleManagerConf = new Configuration(conf);
- RssTezUtils.applyDynamicClientConf(shuffleManagerConf,
appMaster.getClusterClientConf());
+ Configuration mergedConf = new Configuration(conf);
+ RssTezUtils.applyDynamicClientConf(mergedConf,
appMaster.getClusterClientConf());
+
+ // get remote storage from coordinator if necessary
+ RemoteStorageInfo defaultRemoteStorage =
+ new RemoteStorageInfo(
+ mergedConf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, ""),
+ mergedConf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF, ""));
+ String storageType =
+ mergedConf.get(RssTezConfig.RSS_STORAGE_TYPE,
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
+ boolean testMode =
mergedConf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, false);
+ ClientUtils.validateTestModeConf(testMode, storageType);
+ RemoteStorageInfo remoteStorage =
+ ClientUtils.fetchRemoteStorage(
+ appMaster.getAppID().toString(),
+ defaultRemoteStorage,
+ dynamicConfEnabled,
+ storageType,
+ client);
+ // set the remote storage with actual value
+ appMaster
+ .getClusterClientConf()
+ .put(RssTezConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
+ appMaster
+ .getClusterClientConf()
+ .put(RssTezConfig.RSS_REMOTE_STORAGE_CONF,
remoteStorage.getConfString());
Token<JobTokenIdentifier> sessionToken =
TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
@@ -227,9 +257,10 @@ public class RssDAGAppMaster extends DAGAppMaster {
new TezRemoteShuffleManager(
appMaster.getAppID().toString(),
sessionToken,
- shuffleManagerConf,
+ mergedConf,
strAppAttemptId,
- client));
+ client,
+ remoteStorage));
appMaster.getTezRemoteShuffleManager().initialize();
appMaster.getTezRemoteShuffleManager().start();
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index 4060b552..4b4c6eca 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -77,19 +76,22 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
private TezRemoteShuffleUmbilicalProtocolImpl tezRemoteShuffleUmbilical;
private ShuffleWriteClient rssClient;
private String appId;
+ private RemoteStorageInfo remoteStorage;
public TezRemoteShuffleManager(
String tokenIdentifier,
Token<JobTokenIdentifier> sessionToken,
Configuration conf,
String appId,
- ShuffleWriteClient rssClient) {
+ ShuffleWriteClient rssClient,
+ RemoteStorageInfo remoteStorage) {
this.tokenIdentifier = tokenIdentifier;
this.sessionToken = sessionToken;
this.conf = conf;
this.appId = appId;
this.rssClient = rssClient;
this.tezRemoteShuffleUmbilical = new
TezRemoteShuffleUmbilicalProtocolImpl();
+ this.remoteStorage = remoteStorage;
}
@Override
@@ -192,21 +194,6 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
- // get remote storage from coordinator if necessary
- boolean dynamicConfEnabled =
- conf.getBoolean(
- RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
- RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
- RemoteStorageInfo defaultRemoteStorage =
- new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH,
""));
- String storageType =
- conf.get(RssTezConfig.RSS_STORAGE_TYPE,
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
- boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE,
false);
- ClientUtils.validateTestModeConf(testMode, storageType);
- RemoteStorageInfo remoteStorage =
- ClientUtils.fetchRemoteStorage(
- appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
rssClient);
-
try {
shuffleAssignmentsInfo =
RetryUtils.retry(
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 7e57060f..39b60d37 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -25,7 +25,6 @@ import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.RssTezConfig;
@@ -43,6 +42,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.UnitConverter;
@@ -68,6 +68,7 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
private String storageType;
private String basePath;
+ private RemoteStorageInfo remoteStorageInfo;
private final int readBufferSize;
private final int partitionNumPerRange;
private final int partitionNum;
@@ -117,6 +118,9 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
this.storageType =
conf.get(RssTezConfig.RSS_STORAGE_TYPE,
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
LOG.info("RssTezFetcherTask storageType:{}", storageType);
+ this.basePath = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH);
+ String remoteStorageConf =
this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
+ this.remoteStorageInfo = new RemoteStorageInfo(basePath,
remoteStorageConf);
String readBufferSize =
conf.get(
@@ -171,7 +175,7 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
if (!taskIdBitmap.isEmpty()) {
LOG.info(
"In reduce: " + reduceId + ", Rss Tez client starts to fetch blocks
from RSS server");
- JobConf readerJobConf = getRemoteConf();
+ Configuration hadoopConf = getRemoteConf();
LOG.info("RssTezFetcherTask storageType:{}", storageType);
boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
CreateShuffleReadClientRequest request =
@@ -185,7 +189,7 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
blockIdBitmap,
taskIdBitmap,
new ArrayList<>(serverInfoSet),
- readerJobConf,
+ hadoopConf,
expectedTaskIdsBitmapFilterEnable,
RssTezConfig.toRssConf(this.conf));
ShuffleReadClient shuffleReadClient =
@@ -209,8 +213,14 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
public void shutdown() {}
- private JobConf getRemoteConf() {
- return new JobConf(conf);
+ private Configuration getRemoteConf() {
+ Configuration remoteConf = new Configuration(conf);
+ if (!remoteStorageInfo.isEmpty()) {
+ for (Map.Entry<String, String> entry :
remoteStorageInfo.getConfItems().entrySet()) {
+ remoteConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return remoteConf;
}
public int getPartitionId() {
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index e6ae3ff2..89146ae9 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -61,7 +61,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.InputContextUtils;
@@ -97,6 +96,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.UnitConverter;
@@ -280,6 +280,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
private final int readBufferSize;
private final int partitionNumPerRange;
private String basePath;
+ private RemoteStorageInfo remoteStorageInfo;
private int indexReadLimit;
RssShuffleScheduler(
@@ -535,6 +536,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
conf.getInt(
RssTezConfig.RSS_PARTITION_NUM_PER_RANGE,
RssTezConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
+ this.basePath = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH);
+ String remoteStorageConf =
this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
+ this.remoteStorageInfo = new RemoteStorageInfo(basePath,
remoteStorageConf);
LOG.info(
"RSSShuffleScheduler running for sourceVertex: "
@@ -1785,8 +1789,14 @@ class RssShuffleScheduler extends ShuffleScheduler {
return true;
}
- private JobConf getRemoteConf() {
- return new JobConf(conf);
+ private Configuration getRemoteConf() {
+ Configuration remoteConf = new Configuration(conf);
+ if (!remoteStorageInfo.isEmpty()) {
+ for (Map.Entry<String, String> entry :
remoteStorageInfo.getConfItems().entrySet()) {
+ remoteConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return remoteConf;
}
private synchronized void waitAndNotifyProgress() throws
InterruptedException {
@@ -1836,7 +1846,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
"In reduce: "
+ inputContext.getTaskVertexName()
+ ", Rss Tez client starts to fetch blocks from RSS server");
- JobConf readerJobConf = getRemoteConf();
+ Configuration hadoopConf = getRemoteConf();
int partitionNum = partitionToServers.size();
boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size()
> 1;
@@ -1852,7 +1862,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
blockIdBitmap,
taskIdBitmap,
shuffleServerInfoList,
- readerJobConf,
+ hadoopConf,
new TezIdHelper(),
expectedTaskIdsBitmapFilterEnable,
RssTezConfig.toRssConf(conf));
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
index 55371a30..759580a9 100644
--- a/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
+++ b/client-tez/src/test/java/org/apache/tez/dag/app/RssDAGAppMasterTest.java
@@ -17,6 +17,7 @@
package org.apache.tez.dag.app;
+import java.io.File;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
@@ -25,17 +26,29 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.RssTezUtils;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
@@ -45,6 +58,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.records.DAGProtos;
@@ -72,6 +87,9 @@ import
org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
@@ -87,6 +105,12 @@ import static org.mockito.Mockito.when;
public class RssDAGAppMasterTest {
+ private static final File TEST_DIR =
+ new File(
+ System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
+ RssDAGAppMasterTest.class.getSimpleName())
+ .getAbsoluteFile();
+
@Test
public void testHookAfterDagInited() throws Exception {
// 1 Init and mock some basic module
@@ -348,4 +372,177 @@ public class RssDAGAppMasterTest {
return null;
}
}
+
+ @Test
+ public void testFetchRemoteStorageFromDynamicConf() throws Exception {
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
+ TezConfiguration conf = new TezConfiguration();
+
+ Credentials amCreds = new Credentials();
+ JobTokenSecretManager jtsm = new JobTokenSecretManager();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new
Text(appId.toString()));
+ Token<JobTokenIdentifier> sessionToken = new
Token<JobTokenIdentifier>(identifier, jtsm);
+ sessionToken.setService(identifier.getJobId());
+ TokenCache.setSessionToken(sessionToken, amCreds);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ FSDataOutputStream sessionJarsPBOutStream =
+ TezCommonUtils.createFileForAM(
+ fs, new Path(TEST_DIR.toString(),
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+
DAGProtos.PlanLocalResourcesProto.getDefaultInstance().writeDelimitedTo(sessionJarsPBOutStream);
+ sessionJarsPBOutStream.close();
+
+ RssDAGAppMaster appMaster =
+ new RssDAGAppMaster(
+ appAttemptId,
+ ContainerId.newInstance(appAttemptId, 1),
+ "127.0.0.1",
+ 0,
+ 0,
+ new SystemClock(),
+ 1,
+ true,
+ TEST_DIR.toString(),
+ new String[] {TEST_DIR.toString()},
+ new String[] {TEST_DIR.toString()},
+ new TezApiVersionInfo().getVersion(),
+ amCreds,
+ "someuser",
+ null);
+ appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(1));
+ appMaster.init(conf);
+
+ Configuration mergedConf = new Configuration(false);
+ RssTezUtils.applyDynamicClientConf(mergedConf,
appMaster.getClusterClientConf());
+ Assertions.assertEquals(4, mergedConf.size());
+ Assertions.assertEquals("hdfs://ns1/rss/",
mergedConf.get("tez.rss.remote.storage.path"));
+ Assertions.assertEquals(
+ "key1=value1,key2=value2",
mergedConf.get("tez.rss.remote.storage.conf"));
+ Assertions.assertEquals("MEMORY_LOCALFILE_HDFS",
mergedConf.get("tez.rss.storage.type"));
+ Assertions.assertEquals("testvalue",
mergedConf.get("tez.rss.test.config"));
+ }
+
+ @Test
+ public void testFetchRemoteStorageFromCoordinator() throws Exception {
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
+ TezConfiguration conf = new TezConfiguration();
+
+ Credentials amCreds = new Credentials();
+ JobTokenSecretManager jtsm = new JobTokenSecretManager();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new
Text(appId.toString()));
+ Token<JobTokenIdentifier> sessionToken = new
Token<JobTokenIdentifier>(identifier, jtsm);
+ sessionToken.setService(identifier.getJobId());
+ TokenCache.setSessionToken(sessionToken, amCreds);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ FSDataOutputStream sessionJarsPBOutStream =
+ TezCommonUtils.createFileForAM(
+ fs, new Path(TEST_DIR.toString(),
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+
DAGProtos.PlanLocalResourcesProto.getDefaultInstance().writeDelimitedTo(sessionJarsPBOutStream);
+ sessionJarsPBOutStream.close();
+
+ RssDAGAppMaster appMaster =
+ new RssDAGAppMaster(
+ appAttemptId,
+ ContainerId.newInstance(appAttemptId, 1),
+ "127.0.0.1",
+ 0,
+ 0,
+ new SystemClock(),
+ 1,
+ true,
+ TEST_DIR.toString(),
+ new String[] {TEST_DIR.toString()},
+ new String[] {TEST_DIR.toString()},
+ new TezApiVersionInfo().getVersion(),
+ amCreds,
+ "someuser",
+ null);
+ appMaster.setShuffleWriteClient(new FakedShuffleWriteClient(2));
+ appMaster.init(conf);
+
+ Configuration mergedConf = new Configuration(false);
+ RssTezUtils.applyDynamicClientConf(mergedConf,
appMaster.getClusterClientConf());
+ Assertions.assertEquals(4, mergedConf.size());
+ Assertions.assertEquals("hdfs://ns2/rss/",
mergedConf.get("tez.rss.remote.storage.path"));
+ Assertions.assertEquals(
+ "key11=value11,key22=value22",
mergedConf.get("tez.rss.remote.storage.conf"));
+ Assertions.assertEquals("MEMORY_LOCALFILE_HDFS",
mergedConf.get("tez.rss.storage.type"));
+ Assertions.assertEquals("testvalue",
mergedConf.get("tez.rss.test.config"));
+ }
+
+ static class FakedShuffleWriteClient extends ShuffleWriteClientImpl {
+
+ /*
+ * Mode 1: rss.remote.storage.path and rss.remote.storage.conf is set by
dynamic config,
+ * appMaster will use this as default remote storage path.
+ * Mode 2: rss.remote.storage.path and rss.remote.storage.conf is not set
by dynamic config,
+ * appMaster will fetch remote storage conf from coordinator.
+ * */
+ private int mode;
+
+ FakedShuffleWriteClient(int mode) {
+ this("GRPC", 1, 1, 10, 1, 1, 1, false, 1, 1, 1, 1);
+ this.mode = mode;
+ }
+
+ private FakedShuffleWriteClient(
+ String clientType,
+ int retryMax,
+ long retryIntervalMax,
+ int heartBeatThreadNum,
+ int replica,
+ int replicaWrite,
+ int replicaRead,
+ boolean replicaSkipEnabled,
+ int dataTransferPoolSize,
+ int dataCommitPoolSize,
+ int unregisterThreadPoolSize,
+ int unregisterRequestTimeSec) {
+ super(
+ clientType,
+ retryMax,
+ retryIntervalMax,
+ heartBeatThreadNum,
+ replica,
+ replicaWrite,
+ replicaRead,
+ replicaSkipEnabled,
+ dataTransferPoolSize,
+ dataCommitPoolSize,
+ unregisterThreadPoolSize,
+ unregisterRequestTimeSec);
+ }
+
+ @Override
+ public void registerCoordinators(String coordinators) {}
+
+ @Override
+ public Map<String, String> fetchClientConf(int timeoutMs) {
+ Map<String, String> clientConf = new HashMap();
+ if (mode == 1) {
+ clientConf.put("rss.remote.storage.path", "hdfs://ns1/rss/");
+ clientConf.put("rss.remote.storage.conf", "key1=value1,key2=value2");
+ clientConf.put("rss.storage.type", "MEMORY_LOCALFILE_HDFS");
+ clientConf.put("rss.test.config", "testvalue");
+ } else if (mode == 2) {
+ clientConf.put("rss.storage.type", "MEMORY_LOCALFILE_HDFS");
+ clientConf.put("rss.test.config", "testvalue");
+ } else {
+ throw new RssException("Wrong test mode.");
+ }
+ return clientConf;
+ }
+
+ @Override
+ public RemoteStorageInfo fetchRemoteStorage(String appId) {
+ if (mode == 2) {
+ return new RemoteStorageInfo("hdfs://ns2/rss/",
"key11=value11,key22=value22");
+ } else {
+ throw new RssException("Wrong test mode.");
+ }
+ }
+ }
}
diff --git
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
index c9721893..16dc7b14 100644
---
a/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/dag/app/TezRemoteShuffleManagerTest.java
@@ -116,7 +116,7 @@ public class TezRemoteShuffleManagerTest {
secretManager.addTokenForJob(tokenIdentifier, sessionToken);
TezRemoteShuffleManager tezRemoteShuffleManager =
new TezRemoteShuffleManager(
- appId.toString(), sessionToken, conf, appId.toString(), client);
+ appId.toString(), sessionToken, conf, appId.toString(), client,
null);
tezRemoteShuffleManager.initialize();
tezRemoteShuffleManager.start();
@@ -224,7 +224,7 @@ public class TezRemoteShuffleManagerTest {
secretManager.addTokenForJob(tokenIdentifier, sessionToken);
TezRemoteShuffleManager tezRemoteShuffleManager =
new TezRemoteShuffleManager(
- appId.toString(), sessionToken, conf, appId.toString(), client);
+ appId.toString(), sessionToken, conf, appId.toString(), client,
null);
tezRemoteShuffleManager.initialize();
tezRemoteShuffleManager.start();
diff --git a/pom.xml b/pom.xml
index 2ac91097..16cdad06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1977,6 +1977,12 @@
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>