lifeSo commented on code in PR #890:
URL: https://github.com/apache/incubator-uniffle/pull/890#discussion_r1198476549


##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
+
+public class RssTezUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
+  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+
+  public static final String HOST_NAME = "hostname";
+
+  public static final String UNDERLINE_DELIMITER = "_";
+
+  public static final String COLON_DELIMITER = ":";
+  public static final String COMMA_DELIMITER = ",";
+
+  private RssTezUtils(){
+  }
+
+  public static ShuffleWriteClient createShuffleClient(Configuration conf) {
+    int heartBeatThreadNum = 
conf.getInt(RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
+        RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+    int retryMax = conf.getInt(RssTezConfig.RSS_CLIENT_RETRY_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+    long retryIntervalMax = 
conf.getLong(RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+    String clientType = conf.get(RssTezConfig.RSS_CLIENT_TYPE,
+        RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
+    int replicaWrite = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_WRITE,
+        RssTezConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+    int replicaRead = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_READ,
+        RssTezConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+    int replica = conf.getInt(RssTezConfig.RSS_DATA_REPLICA,
+        RssTezConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+    boolean replicaSkipEnabled = 
conf.getBoolean(RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
+        RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+    int dataTransferPoolSize = 
conf.getInt(RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+        RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+    int dataCommitPoolSize = 
conf.getInt(RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE,
+        RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
+    ShuffleWriteClient client = ShuffleClientFactory
+        .getInstance()
+        .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
+            heartBeatThreadNum, replica, replicaWrite, replicaRead, 
replicaSkipEnabled,
+            dataTransferPoolSize, dataCommitPoolSize);
+    return client;
+  }
+
+  public static long getInitialMemoryRequirement(Configuration conf, long 
maxAvailableTaskMemory) {
+    long initialMemRequestMb = 
conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB,
+        RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
+
+    LOG.info("initialMemRequestMb is {}", initialMemRequestMb);
+    LOG.info("maxAvailableTaskMemory is {}", maxAvailableTaskMemory);
+    long reqBytes = initialMemRequestMb << 20;
+    Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < 
maxAvailableTaskMemory,
+            RssTezConfig.RSS_RUNTIME_IO_SORT_MB + initialMemRequestMb
+                    + " should be " + "larger than 0 and should be less than 
the available task memory (MB):"
+                + (maxAvailableTaskMemory >> 20));
+    LOG.info("Requested BufferSize (" + 
TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB
+            + ") : " + initialMemRequestMb);
+    return reqBytes;
+  }
+
+  public static String uniformPartitionHostInfo(Map<Integer, 
List<ShuffleServerInfo>> map) {
+    List<String> res = new ArrayList<>();
+    String tmp;
+    Set<Integer> pidSet = map.keySet();
+    for (Integer pid : pidSet) {
+      for (ShuffleServerInfo shuffleServerInfo : map.get(pid)) {
+        tmp = pid + UNDERLINE_DELIMITER + shuffleServerInfo.getHost() + 
COLON_DELIMITER + shuffleServerInfo.getPort();
+        res.add(tmp);
+      }
+    }
+    String str = org.apache.commons.lang.StringUtils.join(res, 
COMMA_DELIMITER);
+
+    return str;
+  }
+
+  public static ApplicationAttemptId getApplicationAttemptId() {
+    String containerIdStr =
+        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+    ContainerId containerId = ContainerId.fromString(containerIdStr);
+    return containerId.getApplicationAttemptId();
+  }
+
+
+
+  public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
+    if (uniqueIdentifier == null) {
+      throw new RuntimeException("uniqueIdentifier should not be null");
+    }
+    String[] ids = uniqueIdentifier.split("_");
+    return StringUtils.join(ids, "_", 0, 7);
+  }
+
+
+
+  public static void applyDynamicClientConf(Configuration jobConf, Map<String, 
String> confItems) {
+    if (jobConf == null) {
+      LOG.warn("Job conf is null");
+      return;
+    }
+
+    if (confItems == null || confItems.isEmpty()) {
+      LOG.warn("Empty conf items");
+      return;
+    }
+
+    for (Map.Entry<String, String> kv : confItems.entrySet()) {
+      String mrConfKey = kv.getKey();
+      if (!mrConfKey.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)) {
+        mrConfKey = RssTezConfig.TEZ_RSS_CONFIG_PREFIX + mrConfKey;
+      }
+      String mrConfVal = kv.getValue();
+      if (StringUtils.isEmpty(jobConf.get(mrConfKey, ""))
+          || RssTezConfig.RSS_MANDATORY_CLUSTER_CONF.contains(mrConfKey)) {
+        LOG.warn("Use conf dynamic conf {} = {}", mrConfKey, mrConfVal);
+        jobConf.set(mrConfKey, mrConfVal);
+      }
+    }
+  }
+
+  public static int getInt(Configuration rssJobConf, Configuration mrJobCOnf, 
String key, int defaultValue) {
+    return rssJobConf.getInt(key,  mrJobCOnf.getInt(key, defaultValue));
+  }
+
+  public static long getLong(Configuration rssJobConf, Configuration 
mrJobConf, String key, long defaultValue) {
+    return rssJobConf.getLong(key, mrJobConf.getLong(key, defaultValue));
+  }
+
+  public static boolean getBoolean(Configuration rssJobConf, Configuration 
mrJobConf, String key,
+                                   boolean defaultValue) {
+    return rssJobConf.getBoolean(key, mrJobConf.getBoolean(key, defaultValue));
+  }
+
+  public static double getDouble(Configuration rssJobConf, Configuration 
mrJobConf, String key, double defaultValue) {
+    return rssJobConf.getDouble(key, mrJobConf.getDouble(key, defaultValue));
+  }
+
+  public static String getString(Configuration rssJobConf, Configuration 
mrJobConf, String key) {
+    return rssJobConf.get(key, mrJobConf.get(key));
+  }
+
+  public static String getString(Configuration rssJobConf, Configuration 
mrJobConf, String key, String defaultValue) {
+    return rssJobConf.get(key, mrJobConf.get(key, defaultValue));
+  }
+
+  public static long getBlockId(long partitionId, long taskAttemptId, int 
nextSeqNo) {
+    long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
+    if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support attemptId [" + attemptId
+          + "], the max value should be " + MAX_ATTEMPT_ID);
+    }
+    long  atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
+    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+      throw new RuntimeException("Can't support sequence [" + atomicInt
+          + "], the max value should be " + Constants.MAX_SEQUENCE_NO);
+    }
+    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
+      throw new RuntimeException("Can't support partitionId["
+          + partitionId + "], the max value should be " + 
Constants.MAX_PARTITION_ID);
+    }
+    long taskId = taskAttemptId - (attemptId
+        << (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+    if (taskId < 0 ||  taskId > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new RuntimeException("Can't support taskId["
+          + taskId + "], the max value should be " + 
Constants.MAX_TASK_ATTEMPT_ID);
+    }
+    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) + taskId;
+  }
+
+  public static long getTaskAttemptId(long blockId) {
+    long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
+    long attemptId = (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + 
Constants.PARTITION_ID_MAX_LENGTH))
+        & MAX_ATTEMPT_ID;
+    return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + 
Constants.PARTITION_ID_MAX_LENGTH)) + mapId;
+  }
+
+  public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, 
int reduceNum) {
+    double dynamicFactor = 
jobConf.getDouble(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR,
+        
RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
+    double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, 
Constants.MR_SLOW_START_DEFAULT_VALUE);
+    int mapLimit = jobConf.getInt(Constants.MR_MAP_LIMIT, 
Constants.MR_MAP_LIMIT_DEFAULT_VALUE);
+    int reduceLimit = jobConf.getInt(Constants.MR_REDUCE_LIMIT, 
Constants.MR_REDUCE_LIMIT_DEFAULT_VALUE);
+
+    int estimateMapNum = mapLimit > 0 ? Math.min(mapNum, mapLimit) : mapNum;
+    int estimateReduceNum = reduceLimit > 0 ? Math.min(reduceNum, reduceLimit) 
: reduceNum;
+    if (slowStart == 1) {
+      return (int) (Math.max(estimateMapNum, estimateReduceNum) * 
dynamicFactor);
+    } else {
+      return (int) (((1 - slowStart) * estimateMapNum + estimateReduceNum) * 
dynamicFactor);
+    }
+  }
+
+  public static int getRequiredShuffleServerNumber(Configuration jobConf, int 
mapNum, int reduceNum) {
+    int requiredShuffleServerNumber = jobConf.getInt(
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
+        RssTezConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
+    );
+    boolean enabledEstimateServer = jobConf.getBoolean(
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED,
+        RssTezConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE
+    );
+    if (!enabledEstimateServer || requiredShuffleServerNumber > 0) {
+      return requiredShuffleServerNumber;
+    }
+    int taskConcurrency = estimateTaskConcurrency(jobConf, mapNum, reduceNum);
+    int taskConcurrencyPerServer = 
jobConf.getInt(RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
+        RssTezConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
+    return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
+  }
+
+  public static void validateRssClientConf(Configuration rssJobConf, 
Configuration mrJobConf) {
+    int retryMax = getInt(rssJobConf, mrJobConf, 
RssTezConfig.RSS_CLIENT_RETRY_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+    long retryIntervalMax = getLong(rssJobConf, mrJobConf, 
RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+        RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+    long sendCheckTimeout = getLong(rssJobConf, mrJobConf, 
RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+        RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
+    if (retryIntervalMax * retryMax > sendCheckTimeout) {
+      throw new IllegalArgumentException(String.format("%s(%s) * %s(%s) should 
not bigger than %s(%s)",
+          RssTezConfig.RSS_CLIENT_RETRY_MAX,
+          retryMax,
+          RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+          retryIntervalMax,
+          RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+          sendCheckTimeout));
+    }
+  }
+
+  public static int computeShuffleId(int tezDagID, String upVertexName, String 
downVertexName) {

Review Comment:
   OK, we added it and committed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to