This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 55d64a9 [Style] Check indentation (#56)
55d64a9 is described below
commit 55d64a99d17b7199a7619d2da4238113c3fa2d3f
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Jul 15 14:39:03 2022 +0800
[Style] Check indentation (#56)
### What changes were proposed in this pull request?
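1. Enable the checkstyle Indentation rule (it was previously set to severity "ignore") with a 2-space basic offset and case indent, and a 4-space indent for throws clauses, wrapped lines and array initializers.
2. Fix the existing indentation violations reported by the newly enabled rule.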
### Why are the changes needed?
#54
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
https://github.com/kaijchen/incubator-uniffle/runs/7351765430?check_suite_focus=true
---
checkstyle.xml | 18 +---
.../hadoop/mapred/SortWriteBufferManager.java | 2 +-
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 8 +-
.../mapreduce/task/reduce/RssEventFetcher.java | 20 ++---
.../hadoop/mapreduce/task/reduce/RssFetcher.java | 26 +++---
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 52 ++++++------
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 2 +-
.../apache/spark/shuffle/RssShuffleManager.java | 4 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 32 ++++----
.../spark/shuffle/DelegationRssShuffleManager.java | 76 ++++++++---------
.../apache/spark/shuffle/RssShuffleManager.java | 82 +++++++++---------
.../client/impl/ShuffleWriteClientImpl.java | 40 +++++----
.../apache/uniffle/common/config/ConfigUtils.java | 18 ++--
.../apache/uniffle/common/util/UnitConverter.java | 54 ++++++------
.../PartitionBalanceAssignmentStrategy.java | 96 +++++++++++-----------
.../client/impl/grpc/ShuffleServerGrpcClient.java | 4 +-
.../apache/uniffle/server/LocalStorageChecker.java | 20 ++---
.../server/buffer/ShuffleBufferManager.java | 4 +-
.../uniffle/storage/common/LocalStorage.java | 12 +--
.../uniffle/storage/common/LocalStorageMeta.java | 12 +--
.../handler/impl/ComposedClientReadHandler.java | 22 ++---
.../handler/impl/DataSkippableReadHandler.java | 12 +--
.../handler/impl/HdfsClientReadHandler.java | 4 +-
.../storage/handler/impl/HdfsFileWriter.java | 12 +--
.../handler/impl/HdfsShuffleReadHandler.java | 4 +-
.../impl/LocalFileQuorumClientReadHandler.java | 38 ++++-----
.../handler/impl/MemoryClientReadHandler.java | 2 +-
.../impl/MemoryQuorumClientReadHandler.java | 4 +-
28 files changed, 332 insertions(+), 348 deletions(-)
diff --git a/checkstyle.xml b/checkstyle.xml
index b02acfa..c94bf90 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -249,26 +249,12 @@
value="GenericWhitespace ''{0}'' is not preceded with
whitespace."/>
</module>
<module name="Indentation">
- <!--
- <property name="basicOffset" value="4"/>
+ <property name="basicOffset" value="2"/>
<property name="braceAdjustment" value="0"/>
- <property name="caseIndent" value="4"/>
+ <property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="4"/>
- -->
- <!-- new add 由于checkstyle 未修复部分缩进场景有问题选择先屏蔽规则
- https://github.com/checkstyle/checkstyle/issues/3342
- -->
-
- <property name="severity" value="ignore"/>
-
- <property name="basicOffset" value="4"/>
- <property name="braceAdjustment" value="0"/>
- <property name="caseIndent" value="4"/>
- <property name="throwsIndent" value="8"/>
- <property name="lineWrappingIndentation" value="8"/>
- <property name="arrayInitIndent" value="4"/>
</module>
<module name="AbbreviationAsWordInName">
<!-- new add -->
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 0382538..4995cc1 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -176,7 +176,7 @@ public class SortWriteBufferManager<K, V> {
}
}
if (memoryUsedSize.get() > maxMemSize * memoryThreshold
- && inSendListBytes.get() <= maxMemSize * sendThreshold) {
+ && inSendListBytes.get() <= maxMemSize * sendThreshold) {
sendBuffersToServers();
}
mapOutputRecordCounter.increment(1);
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 97bd716..f2e7984 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -95,13 +95,13 @@ public class RssMRUtils {
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum, replica, replicaWrite, replicaRead,
replicaSkipEnabled,
- dataTransferPoolSize);
+ dataTransferPoolSize);
return client;
}
public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int
reduceID) {
String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX
- + String.valueOf(reduceID));
+ + String.valueOf(reduceID));
String[] splitServers = servers.split(",");
Set<ShuffleServerInfo> assignServers = Sets.newHashSet();
for (String splitServer : splitServers) {
@@ -110,7 +110,7 @@ public class RssMRUtils {
throw new RssException("partition " + reduceID + " server info isn't
right");
}
ShuffleServerInfo sever = new
ShuffleServerInfo(StringUtils.join(serverInfo, "-"),
- serverInfo[0], Integer.parseInt(serverInfo[1]));
+ serverInfo[0], Integer.parseInt(serverInfo[1]));
assignServers.add(sever);
}
return assignServers;
@@ -118,7 +118,7 @@ public class RssMRUtils {
public static ApplicationAttemptId getApplicationAttemptId() {
String containerIdStr =
- System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
ContainerId containerId = ContainerId.fromString(containerIdStr);
return containerId.getApplicationAttemptId();
}
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
index d30a192..1619678 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
@@ -67,8 +67,8 @@ public class RssEventFetcher<K,V> {
acceptMapCompletionEvents();
} catch (Exception e) {
throw new RssException("Reduce: " + reduce
- + " fails to accept completion events due to: "
- + e.getMessage());
+ + " fails to accept completion events due to: "
+ + e.getMessage());
}
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -117,13 +117,13 @@ public class RssEventFetcher<K,V> {
case OBSOLETE:
obsoleteMaps.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of "
- + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId()
+ "'");
+ + event.getTaskStatus() + " map-task: '" +
event.getTaskAttemptId() + "'");
break;
case TIPFAILED:
tipFailedCount++;
LOG.info("Ignoring output of failed map TIP: '"
- + event.getTaskAttemptId() + "'");
+ + event.getTaskAttemptId() + "'");
break;
default:
@@ -138,14 +138,14 @@ public class RssEventFetcher<K,V> {
do {
MapTaskCompletionEventsUpdate update =
- umbilical.getMapCompletionEvents(
- (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
- fromEventIdx,
- maxEventsToFetch,
- (org.apache.hadoop.mapred.TaskAttemptID) reduce);
+ umbilical.getMapCompletionEvents(
+ (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
+ fromEventIdx,
+ maxEventsToFetch,
+ (org.apache.hadoop.mapred.TaskAttemptID) reduce);
events = update.getMapTaskCompletionEvents();
LOG.debug("Got " + events.length + " map completion events from "
- + fromEventIdx);
+ + fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
index ba9a1c8..128bfb9 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
@@ -100,17 +100,17 @@ public class RssFetcher<K,V> {
this.metrics = metrics;
this.reduceId = reduceId;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.IO_ERROR.toString());
+ RssFetcher.ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
+ RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.BAD_ID.toString());
+ RssFetcher.ShuffleErrors.BAD_ID.toString());
wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.WRONG_MAP.toString());
+ RssFetcher.ShuffleErrors.WRONG_MAP.toString());
connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.CONNECTION.toString());
+ RssFetcher.ShuffleErrors.CONNECTION.toString());
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
- RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
+ RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
this.shuffleReadClient = shuffleReadClient;
this.totalBlockCount = totalBlockCount;
@@ -151,7 +151,7 @@ public class RssFetcher<K,V> {
if (!hasPendingData && compressedData != null) {
final long startDecompress = System.currentTimeMillis();
uncompressedData = RssShuffleUtils.decompressData(
- compressedData, compressedBlock.getUncompressLength(), false).array();
+ compressedData, compressedBlock.getUncompressLength(),
false).array();
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
@@ -187,9 +187,9 @@ public class RssFetcher<K,V> {
shuffleReadClient.logStatics();
metrics.inputBytes(unCompressionLength);
LOG.info("reduce task " + reduceId.toString() + " cost " + readTime + "
ms to fetch and "
- + decompressTime + " ms to decompress with unCompressionLength["
- + unCompressionLength + "] and " + serializeTime + " ms to serialize
and "
- + waitTime + " ms to wait resource");
+ + decompressTime + " ms to decompress with unCompressionLength["
+ + unCompressionLength + "] and " + serializeTime + " ms to serialize
and "
+ + waitTime + " ms to wait resource");
stopFetch();
}
}
@@ -226,13 +226,13 @@ public class RssFetcher<K,V> {
mapOutput.commit();
if (mapOutput instanceof OnDiskMapOutput) {
LOG.info("Reduce: " + reduceId + " allocates disk to accept block "
- + " with byte sizes: " + uncompressedData.length);
+ + " with byte sizes: " + uncompressedData.length);
}
} catch (Throwable t) {
ioErrs.increment(1);
mapOutput.abort();
throw new RssException("Reduce: " + reduceId + " cannot write block to "
- + mapOutput.getClass().getSimpleName() + " due to: " +
t.getClass().getName());
+ + mapOutput.getClass().getSimpleName() + " due to: " +
t.getClass().getName());
}
return true;
}
@@ -258,7 +258,7 @@ public class RssFetcher<K,V> {
double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
progress.setStatus("copy(" + copyBlockCount + " of " + totalBlockCount + "
at "
- + mbpsFormat.format(transferRate) + " MB/s)");
+ + mbpsFormat.format(transferRate) + " MB/s)");
}
@VisibleForTesting
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 618f80b..8f3efd3 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -105,34 +105,34 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
this.appAttemptId = RssMRUtils.getApplicationAttemptId().getAttemptId();
this.storageType = RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_STORAGE_TYPE);
this.replicaWrite = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_DATA_REPLICA_WRITE,
- RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+ RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
this.replicaRead = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_DATA_REPLICA_READ,
- RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+ RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_DATA_REPLICA,
- RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+ RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
this.partitionNum = mrJobConf.getNumReduceTasks();
this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
- RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
+ RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
this.basePath = RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_REMOTE_STORAGE_PATH);
this.indexReadLimit = RssMRUtils.getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_INDEX_READ_LIMIT,
- RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
+ RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
this.readBufferSize = (int)UnitConverter.byteStringAsBytes(
- RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
- RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
+ RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
+ RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf,
RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
this.remoteStorageInfo = new RemoteStorageInfo(basePath,
remoteStorageConf);
- }
+ }
protected MergeManager<K, V> createMergeManager(
- ShuffleConsumerPlugin.Context context) {
+ ShuffleConsumerPlugin.Context context) {
return new MergeManagerImpl<K, V>(reduceId, mrJobConf,
context.getLocalFS(),
- context.getLocalDirAllocator(), reporter, context.getCodec(),
- context.getCombinerClass(), context.getCombineCollector(),
- context.getSpilledRecordsCounter(),
- context.getReduceCombineInputCounter(),
- context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
- context.getMapOutputFile());
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+ context.getMapOutputFile());
}
@Override
@@ -149,21 +149,21 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
// just get blockIds from RSS servers
ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
Roaring64NavigableMap blockIdBitmap = writeClient.getShuffleResult(
- clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
+ clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
writeClient.close();
// get map-completion events to generate RSS taskIDs
final RssEventFetcher<K,V> eventFetcher =
- new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf,
MAX_EVENTS_TO_FETCH);
+ new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf,
MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();
LOG.info("In reduce: " + reduceId
- + ", RSS MR client has fetched blockIds and taskIds successfully");
+ + ", RSS MR client has fetched blockIds and taskIds successfully");
// start fetcher to fetch blocks from RSS servers
if (!taskIdBitmap.isEmpty()) {
LOG.info("In reduce: " + reduceId
- + ", Rss MR client starts to fetch blocks from RSS server");
+ + ", Rss MR client starts to fetch blocks from RSS server");
JobConf readerJobConf = new JobConf((mrJobConf));
if (!remoteStorageInfo.isEmpty()) {
for (Map.Entry<String, String> entry :
remoteStorageInfo.getConfItems().entrySet()) {
@@ -171,15 +171,15 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
}
}
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
- appId, 0, reduceId.getTaskID().getId(), storageType, basePath,
indexReadLimit, readBufferSize,
- partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
serverInfoList,
- readerJobConf, new MRIdHelper());
+ appId, 0, reduceId.getTaskID().getId(), storageType, basePath,
indexReadLimit, readBufferSize,
+ partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
serverInfoList,
+ readerJobConf, new MRIdHelper());
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus,
merger, copyPhase, reporter, metrics,
- shuffleReadClient, blockIdBitmap.getLongCardinality());
+ shuffleReadClient, blockIdBitmap.getLongCardinality());
fetcher.fetchAllRssBlocks();
LOG.info("In reduce: " + reduceId
- + ", Rss MR client fetches blocks from RSS server successfully");
+ + ", Rss MR client fetches blocks from RSS server successfully");
}
copyPhase.complete();
@@ -198,12 +198,12 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
synchronized (this) {
if (throwable != null) {
throw new Shuffle.ShuffleError("error in shuffle in " +
throwingThreadName,
- throwable);
+ throwable);
}
}
LOG.info("In reduce: " + reduceId
- + ", Rss MR client returns sorted data to reduce successfully");
+ + ", Rss MR client returns sorted data to reduce successfully");
return kvIter;
}
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index db8d7b9..e15351b 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -194,7 +194,7 @@ public class RssMRAppMaster extends MRAppMaster {
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH,
""));
RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
- appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
+ appId, defaultRemoteStorage, dynamicConfEnabled, storageType,
client);
// set the remote storage with actual value
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH,
remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF,
remoteStorage.getConfString());
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c5da1ac..705d66b 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -147,7 +147,7 @@ public class RssShuffleManager implements ShuffleManager {
this.dataReplicaRead =
sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.dataTransferPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
- RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
this.dataReplicaSkipEnabled =
sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
LOG.info("Check quorum config ["
@@ -171,7 +171,7 @@ public class RssShuffleManager implements ShuffleManager {
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index be347a3..1901258 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -287,23 +287,23 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
public Option<MapStatus> stop(boolean success) {
try {
if (success) {
- // fill partitionLengths with non zero dummy value so map output
tracker could work correctly
- long[] partitionLengths = new long[partitioner.numPartitions()];
- Arrays.fill(partitionLengths, 1);
- final BlockManagerId blockManagerId =
- createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
+ // fill partitionLengths with non zero dummy value so map output
tracker could work correctly
+ long[] partitionLengths = new long[partitioner.numPartitions()];
+ Arrays.fill(partitionLengths, 1);
+ final BlockManagerId blockManagerId =
+ createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
- Map<Integer, List<Long>> ptb = Maps.newHashMap();
- for (Map.Entry<Integer, Set<Long>> entry :
partitionToBlockIds.entrySet()) {
- ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
- }
- long start = System.currentTimeMillis();
- shuffleWriteClient.reportShuffleResult(partitionToServers, appId,
shuffleId,
- taskAttemptId, ptb, bitmapSplitNum);
- LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost
{} ms",
- taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() -
start));
- MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId,
partitionLengths);
- return Option.apply(mapStatus);
+ Map<Integer, List<Long>> ptb = Maps.newHashMap();
+ for (Map.Entry<Integer, Set<Long>> entry :
partitionToBlockIds.entrySet()) {
+ ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
+ }
+ long start = System.currentTimeMillis();
+ shuffleWriteClient.reportShuffleResult(partitionToServers, appId,
shuffleId,
+ taskAttemptId, ptb, bitmapSplitNum);
+ LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost
{} ms",
+ taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() -
start));
+ MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId,
partitionLengths);
+ return Option.apply(mapStatus);
} else {
return Option.empty();
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 6b8a77e..6ec8901 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -172,7 +172,7 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
TaskContext context,
ShuffleReadMetricsReporter metrics) {
return delegate.getReader(handle,
- startPartition, endPartition, context, metrics);
+ startPartition, endPartition, context, metrics);
}
// The interface is only used for compatibility with spark 3.1.2
@@ -187,21 +187,21 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
ShuffleReader<K, C> reader = null;
try {
reader = (ShuffleReader<K, C>)delegate.getClass().getDeclaredMethod(
- "getReader",
- ShuffleHandle.class,
- int.class,
- int.class,
- int.class,
- int.class,
- TaskContext.class,
- ShuffleReadMetricsReporter.class).invoke(
- handle,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition,
- context,
- metrics);
+ "getReader",
+ ShuffleHandle.class,
+ int.class,
+ int.class,
+ int.class,
+ int.class,
+ TaskContext.class,
+ ShuffleReadMetricsReporter.class).invoke(
+ handle,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition,
+ context,
+ metrics);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -210,31 +210,31 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
// The interface is only used for compatibility with spark 3.0.1
public <K, C> ShuffleReader<K, C> getReaderForRange(
- ShuffleHandle handle,
- int startMapIndex,
- int endMapIndex,
- int startPartition,
- int endPartition,
- TaskContext context,
- ShuffleReadMetricsReporter metrics) {
+ ShuffleHandle handle,
+ int startMapIndex,
+ int endMapIndex,
+ int startPartition,
+ int endPartition,
+ TaskContext context,
+ ShuffleReadMetricsReporter metrics) {
ShuffleReader<K, C> reader = null;
try {
reader = (ShuffleReader<K, C>)delegate.getClass().getDeclaredMethod(
- "getReaderForRange",
- ShuffleHandle.class,
- int.class,
- int.class,
- int.class,
- int.class,
- TaskContext.class,
- ShuffleReadMetricsReporter.class).invoke(
- handle,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition,
- context,
- metrics);
+ "getReaderForRange",
+ ShuffleHandle.class,
+ int.class,
+ int.class,
+ int.class,
+ int.class,
+ TaskContext.class,
+ ShuffleReadMetricsReporter.class).invoke(
+ handle,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition,
+ context,
+ metrics);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 032767a..b71f1b3 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -168,12 +168,12 @@ public class RssShuffleManager implements ShuffleManager {
RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
this.dataTransferPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
- RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
registerCoordinator();
// fetch client conf and apply them if necessary and disable ESS
if (isDriver && dynamicConfEnabled) {
@@ -221,28 +221,28 @@ public class RssShuffleManager implements ShuffleManager {
this.dataReplica = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA,
RssSparkConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
this.dataReplicaWrite =
sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_WRITE,
- RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
this.dataReplicaRead =
sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
- RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
this.dataReplicaSkipEnabled =
sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
- RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
LOG.info("Check quorum config ["
- + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" +
dataReplicaSkipEnabled + "]");
+ + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" +
dataReplicaSkipEnabled + "]");
RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite,
dataReplicaRead);
int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
- RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+ RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
long retryIntervalMax =
sparkConf.getLong(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
- RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+ RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
int heartBeatThreadNum =
sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
- RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+ RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
this.dataTransferPoolSize =
sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
- RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+ RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
- shuffleWriteClient = ShuffleClientFactory
+ shuffleWriteClient = ShuffleClientFactory
.getInstance()
.createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
heartBeatThreadNum,
- dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
+ dataReplica, dataReplicaWrite, dataReplicaRead,
dataReplicaSkipEnabled, dataTransferPoolSize);
this.taskToSuccessBlockIds = taskToSuccessBlockIds;
this.taskToFailedBlockIds = taskToFailedBlockIds;
if (loop != null) {
@@ -339,7 +339,7 @@ public class RssShuffleManager implements ShuffleManager {
TaskContext context,
ShuffleReadMetricsReporter metrics) {
return getReader(handle, 0, Integer.MAX_VALUE, startPartition,
endPartition,
- context, metrics);
+ context, metrics);
}
// The interface is used for compatibility with spark 3.0.1
@@ -353,14 +353,14 @@ public class RssShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
long start = System.currentTimeMillis();
Roaring64NavigableMap taskIdBitmap = getExpectedTasksByExecutorId(
- handle.shuffleId(),
- startPartition,
- endPartition,
- startMapIndex,
- endMapIndex);
+ handle.shuffleId(),
+ startPartition,
+ endPartition,
+ startMapIndex,
+ endMapIndex);
LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms,
and request expected blockIds from "
- + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" +
handle.shuffleId() + "], partitionId["
- + startPartition + ", " + endPartition + "]");
+ + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" +
handle.shuffleId() + "], partitionId["
+ + startPartition + ", " + endPartition + "]");
return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition,
endPartition,
context, metrics, taskIdBitmap);
}
@@ -376,14 +376,14 @@ public class RssShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
long start = System.currentTimeMillis();
Roaring64NavigableMap taskIdBitmap = getExpectedTasksByRange(
- handle.shuffleId(),
- startPartition,
- endPartition,
- startMapIndex,
- endMapIndex);
+ handle.shuffleId(),
+ startPartition,
+ endPartition,
+ startMapIndex,
+ endMapIndex);
LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms,
and request expected blockIds from "
- + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" +
handle.shuffleId() + "], partitionId["
- + startPartition + ", " + endPartition + "]");
+ + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" +
handle.shuffleId() + "], partitionId["
+ + startPartition + ", " + endPartition + "]");
return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition,
endPartition,
context, metrics, taskIdBitmap);
}
@@ -507,24 +507,24 @@ public class RssShuffleManager implements ShuffleManager {
// This API is only used by Spark3.0 and removed since 3.1,
// so we extract it from getExpectedTasksByExecutorId.
private Roaring64NavigableMap getExpectedTasksByRange(
- int shuffleId,
- int startPartition,
- int endPartition,
- int startMapIndex,
- int endMapIndex) {
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ int startMapIndex,
+ int endMapIndex) {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>
mapStatusIter = null;
try {
mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId,
Object, Object>>>>)
- SparkEnv.get().mapOutputTracker().getClass()
- .getDeclaredMethod("getMapSizesByRange",
- int.class, int.class, int.class, int.class, int.class)
- .invoke(SparkEnv.get().mapOutputTracker(),
- shuffleId,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition);
+ SparkEnv.get().mapOutputTracker().getClass()
+ .getDeclaredMethod("getMapSizesByRange",
+ int.class, int.class, int.class, int.class, int.class)
+ .invoke(SparkEnv.get().mapOutputTracker(),
+ shuffleId,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 669431c..ce4c247 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -121,7 +121,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks
= entry.getValue();
// todo: compact unnecessary blocks that reach replicaWrite
RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(
- appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
+ appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
long s = System.currentTimeMillis();
RssSendShuffleDataResponse response =
getShuffleServerClient(ssi).sendShuffleData(request);
LOG.info("ShuffleWriteClientImpl sendShuffleData cost:" +
(System.currentTimeMillis() - s) + "(ms)");
@@ -130,11 +130,11 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
// mark a replica of block that has been sent
serverToBlockIds.get(ssi).forEach(block ->
blockIdsTracker.get(block).incrementAndGet());
LOG.info("Send: " + serverToBlockIds.get(ssi).size()
- + " blocks to [" + ssi.getId() + "] successfully");
+ + " blocks to [" + ssi.getId() + "] successfully");
} else {
isAllServersSuccess.set(false);
LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to
[" + ssi.getId()
- + "] failed with statusCode[" + response.getStatusCode() +
"], ");
+ + "] failed with statusCode[" + response.getStatusCode() + "],
");
}
} catch (Exception e) {
isAllServersSuccess.set(false);
@@ -146,9 +146,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
private void genServerToBlocks(ShuffleBlockInfo sbi, List<ShuffleServerInfo>
serverList,
- Map<ShuffleServerInfo,
- Map<Integer, Map<Integer,
List<ShuffleBlockInfo>>>> serverToBlocks,
- Map<ShuffleServerInfo, List<Long>>
serverToBlockIds) {
+ Map<ShuffleServerInfo, Map<Integer, Map<Integer,
List<ShuffleBlockInfo>>>> serverToBlocks,
+ Map<ShuffleServerInfo, List<Long>> serverToBlockIds) {
int partitionId = sbi.getPartitionId();
int shuffleId = sbi.getShuffleId();
for (ShuffleServerInfo ssi : serverList) {
@@ -180,7 +179,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
Map<ShuffleServerInfo, Map<Integer,
Map<Integer, List<ShuffleBlockInfo>>>> primaryServerToBlocks =
Maps.newHashMap();
Map<ShuffleServerInfo, Map<Integer,
- Map<Integer, List<ShuffleBlockInfo>>>> secondaryServerToBlocks =
Maps.newHashMap();
+ Map<Integer, List<ShuffleBlockInfo>>>> secondaryServerToBlocks =
Maps.newHashMap();
Map<ShuffleServerInfo, List<Long>> primaryServerToBlockIds =
Maps.newHashMap();
Map<ShuffleServerInfo, List<Long>> secondaryServerToBlockIds =
Maps.newHashMap();
@@ -198,23 +197,23 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
if (replicaSkipEnabled) {
genServerToBlocks(sbi, allServers.subList(0, replicaWrite),
- primaryServerToBlocks, primaryServerToBlockIds);
+ primaryServerToBlocks, primaryServerToBlockIds);
genServerToBlocks(sbi, allServers.subList(replicaWrite, replica),
- secondaryServerToBlocks, secondaryServerToBlockIds);
+ secondaryServerToBlocks, secondaryServerToBlockIds);
} else {
// When replicaSkip is disabled, we send data to all replicas within
one round.
genServerToBlocks(sbi, allServers,
- primaryServerToBlocks, primaryServerToBlockIds);
+ primaryServerToBlocks, primaryServerToBlockIds);
}
}
// maintain the count of blocks that have been sent to the server
Map<Long, AtomicInteger> blockIdsTracker = Maps.newConcurrentMap();
primaryServerToBlockIds.values().forEach(
- blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0)))
+ blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0)))
);
secondaryServerToBlockIds.values().forEach(
- blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0)))
+ blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0)))
);
Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
@@ -237,15 +236,14 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
// check success and failed blocks according to the replicaWrite
blockIdsTracker.entrySet().forEach(blockCt -> {
- long blockId = blockCt.getKey();
- int count = blockCt.getValue().get();
- if (count >= replicaWrite) {
- successBlockIds.add(blockId);
- } else {
- failedBlockIds.add(blockId);
- }
+ long blockId = blockCt.getKey();
+ int count = blockCt.getValue().get();
+ if (count >= replicaWrite) {
+ successBlockIds.add(blockId);
+ } else {
+ failedBlockIds.add(blockId);
}
- );
+ });
return new SendShuffleDataResult(successBlockIds, failedBlockIds);
}
@@ -420,7 +418,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
for (Map.Entry<Integer, Integer> entry: partitionReportTracker.entrySet())
{
if (entry.getValue() < replicaWrite) {
throw new RssException("Quorum check of report shuffle result is
failed for appId["
- + appId + "], shuffleId[" + shuffleId + "]");
+ + appId + "], shuffleId[" + shuffleId + "]");
}
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index a9f989a..3de5692 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -83,8 +83,8 @@ public class ConfigUtils {
return (int) value;
} else {
throw new IllegalArgumentException(String.format(
- "Configuration value %s overflows/underflows the integer type.",
- value));
+ "Configuration value %s overflows/underflows the integer type.",
+ value));
}
}
return Integer.parseInt(o.toString());
@@ -126,8 +126,8 @@ public class ConfigUtils {
return false;
default:
throw new IllegalArgumentException(String.format(
- "Unrecognized option for boolean: %s. Expected either true or
false(case insensitive)",
- o));
+ "Unrecognized option for boolean: %s. Expected either true or
false(case insensitive)",
+ o));
}
}
@@ -137,13 +137,13 @@ public class ConfigUtils {
} else if (o.getClass() == Double.class) {
double value = ((Double) o);
if (value == 0.0
- || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
- || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
+ || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
+ || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
return (float) value;
} else {
throw new IllegalArgumentException(String.format(
- "Configuration value %s overflows/underflows the float type.",
- value));
+ "Configuration value %s overflows/underflows the float type.",
+ value));
}
}
@@ -189,5 +189,5 @@ public class ConfigUtils {
public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
(Function<Double, Boolean>) value -> Double.compare(value, 100.0) <= 0
&& Double.compare(value, 0.0) >= 0;
-
+
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
index 57347fb..97cc769 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
@@ -29,30 +29,30 @@ import com.google.common.collect.ImmutableMap;
public class UnitConverter {
private static final Map<String, ByteUnit> byteSuffixes =
- ImmutableMap.<String, ByteUnit>builder()
- .put("b", ByteUnit.BYTE)
- .put("k", ByteUnit.KiB)
- .put("kb", ByteUnit.KiB)
- .put("m", ByteUnit.MiB)
- .put("mb", ByteUnit.MiB)
- .put("g", ByteUnit.GiB)
- .put("gb", ByteUnit.GiB)
- .put("t", ByteUnit.TiB)
- .put("tb", ByteUnit.TiB)
- .put("p", ByteUnit.PiB)
- .put("pb", ByteUnit.PiB)
- .build();
+ ImmutableMap.<String, ByteUnit>builder()
+ .put("b", ByteUnit.BYTE)
+ .put("k", ByteUnit.KiB)
+ .put("kb", ByteUnit.KiB)
+ .put("m", ByteUnit.MiB)
+ .put("mb", ByteUnit.MiB)
+ .put("g", ByteUnit.GiB)
+ .put("gb", ByteUnit.GiB)
+ .put("t", ByteUnit.TiB)
+ .put("tb", ByteUnit.TiB)
+ .put("p", ByteUnit.PiB)
+ .put("pb", ByteUnit.PiB)
+ .build();
private static final Map<String, TimeUnit> timeSuffixes =
- ImmutableMap.<String, TimeUnit>builder()
- .put("us", TimeUnit.MICROSECONDS)
- .put("ms", TimeUnit.MILLISECONDS)
- .put("s", TimeUnit.SECONDS)
- .put("m", TimeUnit.MINUTES)
- .put("min", TimeUnit.MINUTES)
- .put("h", TimeUnit.HOURS)
- .put("d", TimeUnit.DAYS)
- .build();
+ ImmutableMap.<String, TimeUnit>builder()
+ .put("us", TimeUnit.MICROSECONDS)
+ .put("ms", TimeUnit.MILLISECONDS)
+ .put("s", TimeUnit.SECONDS)
+ .put("m", TimeUnit.MINUTES)
+ .put("min", TimeUnit.MINUTES)
+ .put("h", TimeUnit.HOURS)
+ .put("d", TimeUnit.DAYS)
+ .build();
public static boolean isByteString(String str) {
String strLower = str.toLowerCase();
@@ -94,14 +94,14 @@ public class UnitConverter {
return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix)
: unit);
} else if (fractionMatcher.matches()) {
throw new NumberFormatException("Fractional values are not supported.
Input was: "
- + fractionMatcher.group(1));
+ + fractionMatcher.group(1));
} else {
throw new NumberFormatException("Failed to parse byte string: " + str);
}
} catch (NumberFormatException e) {
String byteError = "Size must be specified as bytes (b), "
- + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or
pebibytes(p). "
- + "E.g. 50b, 100k, or 250m.";
+ + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or
pebibytes(p). "
+ + "E.g. 50b, 100k, or 250m.";
throw new NumberFormatException(byteError + "\n" + e.getMessage());
}
}
@@ -171,8 +171,8 @@ public class UnitConverter {
return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) :
unit);
} catch (NumberFormatException e) {
String timeError = "Time must be specified as seconds (s), "
- + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h),
or day (d). "
- + "E.g. 50s, 100ms, or 250us.";
+ + "milliseconds (ms), microseconds (us), minutes (m or min), hour
(h), or day (d). "
+ + "E.g. 50s, 100ms, or 250us.";
throw new NumberFormatException(timeError + "\n" + e.getMessage());
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
index 3dffb8b..27d8e93 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
@@ -74,58 +74,58 @@ public class PartitionBalanceAssignmentStrategy implements
AssignmentStrategy {
SortedMap<PartitionRange, List<ServerNode>> assignments = new TreeMap<>();
synchronized (this) {
- List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
- Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos =
Maps.newConcurrentMap();
- for (ServerNode node : nodes) {
- PartitionAssignmentInfo partitionInfo;
- if (serverToPartitions.containsKey(node)) {
- partitionInfo = serverToPartitions.get(node);
- if (partitionInfo.getTimestamp() < node.getTimestamp()) {
- partitionInfo.resetPartitionNum();
- partitionInfo.setTimestamp(node.getTimestamp());
- }
- } else {
- partitionInfo = new PartitionAssignmentInfo();
+ List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
+ Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos =
Maps.newConcurrentMap();
+ for (ServerNode node : nodes) {
+ PartitionAssignmentInfo partitionInfo;
+ if (serverToPartitions.containsKey(node)) {
+ partitionInfo = serverToPartitions.get(node);
+ if (partitionInfo.getTimestamp() < node.getTimestamp()) {
+ partitionInfo.resetPartitionNum();
+ partitionInfo.setTimestamp(node.getTimestamp());
}
- newPartitionInfos.putIfAbsent(node, partitionInfo);
+ } else {
+ partitionInfo = new PartitionAssignmentInfo();
}
- serverToPartitions = newPartitionInfos;
- int averagePartitions = totalPartitionNum * replica /
clusterManager.getShuffleNodesMax();
- int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
- nodes.sort(new Comparator<ServerNode>() {
- @Override
- public int compare(ServerNode o1, ServerNode o2) {
- PartitionAssignmentInfo partitionInfo1 =
serverToPartitions.get(o1);
- PartitionAssignmentInfo partitionInfo2 =
serverToPartitions.get(o2);
- double v1 = o1.getAvailableMemory() * 1.0 /
(partitionInfo1.getPartitionNum() + assignPartitions);
- double v2 = o2.getAvailableMemory() * 1.0 /
(partitionInfo2.getPartitionNum() + assignPartitions);
- return -Double.compare(v1, v2);
- }
- });
-
- if (nodes.isEmpty() || nodes.size() < replica) {
- throw new RuntimeException("There isn't enough shuffle servers");
- }
-
- int expectNum = clusterManager.getShuffleNodesMax();
- if (nodes.size() < clusterManager.getShuffleNodesMax()) {
- LOG.warn("Can't get expected servers [" + expectNum + "] and found
only [" + nodes.size() + "]");
- expectNum = nodes.size();
+ newPartitionInfos.putIfAbsent(node, partitionInfo);
+ }
+ serverToPartitions = newPartitionInfos;
+ int averagePartitions = totalPartitionNum * replica /
clusterManager.getShuffleNodesMax();
+ int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
+ nodes.sort(new Comparator<ServerNode>() {
+ @Override
+ public int compare(ServerNode o1, ServerNode o2) {
+ PartitionAssignmentInfo partitionInfo1 = serverToPartitions.get(o1);
+ PartitionAssignmentInfo partitionInfo2 = serverToPartitions.get(o2);
+ double v1 = o1.getAvailableMemory() * 1.0 /
(partitionInfo1.getPartitionNum() + assignPartitions);
+ double v2 = o2.getAvailableMemory() * 1.0 /
(partitionInfo2.getPartitionNum() + assignPartitions);
+ return -Double.compare(v1, v2);
}
-
- List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
- int idx = 0;
- List<PartitionRange> ranges =
CoordinatorUtils.generateRanges(totalPartitionNum, 1);
- for (PartitionRange range : ranges) {
- List<ServerNode> assignNodes = Lists.newArrayList();
- for (int rc = 0; rc < replica; rc++) {
- ServerNode node = candidatesNodes.get(idx);
- idx = CoordinatorUtils.nextIdx(idx, candidatesNodes.size());
- serverToPartitions.get(node).incrementPartitionNum();
- assignNodes.add(node);
- }
- assignments.put(range, assignNodes);
+ });
+
+ if (nodes.isEmpty() || nodes.size() < replica) {
+ throw new RuntimeException("There isn't enough shuffle servers");
+ }
+
+ int expectNum = clusterManager.getShuffleNodesMax();
+ if (nodes.size() < clusterManager.getShuffleNodesMax()) {
+ LOG.warn("Can't get expected servers [" + expectNum + "] and found
only [" + nodes.size() + "]");
+ expectNum = nodes.size();
+ }
+
+ List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
+ int idx = 0;
+ List<PartitionRange> ranges =
CoordinatorUtils.generateRanges(totalPartitionNum, 1);
+ for (PartitionRange range : ranges) {
+ List<ServerNode> assignNodes = Lists.newArrayList();
+ for (int rc = 0; rc < replica; rc++) {
+ ServerNode node = candidatesNodes.get(idx);
+ idx = CoordinatorUtils.nextIdx(idx, candidatesNodes.size());
+ serverToPartitions.get(node).incrementPartitionNum();
+ assignNodes.add(node);
}
+ assignments.put(range, assignNodes);
+ }
}
return new PartitionRangeAssignment(assignments);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 79f7bd5..7c5beaf 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -416,8 +416,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
.setPartitionId(request.getPartitionId())
.build();
GetShuffleResultResponse rpcResponse = blockingStub
- .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
- .getShuffleResult(rpcRequest);
+ .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
+ .getShuffleResult(rpcRequest);
StatusCode statusCode = rpcResponse.getStatus();
RssGetShuffleResultResponse response;
diff --git
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index e6a1811..1a54ee2 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -165,18 +165,18 @@ public class LocalStorageChecker extends Checker {
byte[] readData = new byte[1024];
int readBytes = -1;
try (FileInputStream fis = new FileInputStream(writeFile)) {
- int hasReadBytes = 0;
- do {
- readBytes = fis.read(readData);
- if (hasReadBytes < 1024) {
- for (int i = 0; i < readBytes; i++) {
- if (data[hasReadBytes + i] != readData[i]) {
- return false;
- }
+ int hasReadBytes = 0;
+ do {
+ readBytes = fis.read(readData);
+ if (hasReadBytes < 1024) {
+ for (int i = 0; i < readBytes; i++) {
+ if (data[hasReadBytes + i] != readData[i]) {
+ return false;
}
}
- hasReadBytes += readBytes;
- } while (readBytes != -1);
+ }
+ hasReadBytes += readBytes;
+ } while (readBytes != -1);
}
} catch (Exception e) {
LOG.error("Storage read and write error ", e);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 99efc50..bc3ec39 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -290,8 +290,8 @@ public class ShuffleBufferManager {
// flush the buffer with required map which is <appId -> shuffleId>
public synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
- for (Map.Entry<String, Map<Integer, RangeMap<Integer,
- ShuffleBuffer>>> appIdToBuffers : bufferPool.entrySet()) {
+ for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
+ appIdToBuffers : bufferPool.entrySet()) {
String appId = appIdToBuffers.getKey();
if (requiredFlush.containsKey(appId)) {
for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>>
shuffleIdToBuffers :
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index a49b12e..117c347 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -133,16 +133,16 @@ public class LocalStorage extends AbstractStorage {
@Override
public void updateWriteMetrics(StorageWriteMetrics metrics) {
- updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(),
metrics.getShuffleId()),
- metrics.getDataSize(),
- metrics.getPartitions());
+ updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(),
metrics.getShuffleId()),
+ metrics.getDataSize(),
+ metrics.getPartitions());
}
@Override
public void updateReadMetrics(StorageReadMetrics metrics) {
- String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(),
metrics.getShuffleId());
- prepareStartRead(shuffleKey);
- updateShuffleLastReadTs(shuffleKey);
+ String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(),
metrics.getShuffleId());
+ prepareStartRead(shuffleKey);
+ updateShuffleLastReadTs(shuffleKey);
}
@Override
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 2505cd1..a27136b 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -55,11 +55,11 @@ public class LocalStorageMeta {
.filter(e -> (!checkRead || e.getValue().isStartRead.get()) &&
e.getValue().getNotUploadedSize() > 0)
.collect(Collectors.toList());
- shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String,
ShuffleMeta> o2) -> {
- long sz1 = o1.getValue().getSize().longValue();
- long sz2 = o2.getValue().getSize().longValue();
- return -Long.compare(sz1, sz2);
- });
+ shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String,
ShuffleMeta> o2) -> {
+ long sz1 = o1.getValue().getSize().longValue();
+ long sz2 = o2.getValue().getSize().longValue();
+ return -Long.compare(sz1, sz2);
+ });
return shuffleMetaList
.subList(0, Math.min(shuffleMetaList.size(), hint))
@@ -82,7 +82,7 @@ public class LocalStorageMeta {
uploadedPartitionBitmap = shuffleMeta.uploadedPartitionBitmap.clone();
}
for (int partition : uploadedPartitionBitmap) {
- partitionBitmap.remove(partition);
+ partitionBitmap.remove(partition);
}
return partitionBitmap;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 9819c01..f990c38 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -231,30 +231,30 @@ public class ComposedClientReadHandler implements
ClientReadHandler {
@VisibleForTesting
public String getReadBlokNumInfo() {
long totalBlockNum = hotReadBlockNum + warmReadBlockNum
- + coldReadBlockNum + frozenReadBlockNum;
+ + coldReadBlockNum + frozenReadBlockNum;
return "Client read " + totalBlockNum + " blocks ["
- + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
- + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
+ + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
+ + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
}
@VisibleForTesting
public String getReadLengthInfo() {
long totalReadLength = hotReadLength + warmReadLength
- + coldReadLength + frozenReadLength;
+ + coldReadLength + frozenReadLength;
return "Client read " + totalReadLength + " bytes ["
- + " hot:" + hotReadLength + " warm:" + warmReadLength
- + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
+ + " hot:" + hotReadLength + " warm:" + warmReadLength
+ + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
}
@VisibleForTesting
public String getReadUncompressLengthInfo() {
long totalReadUncompressLength = hotReadUncompressLength +
warmReadUncompressLength
- + coldReadUncompressLength + frozenReadUncompressLength;
+ + coldReadUncompressLength + frozenReadUncompressLength;
return "Client read " + totalReadUncompressLength + " uncompressed bytes ["
- + " hot:" + hotReadUncompressLength
- + " warm:" + warmReadUncompressLength
- + " cold:" + coldReadUncompressLength
- + " frozen:" + frozenReadUncompressLength + " ]";
+ + " hot:" + hotReadUncompressLength
+ + " warm:" + warmReadUncompressLength
+ + " cold:" + coldReadUncompressLength
+ + " frozen:" + frozenReadUncompressLength + " ]";
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index 6bc11f1..ce7a515 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -39,12 +39,12 @@ public abstract class DataSkippableReadHandler extends
AbstractClientReadHandler
protected Roaring64NavigableMap processBlockIds;
public DataSkippableReadHandler(
- String appId,
- int shuffleId,
- int partitionId,
- int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds) {
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int readBufferSize,
+ Roaring64NavigableMap expectBlockIds,
+ Roaring64NavigableMap processBlockIds) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 7151d94..f0e7491 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -122,8 +122,8 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
// init lazily like LocalFileClientRead
if (readHandlers.isEmpty()) {
String fullShufflePath =
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
- ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
- shuffleId, partitionId, partitionNumPerRange, partitionNum));
+ ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
+ shuffleId, partitionId, partitionNumPerRange, partitionNum));
init(fullShufflePath);
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 75dae67..955bd5f 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -109,12 +109,12 @@ public class HdfsFileWriter implements Closeable {
fsDataOutputStream.writeInt(partitionList.size());
headerContentBuf.putInt(partitionList.size());
for (int i = 0; i < partitionList.size(); i++) {
- fsDataOutputStream.writeInt(partitionList.get(i));
- fsDataOutputStream.writeLong(indexFileSizeList.get(i));
- fsDataOutputStream.writeLong(dataFileSizeList.get(i));
- headerContentBuf.putInt(partitionList.get(i));
- headerContentBuf.putLong(indexFileSizeList.get(i));
- headerContentBuf.putLong(dataFileSizeList.get(i));
+ fsDataOutputStream.writeInt(partitionList.get(i));
+ fsDataOutputStream.writeLong(indexFileSizeList.get(i));
+ fsDataOutputStream.writeLong(dataFileSizeList.get(i));
+ headerContentBuf.putInt(partitionList.get(i));
+ headerContentBuf.putLong(indexFileSizeList.get(i));
+ headerContentBuf.putLong(dataFileSizeList.get(i));
}
headerContentBuf.flip();
fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index f8a58f6..de54241 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -82,14 +82,14 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
byte[] data = readShuffleData(shuffleDataSegment.getOffset(),
expectedLength);
if (data.length == 0) {
LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {}
from file {}.data",
- expectedLength, data.length, shuffleDataSegment, filePrefix);
+ expectedLength, data.length, shuffleDataSegment, filePrefix);
return null;
}
ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data,
shuffleDataSegment.getBufferSegments());
if (shuffleDataResult.isEmpty()) {
LOG.warn("Shuffle data is empty, expected length {}, data length {},
segment {} in file {}.data",
- expectedLength, data.length, shuffleDataSegment, filePrefix);
+ expectedLength, data.length, shuffleDataSegment, filePrefix);
return null;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index 1c9c048..8cf1fe2 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -40,22 +40,22 @@ public class LocalFileQuorumClientReadHandler extends
AbstractClientReadHandler
private long readUncompressLength = 0L;
public LocalFileQuorumClientReadHandler(
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
- List<ShuffleServerClient> shuffleServerClients) {
- this.appId = appId;
- this.shuffleId = shuffleId;
- this.partitionId = partitionId;
- this.readBufferSize = readBufferSize;
- for (ShuffleServerClient client: shuffleServerClients) {
- handlers.add(new LocalFileClientReadHandler(
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int indexReadLimit,
+ int partitionNumPerRange,
+ int partitionNum,
+ int readBufferSize,
+ Roaring64NavigableMap expectBlockIds,
+ Roaring64NavigableMap processBlockIds,
+ List<ShuffleServerClient> shuffleServerClients) {
+ this.appId = appId;
+ this.shuffleId = shuffleId;
+ this.partitionId = partitionId;
+ this.readBufferSize = readBufferSize;
+ for (ShuffleServerClient client: shuffleServerClients) {
+ handlers.add(new LocalFileClientReadHandler(
appId,
shuffleId,
partitionId,
@@ -66,8 +66,8 @@ public class LocalFileQuorumClientReadHandler extends
AbstractClientReadHandler
expectBlockIds,
processBlockIds,
client
- ));
- }
+ ));
+ }
}
@Override
@@ -85,7 +85,7 @@ public class LocalFileQuorumClientReadHandler extends
AbstractClientReadHandler
}
if (!readSuccessful) {
throw new RssException("Failed to read all replicas for appId[" + appId
+ "], shuffleId["
- + shuffleId + "], partitionId[" + partitionId + "]");
+ + shuffleId + "], partitionId[" + partitionId + "]");
}
return result;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 5f0db19..73bcc46 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -54,7 +54,7 @@ public class MemoryClientReadHandler extends
AbstractClientReadHandler {
ShuffleDataResult result = null;
RssGetInMemoryShuffleDataRequest request = new
RssGetInMemoryShuffleDataRequest(
- appId,shuffleId, partitionId, lastBlockId, readBufferSize);
+ appId,shuffleId, partitionId, lastBlockId, readBufferSize);
try {
RssGetInMemoryShuffleDataResponse response =
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
index a3e6a5b..5fbe8ec 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
@@ -45,8 +45,8 @@ public class MemoryQuorumClientReadHandler extends
AbstractClientReadHandler {
this.partitionId = partitionId;
this.readBufferSize = readBufferSize;
shuffleServerClients.forEach(client ->
- handlers.add(new MemoryClientReadHandler(
- appId, shuffleId, partitionId, readBufferSize, client))
+ handlers.add(new MemoryClientReadHandler(
+ appId, shuffleId, partitionId, readBufferSize, client))
);
}