lifeSo commented on code in PR #948: URL: https://github.com/apache/incubator-uniffle/pull/948#discussion_r1230703024
########## client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java: ########## @@ -0,0 +1,1690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.crypto.SecretKey; + +import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Preconditions; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.common.CallableWithNdc; +import org.apache.tez.common.IdUtils; +import org.apache.tez.common.InputContextUtils; +import org.apache.tez.common.RssTezConfig; +import org.apache.tez.common.RssTezUtils; +import org.apache.tez.common.TezIdHelper; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.UmbilicalUtils; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import 
org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.shuffle.HostPort; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.client.api.ShuffleReadClient; +import org.apache.uniffle.client.api.ShuffleWriteClient; +import org.apache.uniffle.client.factory.ShuffleClientFactory; +import org.apache.uniffle.client.request.CreateShuffleReadClientRequest; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.UnitConverter; + +class RssShuffleScheduler extends ShuffleScheduler { + + public static class PathPartition { + + final String path; + final int partition; + + PathPartition(String path, int partition) { + this.path = path; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((path == null) ? 
0 : path.hashCode()); + result = prime * result + partition; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + PathPartition other = (PathPartition) obj; + if (path == null) { + if (other.path != null) { + return false; + } + } else if (!path.equals(other.path)) { + return false; + } + if (partition != other.partition) { + return false; + } + return true; + } + + @Override + public String toString() { + return "PathPartition [path=" + path + ", partition=" + partition + "]"; + } + } + + @VisibleForTesting + enum ShuffleErrors { + IO_ERROR, + WRONG_LENGTH, + BAD_ID, + WRONG_MAP, + CONNECTION, + WRONG_REDUCE + } + + @VisibleForTesting + static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors"; + + private final AtomicLong shuffleStart = new AtomicLong(0); + + private static final Logger LOG = LoggerFactory.getLogger(RssShuffleScheduler.class); + private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch"); + private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG); + + static final long INITIAL_PENALTY = 2000L; // 2 seconds + private static final float PENALTY_GROWTH_RATE = 1.3f; + + private final BitSet finishedMaps; + private final int numInputs; + private int numFetchedSpills; + @VisibleForTesting + final Map<HostPortPartition, MapHost> mapLocations = new HashMap<>(); + //TODO Clean this and other maps at some point + @VisibleForTesting + final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<>(); + + // To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is + // enabled in source. 
+ @VisibleForTesting + final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; + + @VisibleForTesting + final Set<MapHost> pendingHosts = new HashSet<>(); + private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<>(); + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final Random random = new Random(System.currentTimeMillis()); + private final DelayQueue<Penalty> penalties = new DelayQueue<>(); + private final Referee referee; + @VisibleForTesting + final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<>(); + final Set<HostPort> uniqueHosts = Sets.newHashSet(); + private final Map<HostPort,IntWritable> hostFailures = new HashMap<>(); + private final InputContext inputContext; + private final TezCounter shuffledInputsCounter; + private final TezCounter skippedInputCounter; + private final TezCounter reduceShuffleBytes; + private final TezCounter reduceBytesDecompressed; + @VisibleForTesting + final TezCounter failedShuffleCounter; + private final TezCounter bytesShuffledToDisk; + private final TezCounter bytesShuffledToDiskDirect; + private final TezCounter bytesShuffledToMem; + private final TezCounter firstEventReceived; + private final TezCounter lastEventReceived; + + private final String srcNameTrimmed; + @VisibleForTesting + final AtomicInteger remainingMaps; + private final long startTime; + @VisibleForTesting + long lastProgressTime; + @VisibleForTesting + long failedShufflesSinceLastCompletion; + + private final int numFetchers; + private final Set<RssTezShuffleDataFetcher> rssRunningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + private final ListeningExecutorService fetcherExecutor; + + private final HttpConnectionParams httpConnectionParams; + private final FetchedInputAllocatorOrderedGrouped allocator; + private final ExceptionReporter exceptionReporter; + private final MergeManager mergeManager; + private final JobTokenSecretManager 
jobTokenSecretManager; + private final boolean ifileReadAhead; + private final int ifileReadAheadLength; + private final CompressionCodec codec; + private final Configuration conf; + private final boolean localDiskFetchEnabled; + private final String localHostname; + private final int shufflePort; + private final String applicationId; + private final int dagId; + private final boolean asyncHttp; + private final boolean sslShuffle; + + private final TezCounter ioErrsCounter; + private final TezCounter wrongLengthErrsCounter; + private final TezCounter badIdErrsCounter; + private final TezCounter wrongMapErrsCounter; + private final TezCounter connectionErrsCounter; + private final TezCounter wrongReduceErrsCounter; + + private final int maxTaskOutputAtOnce; + private final int maxFetchFailuresBeforeReporting; + private final boolean reportReadErrorImmediately; + private final int maxFailedUniqueFetches; + private final int abortFailureLimit; + + private final int minFailurePerHost; + private final float hostFailureFraction; + private final float maxStallTimeFraction; + private final float minReqProgressFraction; + private final float maxAllowedFailedFetchFraction; + private final boolean checkFailedFetchSinceLastCompletion; + private final boolean verifyDiskChecksum; + private final boolean compositeFetch; + + private volatile Thread shuffleSchedulerThread = null; + + private long totalBytesShuffledTillNow = 0; + private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); + + + // For Rss + private Map<Integer, List<ShuffleServerInfo>> partitionToServers; + private final Map<Integer, MapHost> runningRssPartitionMap = new HashMap<>(); + + private final Set<Integer> successRssPartitionSet = Sets.newConcurrentHashSet(); + private final Set<Integer> allRssPartition = Sets.newConcurrentHashSet(); + + private final Map<Integer, Set<InputAttemptIdentifier>> partitionIdToSuccessMapTaskAttempts = new HashMap<>(); + private final String storageType; + + + private 
final int readBufferSize; + private final int partitionNumPerRange; + private String basePath; + private int indexReadLimit; + + RssShuffleScheduler(InputContext inputContext, + Configuration conf, + int numberOfInputs, + ExceptionReporter exceptionReporter, + MergeManager mergeManager, + FetchedInputAllocatorOrderedGrouped allocator, + long startTime, + CompressionCodec codec, + boolean ifileReadAhead, + int ifileReadAheadLength, + String srcNameTrimmed) throws IOException { + super(inputContext, conf, numberOfInputs, exceptionReporter, mergeManager, allocator, startTime, codec, + ifileReadAhead, ifileReadAheadLength, srcNameTrimmed); + this.inputContext = inputContext; + this.conf = conf; + this.exceptionReporter = exceptionReporter; + this.allocator = allocator; + this.mergeManager = mergeManager; + this.numInputs = numberOfInputs; + int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT); + if (abortFailureLimitConf <= -1) { + abortFailureLimit = Math.max(15, numberOfInputs / 10); + } else { + //No upper cap, as user is setting this intentionally + abortFailureLimit = abortFailureLimitConf; + } + remainingMaps = new AtomicInteger(numberOfInputs); // total up-stream task + + finishedMaps = new BitSet(numberOfInputs); + this.ifileReadAhead = ifileReadAhead; + this.ifileReadAheadLength = ifileReadAheadLength; + this.srcNameTrimmed = srcNameTrimmed; + this.codec = codec; + int configuredNumFetchers = + conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT); + numFetchers = Math.min(configuredNumFetchers, numInputs); + + localDiskFetchEnabled = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, + TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT); + + this.minFailurePerHost = conf.getInt( + 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT); + Preconditions.checkArgument(minFailurePerHost >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST + + "=" + minFailurePerHost + " should not be negative"); + + this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT); + + this.maxStallTimeFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT); + Preconditions.checkArgument(maxStallTimeFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION + + "=" + maxStallTimeFraction + " should not be negative"); + + this.minReqProgressFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT); + Preconditions.checkArgument(minReqProgressFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION + + "=" + minReqProgressFraction + " should not be negative"); + + this.maxAllowedFailedFetchFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT); + Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION + + "=" + maxAllowedFailedFetchFraction + " should not be negative"); + + this.checkFailedFetchSinceLastCompletion = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, + 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); + + this.applicationId = IdUtils.getApplicationAttemptId().toString(); + this.dagId = inputContext.getDagIdentifier(); + this.localHostname = inputContext.getExecutionContext().getHostName(); + String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + final ByteBuffer shuffleMetadata = + inputContext.getServiceProviderMetaData(auxiliaryService); + this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); + + this.referee = new Referee(); + // Counters used by the ShuffleScheduler + this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS); + this.reduceShuffleBytes = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES); + this.reduceBytesDecompressed = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED); + this.failedShuffleCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS); + this.bytesShuffledToDisk = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK); + this.bytesShuffledToDiskDirect = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT); + this.bytesShuffledToMem = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM); + + // Counters used by Fetchers + ioErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString()); + wrongLengthErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, + ShuffleErrors.WRONG_LENGTH.toString()); + badIdErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString()); + wrongMapErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, + ShuffleErrors.WRONG_MAP.toString()); + connectionErrsCounter = 
inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, + ShuffleErrors.CONNECTION.toString()); + wrongReduceErrsCounter = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, + ShuffleErrors.WRONG_REDUCE.toString()); + + this.startTime = startTime; + this.lastProgressTime = startTime; + + this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); + this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); + this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); + SecretKey jobTokenSecret = ShuffleUtils + .getJobTokenSecretFromTokenBytes(inputContext + .getServiceConsumerMetaData(auxiliaryService)); + this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret); + + final ExecutorService fetcherRawExecutor; + if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) { + fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers, + "Fetcher_O {" + srcNameTrimmed + "} #%d"); + } else { + fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); + } + this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); + + this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); + referee.start(); + this.maxFetchFailuresBeforeReporting = + conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT); + this.reportReadErrorImmediately = + conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); + this.verifyDiskChecksum = conf.getBoolean( + 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); + + /** + * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would + * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. + */ + this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT))); + + this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); + this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); + this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); + this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); + + pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap(); + + this.storageType = conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE); + String readBufferSize = conf.get(RssTezConfig.RSS_CLIENT_READ_BUFFER_SIZE, + RssTezConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE); + this.readBufferSize = (int) UnitConverter.byteStringAsBytes(readBufferSize); + this.partitionNumPerRange = conf.getInt(RssTezConfig.RSS_PARTITION_NUM_PER_RANGE, + RssTezConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE); + + LOG.info("RSSShuffleScheduler running for sourceVertex: " + + inputContext.getSourceVertexName() + " with configuration: " + + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting + + ", reportReadErrorImmediately=" + reportReadErrorImmediately + + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + + ", abortFailureLimit=" + abortFailureLimit + + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce + + ", numFetchers=" + numFetchers + + ", hostFailureFraction=" + hostFailureFraction + + 
", minFailurePerHost=" + minFailurePerHost + + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction + + ", maxStallTimeFraction=" + maxStallTimeFraction + + ", minReqProgressFraction=" + minReqProgressFraction + + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion + + ", storyType=" + storageType + ", readBufferSize=" + this.readBufferSize + + ", partitionNumPerRange=" + partitionNumPerRange); + } + + @Override + public void start() throws Exception { + int shuffleId = InputContextUtils.computeShuffleId(this.inputContext); + TezTaskAttemptID tezTaskAttemptID = InputContextUtils.getTezTaskAttemptID(this.inputContext); + this.partitionToServers = UmbilicalUtils.requestShuffleServer( + inputContext.getApplicationId(), conf, tezTaskAttemptID, shuffleId); + + shuffleSchedulerThread = Thread.currentThread(); + RssShuffleSchedulerCallable rssShuffleSchedulerCallable = new RssShuffleSchedulerCallable(); + rssShuffleSchedulerCallable.call(); + } + + @Override + @SuppressWarnings("NN_NAKED_NOTIFY") Review Comment: 这行代码不生成“EQ_COMPARETO_USE_OBJECT_EQUALS”异常，只生成“NN_NAKED_NOTIFY”异常。 所以我认为加上“NN_NAKED_NOTIFY”就行了。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
