zhengchenyu commented on code in PR #890: URL: https://github.com/apache/incubator-uniffle/pull/890#discussion_r1198513685
########## client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java: ########## @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.client.api.ShuffleWriteClient; +import org.apache.uniffle.client.factory.ShuffleClientFactory; +import org.apache.uniffle.common.ShuffleServerInfo; +import 
org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.Constants; + +public class RssTezUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class); + + private static final int MAX_ATTEMPT_LENGTH = 6; + private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1; + + public static final String HOST_NAME = "hostname"; + + public static final String PLUS_DELIMITER = "+"; + public static final String UNDERLINE_DELIMITER = "_"; + public static final String COLON_DELIMITER = ":"; + public static final String COMMA_DELIMITER = ","; + + // constant to compute shuffle id + private static final int VERTEX_ID_MAPPING_MAX_ID = 500; + private static final String VERTEX_ID_MAPPING_MAP = "Map"; + private static final String VERTEX_ID_MAPPING_REDUCER = "Reducer"; + private static final int VERTEX_ID_MAPPING_MAGIC = 600; + private static final int SHUFFLE_ID_MAGIC = 1000; + + + + private RssTezUtils() { + } + + public static ShuffleWriteClient createShuffleClient(Configuration conf) { + int heartBeatThreadNum = conf.getInt(RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM, + RssTezConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE); + int retryMax = conf.getInt(RssTezConfig.RSS_CLIENT_RETRY_MAX, + RssTezConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE); + long retryIntervalMax = conf.getLong(RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, + RssTezConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE); + String clientType = conf.get(RssTezConfig.RSS_CLIENT_TYPE, + RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE); + int replicaWrite = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_WRITE, + RssTezConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE); + int replicaRead = conf.getInt(RssTezConfig.RSS_DATA_REPLICA_READ, + RssTezConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE); + int replica = conf.getInt(RssTezConfig.RSS_DATA_REPLICA, + RssTezConfig.RSS_DATA_REPLICA_DEFAULT_VALUE); + boolean replicaSkipEnabled = 
conf.getBoolean(RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED, + RssTezConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE); + int dataTransferPoolSize = conf.getInt(RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE, + RssTezConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE); + int dataCommitPoolSize = conf.getInt(RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE, + RssTezConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE); + ShuffleWriteClient client = ShuffleClientFactory + .getInstance() + .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, + heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled, + dataTransferPoolSize, dataCommitPoolSize); + return client; + } + + public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { + long initialMemRequestMb = conf.getLong(RssTezConfig.RSS_RUNTIME_IO_SORT_MB, + RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB); + LOG.info("InitialMemRequestMb is {}", initialMemRequestMb); + LOG.info("MaxAvailableTaskMemory is {}", maxAvailableTaskMemory); + long reqBytes = initialMemRequestMb << 20; + Preconditions.checkArgument(initialMemRequestMb > 0 && reqBytes < maxAvailableTaskMemory, + RssTezConfig.RSS_RUNTIME_IO_SORT_MB + initialMemRequestMb + + " should be " + "larger than 0 and should be less than the available task memory (MB):" + + (maxAvailableTaskMemory >> 20)); + LOG.info("Requested BufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + + ") : " + initialMemRequestMb); + return reqBytes; + } + + public static String uniformPartitionHostInfo(Map<Integer, List<ShuffleServerInfo>> map) { + List<String> res = new ArrayList<>(); + String tmp; + Set<Integer> pidSet = map.keySet(); + for (Integer pid : pidSet) { + for (ShuffleServerInfo shuffleServerInfo : map.get(pid)) { + tmp = pid + UNDERLINE_DELIMITER + shuffleServerInfo.getHost() + COLON_DELIMITER + shuffleServerInfo.getNettyPort(); + res.add(tmp); + } + } + return 
org.apache.commons.lang.StringUtils.join(res, COMMA_DELIMITER); + } + + public static Map<String, List<String>> uniformServerToPartitions(String partitionToServers) { + Map<String, List<String>> serverToPartitions = new HashMap<>(); + List<String> list; + + String[] pidWithWorkerInfos = partitionToServers.split(COMMA_DELIMITER); + for (String pidWithWorkerInfo : pidWithWorkerInfos) { + String[] pidUnderLineWorkerInfo = pidWithWorkerInfo.split(UNDERLINE_DELIMITER); + if (serverToPartitions.containsKey(pidUnderLineWorkerInfo[1])) { + list = serverToPartitions.get(pidUnderLineWorkerInfo[1]); + list.add(pidUnderLineWorkerInfo[0]); + } else { + list = new ArrayList<>(); + list.add(pidUnderLineWorkerInfo[0]); + serverToPartitions.put(pidUnderLineWorkerInfo[1], list); + } + } + + return serverToPartitions; + } + + public static String uniformServerToPartitions(Map<String, List<String>> map) { + List<String> res = new ArrayList<>(); + Set<String> keySet = map.keySet(); + for (String s : keySet) { + String join = org.apache.commons.lang.StringUtils.join(map.get(s), UNDERLINE_DELIMITER); + res.add(s + PLUS_DELIMITER + join); + } + + return org.apache.commons.lang.StringUtils.join(res,COMMA_DELIMITER); + } + + public static ApplicationAttemptId getApplicationAttemptId() { + String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()); + ContainerId containerId = ContainerId.fromString(containerIdStr); + return containerId.getApplicationAttemptId(); + } + + public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) { + if (uniqueIdentifier == null) { + throw new RuntimeException("uniqueIdentifier should not be null"); + } + String[] ids = uniqueIdentifier.split("_"); + return StringUtils.join(ids, "_", 0, 7); + } + + public static int getInt(Configuration rssJobConf, Configuration mrJobCOnf, String key, int defaultValue) { Review Comment: 1. name fix rssJobConf -> rssConf, mrJobCOnf -> tezConf 2. 
In client-mr mode, we use rss_conf.xml to transmit the RSS config. In client-tez mode, I think rss_conf.xml is unnecessary, so this method only needs a single Configuration parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
