smallzhongfeng commented on code in PR #210:
URL: https://github.com/apache/incubator-uniffle/pull/210#discussion_r980836844
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java:
##########
@@ -39,106 +45,127 @@
public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
- /**
- * store appId -> remote path to make sure all shuffle data of the same
application
- * will be written to the same remote storage
- */
- private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
/**
* store remote path -> application count for assignment strategy
*/
- private final Map<String, RankValue> remoteStoragePathCounter;
+ private final Map<String, RankValue> remoteStoragePathRankValue;
+ private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-
- public AppBalanceSelectStorageStrategy() {
- this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
- this.remoteStoragePathCounter = Maps.newConcurrentMap();
- this.availableRemoteStorageInfo = Maps.newHashMap();
+ private final Configuration hdfsConf;
+ private final int fileSize;
+ private final int readAndWriteTimes;
+ private boolean remotePathIsHealthy = true;
+
+ public AppBalanceSelectStorageStrategy(
+ Map<String, RankValue> remoteStoragePathRankValue,
+ Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo,
+ Map<String, RemoteStorageInfo> availableRemoteStorageInfo,
+ CoordinatorConf conf) {
+ this.remoteStoragePathRankValue = remoteStoragePathRankValue;
+ this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
+ this.availableRemoteStorageInfo = availableRemoteStorageInfo;
+ this.hdfsConf = new Configuration();
+ fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ readAndWriteTimes =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
}
- /**
- * the strategy of pick remote storage is according to assignment count
- */
- @Override
- public RemoteStorageInfo pickRemoteStorage(String appId) {
- if (appIdToRemoteStorageInfo.containsKey(appId)) {
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- // create list for sort
- List<Map.Entry<String, RankValue>> sizeList =
-
Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
- .sorted(Comparator.comparingInt(entry ->
entry.getValue().getAppNum().get())).collect(Collectors.toList());
-
- for (Map.Entry<String, RankValue> entry : sizeList) {
- String storagePath = entry.getKey();
- if (availableRemoteStorageInfo.containsKey(storagePath)) {
- appIdToRemoteStorageInfo.putIfAbsent(appId,
availableRemoteStorageInfo.get(storagePath));
- incRemoteStorageCounter(storagePath);
- break;
- }
- }
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- @Override
@VisibleForTesting
- public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
- RankValue counter = remoteStoragePathCounter.get(remoteStoragePath);
- if (counter != null) {
- counter.getAppNum().incrementAndGet();
- } else {
- // it may be happened when assignment remote storage
- // and refresh remote storage at the same time
- LOG.warn("Remote storage path lost during assignment: {} doesn't exist,
reset it to 1",
- remoteStoragePath);
- remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1));
+ public List<Map.Entry<String, RankValue>> sortPathByRankValue(
+ String path, String test, boolean isHealthy) {
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
+ fs.delete(new Path(test),true);
+ if (isHealthy) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(0,
rankValue.getAppNum().get()));
+ }
+ } catch (Exception e) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
}
+ return
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream()
+ .filter(Objects::nonNull).collect(Collectors.toList());
}
@Override
- @VisibleForTesting
- public synchronized void decRemoteStorageCounter(String storagePath) {
- if (!StringUtils.isEmpty(storagePath)) {
- RankValue atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null) {
- double count = atomic.getAppNum().decrementAndGet();
- if (count < 0) {
- LOG.warn("Unexpected counter for remote storage: {}, which is {},
reset to 0",
- storagePath, count);
- atomic.getAppNum().set(0);
+ public List<Map.Entry<String, RankValue>> detectStorage(String uri) {
+ if (uri.startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) {
+ setRemotePathIsHealthy(true);
+ Path remotePath = new Path(uri);
+ String rssTest = uri + "/rssTest";
+ Path testPath = new Path(rssTest);
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
+ for (int j = 0; j < readAndWriteTimes; j++) {
Review Comment:
For appBalance, it can indeed be changed to run only once, but for IO
sampling the number of times can be increased. Perhaps we can set the
parameter to 1 by default.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]