This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d64a9  [Style] Check indentation (#56)
55d64a9 is described below

commit 55d64a99d17b7199a7619d2da4238113c3fa2d3f
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Jul 15 14:39:03 2022 +0800

    [Style] Check indentation (#56)
    
    ### What changes were proposed in this pull request?
    
    1. Set indentation rules in checkstyle (the resulting rule set is quoted below).
    2. Fix violations.
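
    For reference, the Indentation module after this change (as it appears in
    the checkstyle.xml hunk below) is:

    ```xml
    <module name="Indentation">
        <property name="basicOffset" value="2"/>
        <property name="braceAdjustment" value="0"/>
        <property name="caseIndent" value="2"/>
        <property name="throwsIndent" value="4"/>
        <property name="lineWrappingIndentation" value="4"/>
        <property name="arrayInitIndent" value="4"/>
    </module>
    ```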
    
    ### Why are the changes needed?
    
    #54
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?

    https://github.com/kaijchen/incubator-uniffle/runs/7351765430?check_suite_focus=true
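
    As a rough illustration of the style these rules enforce (hypothetical
    class and method names, not taken from the patch):

    ```java
    public class IndentDemo {
      void send(String appId, int id) {    // basicOffset = 2
        if (appId != null
            && id >= 0) {                  // lineWrappingIndentation = 4
          switch (id) {
            case 0:                        // caseIndent = 2 (relative to switch)
              break;
            default:
              break;
          }
        }
      }
    }
    ```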
---
 checkstyle.xml                                     | 18 +---
 .../hadoop/mapred/SortWriteBufferManager.java      |  2 +-
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |  8 +-
 .../mapreduce/task/reduce/RssEventFetcher.java     | 20 ++---
 .../hadoop/mapreduce/task/reduce/RssFetcher.java   | 26 +++---
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   | 52 ++++++------
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  2 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |  4 +-
 .../spark/shuffle/writer/RssShuffleWriter.java     | 32 ++++----
 .../spark/shuffle/DelegationRssShuffleManager.java | 76 ++++++++---------
 .../apache/spark/shuffle/RssShuffleManager.java    | 82 +++++++++---------
 .../client/impl/ShuffleWriteClientImpl.java        | 40 +++++----
 .../apache/uniffle/common/config/ConfigUtils.java  | 18 ++--
 .../apache/uniffle/common/util/UnitConverter.java  | 54 ++++++------
 .../PartitionBalanceAssignmentStrategy.java        | 96 +++++++++++-----------
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  4 +-
 .../apache/uniffle/server/LocalStorageChecker.java | 20 ++---
 .../server/buffer/ShuffleBufferManager.java        |  4 +-
 .../uniffle/storage/common/LocalStorage.java       | 12 +--
 .../uniffle/storage/common/LocalStorageMeta.java   | 12 +--
 .../handler/impl/ComposedClientReadHandler.java    | 22 ++---
 .../handler/impl/DataSkippableReadHandler.java     | 12 +--
 .../handler/impl/HdfsClientReadHandler.java        |  4 +-
 .../storage/handler/impl/HdfsFileWriter.java       | 12 +--
 .../handler/impl/HdfsShuffleReadHandler.java       |  4 +-
 .../impl/LocalFileQuorumClientReadHandler.java     | 38 ++++-----
 .../handler/impl/MemoryClientReadHandler.java      |  2 +-
 .../impl/MemoryQuorumClientReadHandler.java        |  4 +-
 28 files changed, 332 insertions(+), 348 deletions(-)

diff --git a/checkstyle.xml b/checkstyle.xml
index b02acfa..c94bf90 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -249,26 +249,12 @@
              value="GenericWhitespace ''{0}'' is not preceded with 
whitespace."/>
         </module>
         <module name="Indentation">
-            <!--
-            <property name="basicOffset" value="4"/>
+            <property name="basicOffset" value="2"/>
             <property name="braceAdjustment" value="0"/>
-            <property name="caseIndent" value="4"/>
+            <property name="caseIndent" value="2"/>
             <property name="throwsIndent" value="4"/>
             <property name="lineWrappingIndentation" value="4"/>
             <property name="arrayInitIndent" value="4"/>
-            -->
-            <!-- new add: because checkstyle has not fixed problems with some indentation scenarios, this rule was disabled for now
-             https://github.com/checkstyle/checkstyle/issues/3342
-             -->
-
-            <property name="severity" value="ignore"/>
-
-            <property name="basicOffset" value="4"/>
-            <property name="braceAdjustment" value="0"/>
-            <property name="caseIndent" value="4"/>
-            <property name="throwsIndent" value="8"/>
-            <property name="lineWrappingIndentation" value="8"/>
-            <property name="arrayInitIndent" value="4"/>
         </module>
         <module name="AbbreviationAsWordInName">
             <!-- new add -->
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 0382538..4995cc1 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -176,7 +176,7 @@ public class SortWriteBufferManager<K, V> {
       }
     }
     if (memoryUsedSize.get() > maxMemSize * memoryThreshold
-      && inSendListBytes.get() <= maxMemSize * sendThreshold) {
+        && inSendListBytes.get() <= maxMemSize * sendThreshold) {
       sendBuffersToServers();
     }
     mapOutputRecordCounter.increment(1);
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 97bd716..f2e7984 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -95,13 +95,13 @@ public class RssMRUtils {
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
             heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
-                dataTransferPoolSize);
+            dataTransferPoolSize);
     return client;
   }
 
   public static Set<ShuffleServerInfo> getAssignedServers(JobConf jobConf, int reduceID) {
     String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX
-      + String.valueOf(reduceID));
+        + String.valueOf(reduceID));
     String[] splitServers = servers.split(",");
     Set<ShuffleServerInfo> assignServers = Sets.newHashSet();
     for (String splitServer : splitServers) {
@@ -110,7 +110,7 @@ public class RssMRUtils {
         throw new RssException("partition " + reduceID + " server info isn't 
right");
       }
       ShuffleServerInfo sever = new ShuffleServerInfo(StringUtils.join(serverInfo, "-"),
-        serverInfo[0], Integer.parseInt(serverInfo[1]));
+          serverInfo[0], Integer.parseInt(serverInfo[1]));
       assignServers.add(sever);
     }
     return assignServers;
@@ -118,7 +118,7 @@ public class RssMRUtils {
 
   public static ApplicationAttemptId getApplicationAttemptId() {
     String containerIdStr =
-      System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+        System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
     ContainerId containerId = ContainerId.fromString(containerIdStr);
     return containerId.getApplicationAttemptId();
   }
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
index d30a192..1619678 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
@@ -67,8 +67,8 @@ public class RssEventFetcher<K,V> {
       acceptMapCompletionEvents();
     } catch (Exception e) {
       throw new RssException("Reduce: " + reduce
-        + " fails to accept completion events due to: "
-        + e.getMessage());
+          + " fails to accept completion events due to: "
+          + e.getMessage());
     }
 
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -117,13 +117,13 @@ public class RssEventFetcher<K,V> {
       case OBSOLETE:
         obsoleteMaps.add(event.getTaskAttemptId());
         LOG.info("Ignoring obsolete output of "
-          + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
+            + event.getTaskStatus() + " map-task: '" + event.getTaskAttemptId() + "'");
         break;
 
       case TIPFAILED:
         tipFailedCount++;
         LOG.info("Ignoring output of failed map TIP: '"
-          + event.getTaskAttemptId() + "'");
+            + event.getTaskAttemptId() + "'");
         break;
 
       default:
@@ -138,14 +138,14 @@ public class RssEventFetcher<K,V> {
 
     do {
       MapTaskCompletionEventsUpdate update =
-        umbilical.getMapCompletionEvents(
-          (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
-          fromEventIdx,
-          maxEventsToFetch,
-          (org.apache.hadoop.mapred.TaskAttemptID) reduce);
+          umbilical.getMapCompletionEvents(
+              (org.apache.hadoop.mapred.JobID) reduce.getJobID(),
+              fromEventIdx,
+              maxEventsToFetch,
+              (org.apache.hadoop.mapred.TaskAttemptID) reduce);
       events = update.getMapTaskCompletionEvents();
       LOG.debug("Got " + events.length + " map completion events from "
-        + fromEventIdx);
+          + fromEventIdx);
 
       assert !update.shouldReset() : "Unexpected legacy state";
 
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
index ba9a1c8..128bfb9 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssFetcher.java
@@ -100,17 +100,17 @@ public class RssFetcher<K,V> {
     this.metrics = metrics;
     this.reduceId = reduceId;
     ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.IO_ERROR.toString());
+        RssFetcher.ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
+        RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
     badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.BAD_ID.toString());
+        RssFetcher.ShuffleErrors.BAD_ID.toString());
     wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.WRONG_MAP.toString());
+        RssFetcher.ShuffleErrors.WRONG_MAP.toString());
     connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.CONNECTION.toString());
+        RssFetcher.ShuffleErrors.CONNECTION.toString());
     wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
-      RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
+        RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
 
     this.shuffleReadClient = shuffleReadClient;
     this.totalBlockCount = totalBlockCount;
@@ -151,7 +151,7 @@ public class RssFetcher<K,V> {
     if (!hasPendingData && compressedData != null) {
       final long startDecompress = System.currentTimeMillis();
       uncompressedData = RssShuffleUtils.decompressData(
-        compressedData, compressedBlock.getUncompressLength(), false).array();
+          compressedData, compressedBlock.getUncompressLength(), false).array();
       unCompressionLength += compressedBlock.getUncompressLength();
       long decompressDuration = System.currentTimeMillis() - startDecompress;
       decompressTime += decompressDuration;
@@ -187,9 +187,9 @@ public class RssFetcher<K,V> {
       shuffleReadClient.logStatics();
       metrics.inputBytes(unCompressionLength);
       LOG.info("reduce task " + reduceId.toString() + " cost " + readTime + " 
ms to fetch and "
-        + decompressTime + " ms to decompress with unCompressionLength["
-        + unCompressionLength + "] and " + serializeTime + " ms to serialize 
and "
-        + waitTime + " ms to wait resource");
+          + decompressTime + " ms to decompress with unCompressionLength["
+          + unCompressionLength + "] and " + serializeTime + " ms to serialize 
and "
+          + waitTime + " ms to wait resource");
       stopFetch();
     }
   }
@@ -226,13 +226,13 @@ public class RssFetcher<K,V> {
       mapOutput.commit();
       if (mapOutput instanceof OnDiskMapOutput) {
         LOG.info("Reduce: " + reduceId + " allocates disk to accept block "
-          + " with byte sizes: " + uncompressedData.length);
+            + " with byte sizes: " + uncompressedData.length);
       }
     } catch (Throwable t) {
       ioErrs.increment(1);
       mapOutput.abort();
       throw new RssException("Reduce: " + reduceId + " cannot write block to "
-        + mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
+          + mapOutput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
     }
     return true;
   }
@@ -258,7 +258,7 @@ public class RssFetcher<K,V> {
     double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
 
     progress.setStatus("copy(" + copyBlockCount + " of " + totalBlockCount + " 
at "
-      + mbpsFormat.format(transferRate) + " MB/s)");
+        + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
   @VisibleForTesting
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 618f80b..8f3efd3 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -105,34 +105,34 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     this.appAttemptId = RssMRUtils.getApplicationAttemptId().getAttemptId();
     this.storageType = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_STORAGE_TYPE);
     this.replicaWrite = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_WRITE,
-      RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.replicaRead = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA_READ,
-      RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_DATA_REPLICA,
-      RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+        RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
 
     this.partitionNum = mrJobConf.getNumReduceTasks();
     this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
-      RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
+        RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);
     this.basePath = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_PATH);
     this.indexReadLimit = RssMRUtils.getInt(rssJobConf, mrJobConf, RssMRConfig.RSS_INDEX_READ_LIMIT,
-      RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
+        RssMRConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
     this.readBufferSize = (int)UnitConverter.byteStringAsBytes(
-      RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
-        RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
+        RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE,
+            RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
     String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
     this.remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
-   }
+  }
 
   protected MergeManager<K, V> createMergeManager(
-    ShuffleConsumerPlugin.Context context) {
+      ShuffleConsumerPlugin.Context context) {
     return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
-      context.getLocalDirAllocator(), reporter, context.getCodec(),
-      context.getCombinerClass(), context.getCombineCollector(),
-      context.getSpilledRecordsCounter(),
-      context.getReduceCombineInputCounter(),
-      context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
-      context.getMapOutputFile());
+        context.getLocalDirAllocator(), reporter, context.getCodec(),
+        context.getCombinerClass(), context.getCombineCollector(),
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+        context.getMapOutputFile());
   }
 
   @Override
@@ -149,21 +149,21 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     // just get blockIds from RSS servers
     ShuffleWriteClient writeClient = RssMRUtils.createShuffleClient(mrJobConf);
     Roaring64NavigableMap blockIdBitmap = writeClient.getShuffleResult(
-      clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
+        clientType, serverInfoSet, appId, 0, reduceId.getTaskID().getId());
     writeClient.close();
 
     // get map-completion events to generate RSS taskIDs
     final RssEventFetcher<K,V> eventFetcher =
-      new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
+        new RssEventFetcher<K,V>(appAttemptId, reduceId, umbilical, mrJobConf, MAX_EVENTS_TO_FETCH);
     Roaring64NavigableMap taskIdBitmap = eventFetcher.fetchAllRssTaskIds();
 
     LOG.info("In reduce: " + reduceId
-      + ", RSS MR client has fetched blockIds and taskIds successfully");
+        + ", RSS MR client has fetched blockIds and taskIds successfully");
 
     // start fetcher to fetch blocks from RSS servers
     if (!taskIdBitmap.isEmpty()) {
       LOG.info("In reduce: " + reduceId
-        + ", Rss MR client starts to fetch blocks from RSS server");
+          + ", Rss MR client starts to fetch blocks from RSS server");
       JobConf readerJobConf = new JobConf((mrJobConf));
       if (!remoteStorageInfo.isEmpty()) {
         for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
@@ -171,15 +171,15 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
         }
       }
       CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
-        appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
-        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
-        readerJobConf, new MRIdHelper());
+          appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
+          partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
+          readerJobConf, new MRIdHelper());
       ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
-        shuffleReadClient, blockIdBitmap.getLongCardinality());
+          shuffleReadClient, blockIdBitmap.getLongCardinality());
       fetcher.fetchAllRssBlocks();
       LOG.info("In reduce: " + reduceId
-        + ", Rss MR client fetches blocks from RSS server successfully");
+          + ", Rss MR client fetches blocks from RSS server successfully");
     }
 
     copyPhase.complete();
@@ -198,12 +198,12 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     synchronized (this) {
       if (throwable != null) {
         throw new Shuffle.ShuffleError("error in shuffle in " + throwingThreadName,
-          throwable);
+            throwable);
       }
     }
 
     LOG.info("In reduce: " + reduceId
-      + ", Rss MR client returns sorted data to reduce successfully");
+        + ", Rss MR client returns sorted data to reduce successfully");
 
     return kvIter;
   }
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index db8d7b9..e15351b 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -194,7 +194,7 @@ public class RssMRAppMaster extends MRAppMaster {
       RemoteStorageInfo defaultRemoteStorage =
           new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
       RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
-        appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
+          appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
       // set the remote storage with actual value
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c5da1ac..705d66b 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -147,7 +147,7 @@ public class RssShuffleManager implements ShuffleManager {
     this.dataReplicaRead =  sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
         RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
@@ -171,7 +171,7 @@ public class RssShuffleManager implements ShuffleManager {
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index be347a3..1901258 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -287,23 +287,23 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   public Option<MapStatus> stop(boolean success) {
     try {
       if (success) {
-          // fill partitionLengths with non zero dummy value so map output tracker could work correctly
-          long[] partitionLengths = new long[partitioner.numPartitions()];
-          Arrays.fill(partitionLengths, 1);
-          final BlockManagerId blockManagerId =
-              createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
+        // fill partitionLengths with non zero dummy value so map output tracker could work correctly
+        long[] partitionLengths = new long[partitioner.numPartitions()];
+        Arrays.fill(partitionLengths, 1);
+        final BlockManagerId blockManagerId =
+            createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
 
-          Map<Integer, List<Long>> ptb = Maps.newHashMap();
-          for (Map.Entry<Integer, Set<Long>> entry : partitionToBlockIds.entrySet()) {
-            ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
-          }
-          long start = System.currentTimeMillis();
-          shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
-              taskAttemptId, ptb, bitmapSplitNum);
-          LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
-              taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
-          MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
-          return Option.apply(mapStatus);
+        Map<Integer, List<Long>> ptb = Maps.newHashMap();
+        for (Map.Entry<Integer, Set<Long>> entry : partitionToBlockIds.entrySet()) {
+          ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
+        }
+        long start = System.currentTimeMillis();
+        shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
+            taskAttemptId, ptb, bitmapSplitNum);
+        LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
+            taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
+        MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
+        return Option.apply(mapStatus);
       } else {
         return Option.empty();
       }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 6b8a77e..6ec8901 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -172,7 +172,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
       TaskContext context,
       ShuffleReadMetricsReporter metrics) {
     return delegate.getReader(handle,
-      startPartition, endPartition, context, metrics);
+        startPartition, endPartition, context, metrics);
   }
 
   // The interface is only used for compatibility with spark 3.1.2
@@ -187,21 +187,21 @@ public class DelegationRssShuffleManager implements ShuffleManager {
     ShuffleReader<K, C> reader = null;
     try {
       reader = (ShuffleReader<K, C>)delegate.getClass().getDeclaredMethod(
-        "getReader",
-        ShuffleHandle.class,
-        int.class,
-        int.class,
-        int.class,
-        int.class,
-        TaskContext.class,
-        ShuffleReadMetricsReporter.class).invoke(
-        handle,
-        startMapIndex,
-        endMapIndex,
-        startPartition,
-        endPartition,
-        context,
-        metrics);
+          "getReader",
+          ShuffleHandle.class,
+          int.class,
+          int.class,
+          int.class,
+          int.class,
+          TaskContext.class,
+          ShuffleReadMetricsReporter.class).invoke(
+          handle,
+          startMapIndex,
+          endMapIndex,
+          startPartition,
+          endPartition,
+          context,
+          metrics);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -210,31 +210,31 @@ public class DelegationRssShuffleManager implements ShuffleManager {
 
   // The interface is only used for compatibility with spark 3.0.1
   public <K, C> ShuffleReader<K, C> getReaderForRange(
-    ShuffleHandle handle,
-    int startMapIndex,
-    int endMapIndex,
-    int startPartition,
-    int endPartition,
-    TaskContext context,
-    ShuffleReadMetricsReporter metrics) {
+      ShuffleHandle handle,
+      int startMapIndex,
+      int endMapIndex,
+      int startPartition,
+      int endPartition,
+      TaskContext context,
+      ShuffleReadMetricsReporter metrics) {
     ShuffleReader<K, C> reader = null;
     try {
       reader = (ShuffleReader<K, C>)delegate.getClass().getDeclaredMethod(
-        "getReaderForRange",
-        ShuffleHandle.class,
-        int.class,
-        int.class,
-        int.class,
-        int.class,
-        TaskContext.class,
-        ShuffleReadMetricsReporter.class).invoke(
-        handle,
-        startMapIndex,
-        endMapIndex,
-        startPartition,
-        endPartition,
-        context,
-        metrics);
+          "getReaderForRange",
+          ShuffleHandle.class,
+          int.class,
+          int.class,
+          int.class,
+          int.class,
+          TaskContext.class,
+          ShuffleReadMetricsReporter.class).invoke(
+          handle,
+          startMapIndex,
+          endMapIndex,
+          startPartition,
+          endPartition,
+          context,
+          metrics);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 032767a..b71f1b3 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -168,12 +168,12 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
 
     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
 
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -221,28 +221,28 @@ public class RssShuffleManager implements ShuffleManager {
     this.dataReplica = sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA,
         RssSparkConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
     this.dataReplicaWrite =  sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_WRITE,
-      RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.dataReplicaRead =  sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
-      RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
-      RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
-      + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" + 
dataReplicaSkipEnabled + "]");
+        + dataReplica + ":" + dataReplicaWrite + ":" + dataReplicaRead + ":" + 
dataReplicaSkipEnabled + "]");
     RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, 
dataReplicaRead);
 
     int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
-      RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
     long retryIntervalMax = sparkConf.getLong(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
-      RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
-      RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+        RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
     this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
-            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+        RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
 
-     shuffleWriteClient = ShuffleClientFactory
+    shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
+            dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
@@ -339,7 +339,7 @@ public class RssShuffleManager implements ShuffleManager {
       TaskContext context,
       ShuffleReadMetricsReporter metrics) {
     return getReader(handle, 0, Integer.MAX_VALUE, startPartition, endPartition,
-      context, metrics);
+        context, metrics);
   }
 
   // The interface is used for compatibility with spark 3.0.1
@@ -353,14 +353,14 @@ public class RssShuffleManager implements ShuffleManager {
       ShuffleReadMetricsReporter metrics) {
     long start = System.currentTimeMillis();
     Roaring64NavigableMap taskIdBitmap = getExpectedTasksByExecutorId(
-      handle.shuffleId(),
-      startPartition,
-      endPartition,
-      startMapIndex,
-      endMapIndex);
+        handle.shuffleId(),
+        startPartition,
+        endPartition,
+        startMapIndex,
+        endMapIndex);
     LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, 
and request expected blockIds from "
-      + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + 
handle.shuffleId() + "], partitionId["
-      + startPartition + ", " + endPartition + "]");
+        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + 
handle.shuffleId() + "], partitionId["
+        + startPartition + ", " + endPartition + "]");
     return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition,
         context, metrics, taskIdBitmap);
   }
@@ -376,14 +376,14 @@ public class RssShuffleManager implements ShuffleManager {
       ShuffleReadMetricsReporter metrics) {
     long start = System.currentTimeMillis();
     Roaring64NavigableMap taskIdBitmap = getExpectedTasksByRange(
-      handle.shuffleId(),
-      startPartition,
-      endPartition,
-      startMapIndex,
-      endMapIndex);
+        handle.shuffleId(),
+        startPartition,
+        endPartition,
+        startMapIndex,
+        endMapIndex);
     LOG.info("Get taskId cost " + (System.currentTimeMillis() - start) + " ms, 
and request expected blockIds from "
-      + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + 
handle.shuffleId() + "], partitionId["
-      + startPartition + ", " + endPartition + "]");
+        + taskIdBitmap.getLongCardinality() + " tasks for shuffleId[" + 
handle.shuffleId() + "], partitionId["
+        + startPartition + ", " + endPartition + "]");
     return getReaderImpl(handle, startMapIndex, endMapIndex, startPartition, endPartition,
         context, metrics, taskIdBitmap);
   }
@@ -507,24 +507,24 @@ public class RssShuffleManager implements ShuffleManager {
   // This API is only used by Spark3.0 and removed since 3.1,
   // so we extract it from getExpectedTasksByExecutorId.
   private Roaring64NavigableMap getExpectedTasksByRange(
-    int shuffleId,
-    int startPartition,
-    int endPartition,
-    int startMapIndex,
-    int endMapIndex) {
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      int startMapIndex,
+      int endMapIndex) {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
     Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> mapStatusIter = null;
     try {
       mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
-        SparkEnv.get().mapOutputTracker().getClass()
-          .getDeclaredMethod("getMapSizesByRange",
-            int.class, int.class, int.class, int.class, int.class)
-          .invoke(SparkEnv.get().mapOutputTracker(),
-            shuffleId,
-            startMapIndex,
-            endMapIndex,
-            startPartition,
-            endPartition);
+          SparkEnv.get().mapOutputTracker().getClass()
+              .getDeclaredMethod("getMapSizesByRange",
+                  int.class, int.class, int.class, int.class, int.class)
+              .invoke(SparkEnv.get().mapOutputTracker(),
+                  shuffleId,
+                  startMapIndex,
+                  endMapIndex,
+                  startPartition,
+                  endPartition);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 669431c..ce4c247 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -121,7 +121,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
           Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = entry.getValue();
           // todo: compact unnecessary blocks that reach replicaWrite
           RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(
-                  appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
+              appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
           long s = System.currentTimeMillis();
           RssSendShuffleDataResponse response = getShuffleServerClient(ssi).sendShuffleData(request);
           LOG.info("ShuffleWriteClientImpl sendShuffleData cost:" + (System.currentTimeMillis() - s) + "(ms)");
@@ -130,11 +130,11 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
             // mark a replica of block that has been sent
             serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
             LOG.info("Send: " + serverToBlockIds.get(ssi).size()
-                    + " blocks to [" + ssi.getId() + "] successfully");
+                + " blocks to [" + ssi.getId() + "] successfully");
           } else {
             isAllServersSuccess.set(false);
             LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to 
[" + ssi.getId()
-                    + "] failed with statusCode[" + response.getStatusCode() + 
"], ");
+                + "] failed with statusCode[" + response.getStatusCode() + "], 
");
           }
         } catch (Exception e) {
           isAllServersSuccess.set(false);
@@ -146,9 +146,8 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   }
 
   private void genServerToBlocks(ShuffleBlockInfo sbi, List<ShuffleServerInfo> serverList,
-                                 Map<ShuffleServerInfo,
-                                  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
-                                 Map<ShuffleServerInfo, List<Long>> serverToBlockIds) {
+      Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
+      Map<ShuffleServerInfo, List<Long>> serverToBlockIds) {
     int partitionId = sbi.getPartitionId();
     int shuffleId = sbi.getShuffleId();
     for (ShuffleServerInfo ssi : serverList) {
@@ -180,7 +179,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     Map<ShuffleServerInfo, Map<Integer,
         Map<Integer, List<ShuffleBlockInfo>>>> primaryServerToBlocks = Maps.newHashMap();
     Map<ShuffleServerInfo, Map<Integer,
-      Map<Integer, List<ShuffleBlockInfo>>>> secondaryServerToBlocks = Maps.newHashMap();
+        Map<Integer, List<ShuffleBlockInfo>>>> secondaryServerToBlocks = Maps.newHashMap();
     Map<ShuffleServerInfo, List<Long>> primaryServerToBlockIds = Maps.newHashMap();
     Map<ShuffleServerInfo, List<Long>> secondaryServerToBlockIds = Maps.newHashMap();
 
@@ -198,23 +197,23 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
       List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
       if (replicaSkipEnabled) {
         genServerToBlocks(sbi, allServers.subList(0, replicaWrite),
-          primaryServerToBlocks, primaryServerToBlockIds);
+            primaryServerToBlocks, primaryServerToBlockIds);
         genServerToBlocks(sbi, allServers.subList(replicaWrite, replica),
-          secondaryServerToBlocks, secondaryServerToBlockIds);
+            secondaryServerToBlocks, secondaryServerToBlockIds);
       } else {
         // When replicaSkip is disabled, we send data to all replicas within one round.
         genServerToBlocks(sbi, allServers,
-          primaryServerToBlocks, primaryServerToBlockIds);
+            primaryServerToBlocks, primaryServerToBlockIds);
       }
     }
 
     // maintain the count of blocks that have been sent to the server
     Map<Long, AtomicInteger> blockIdsTracker = Maps.newConcurrentMap();
     primaryServerToBlockIds.values().forEach(
-      blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
+        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
     );
     secondaryServerToBlockIds.values().forEach(
-      blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
+        blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0)))
     );
 
     Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
@@ -237,15 +236,14 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
 
     // check success and failed blocks according to the replicaWrite
     blockIdsTracker.entrySet().forEach(blockCt -> {
-        long blockId = blockCt.getKey();
-        int count = blockCt.getValue().get();
-        if (count >= replicaWrite) {
-          successBlockIds.add(blockId);
-        } else {
-          failedBlockIds.add(blockId);
-        }
+      long blockId = blockCt.getKey();
+      int count = blockCt.getValue().get();
+      if (count >= replicaWrite) {
+        successBlockIds.add(blockId);
+      } else {
+        failedBlockIds.add(blockId);
       }
-    );
+    });
     return new SendShuffleDataResult(successBlockIds, failedBlockIds);
   }
 
@@ -420,7 +418,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     for (Map.Entry<Integer, Integer> entry: partitionReportTracker.entrySet()) {
       if (entry.getValue() < replicaWrite) {
         throw new RssException("Quorum check of report shuffle result is 
failed for appId["
-          + appId + "], shuffleId[" + shuffleId + "]");
+            + appId + "], shuffleId[" + shuffleId + "]");
       }
     }
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index a9f989a..3de5692 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -83,8 +83,8 @@ public class ConfigUtils {
         return (int) value;
       } else {
         throw new IllegalArgumentException(String.format(
-          "Configuration value %s overflows/underflows the integer type.",
-          value));
+            "Configuration value %s overflows/underflows the integer type.",
+            value));
       }
     }
     return Integer.parseInt(o.toString());
@@ -126,8 +126,8 @@ public class ConfigUtils {
         return false;
       default:
         throw new IllegalArgumentException(String.format(
-          "Unrecognized option for boolean: %s. Expected either true or 
false(case insensitive)",
-          o));
+            "Unrecognized option for boolean: %s. Expected either true or 
false(case insensitive)",
+            o));
     }
   }
 
@@ -137,13 +137,13 @@ public class ConfigUtils {
     } else if (o.getClass() == Double.class) {
       double value = ((Double) o);
       if (value == 0.0
-        || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
-        || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
+          || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
+          || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
         return (float) value;
       } else {
         throw new IllegalArgumentException(String.format(
-          "Configuration value %s overflows/underflows the float type.",
-          value));
+            "Configuration value %s overflows/underflows the float type.",
+            value));
       }
     }
 
@@ -189,5 +189,5 @@ public class ConfigUtils {
 
   public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
       (Function<Double, Boolean>) value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0) >= 0;
-  
+
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
index 57347fb..97cc769 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
@@ -29,30 +29,30 @@ import com.google.common.collect.ImmutableMap;
 public class UnitConverter {
 
   private static final Map<String, ByteUnit> byteSuffixes =
-    ImmutableMap.<String, ByteUnit>builder()
-      .put("b", ByteUnit.BYTE)
-      .put("k", ByteUnit.KiB)
-      .put("kb", ByteUnit.KiB)
-      .put("m", ByteUnit.MiB)
-      .put("mb", ByteUnit.MiB)
-      .put("g", ByteUnit.GiB)
-      .put("gb", ByteUnit.GiB)
-      .put("t", ByteUnit.TiB)
-      .put("tb", ByteUnit.TiB)
-      .put("p", ByteUnit.PiB)
-      .put("pb", ByteUnit.PiB)
-      .build();
+      ImmutableMap.<String, ByteUnit>builder()
+          .put("b", ByteUnit.BYTE)
+          .put("k", ByteUnit.KiB)
+          .put("kb", ByteUnit.KiB)
+          .put("m", ByteUnit.MiB)
+          .put("mb", ByteUnit.MiB)
+          .put("g", ByteUnit.GiB)
+          .put("gb", ByteUnit.GiB)
+          .put("t", ByteUnit.TiB)
+          .put("tb", ByteUnit.TiB)
+          .put("p", ByteUnit.PiB)
+          .put("pb", ByteUnit.PiB)
+          .build();
 
   private static final Map<String, TimeUnit> timeSuffixes =
-    ImmutableMap.<String, TimeUnit>builder()
-      .put("us", TimeUnit.MICROSECONDS)
-      .put("ms", TimeUnit.MILLISECONDS)
-      .put("s", TimeUnit.SECONDS)
-      .put("m", TimeUnit.MINUTES)
-      .put("min", TimeUnit.MINUTES)
-      .put("h", TimeUnit.HOURS)
-      .put("d", TimeUnit.DAYS)
-      .build();
+      ImmutableMap.<String, TimeUnit>builder()
+          .put("us", TimeUnit.MICROSECONDS)
+          .put("ms", TimeUnit.MILLISECONDS)
+          .put("s", TimeUnit.SECONDS)
+          .put("m", TimeUnit.MINUTES)
+          .put("min", TimeUnit.MINUTES)
+          .put("h", TimeUnit.HOURS)
+          .put("d", TimeUnit.DAYS)
+          .build();
 
   public static boolean isByteString(String str) {
     String strLower = str.toLowerCase();
@@ -94,14 +94,14 @@ public class UnitConverter {
         return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
       } else if (fractionMatcher.matches()) {
         throw new NumberFormatException("Fractional values are not supported. 
Input was: "
-          + fractionMatcher.group(1));
+            + fractionMatcher.group(1));
       } else {
         throw new NumberFormatException("Failed to parse byte string: " + str);
       }
     } catch (NumberFormatException e) {
       String byteError = "Size must be specified as bytes (b), "
-        + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or 
pebibytes(p). "
-        + "E.g. 50b, 100k, or 250m.";
+          + "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or 
pebibytes(p). "
+          + "E.g. 50b, 100k, or 250m.";
       throw new NumberFormatException(byteError + "\n" + e.getMessage());
     }
   }
@@ -171,8 +171,8 @@ public class UnitConverter {
       return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), "
-        + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), 
or day (d). "
-        + "E.g. 50s, 100ms, or 250us.";
+          + "milliseconds (ms), microseconds (us), minutes (m or min), hour 
(h), or day (d). "
+          + "E.g. 50s, 100ms, or 250us.";
 
       throw new NumberFormatException(timeError + "\n" + e.getMessage());
     }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
index 3dffb8b..27d8e93 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
@@ -74,58 +74,58 @@ public class PartitionBalanceAssignmentStrategy implements AssignmentStrategy {
 
     SortedMap<PartitionRange, List<ServerNode>> assignments = new TreeMap<>();
     synchronized (this) {
-        List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
-        Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = Maps.newConcurrentMap();
-        for (ServerNode node : nodes) {
-          PartitionAssignmentInfo partitionInfo;
-          if (serverToPartitions.containsKey(node)) {
-            partitionInfo = serverToPartitions.get(node);
-            if (partitionInfo.getTimestamp() < node.getTimestamp()) {
-              partitionInfo.resetPartitionNum();
-              partitionInfo.setTimestamp(node.getTimestamp());
-            }
-          } else {
-            partitionInfo = new PartitionAssignmentInfo();
+      List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
+      Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = Maps.newConcurrentMap();
+      for (ServerNode node : nodes) {
+        PartitionAssignmentInfo partitionInfo;
+        if (serverToPartitions.containsKey(node)) {
+          partitionInfo = serverToPartitions.get(node);
+          if (partitionInfo.getTimestamp() < node.getTimestamp()) {
+            partitionInfo.resetPartitionNum();
+            partitionInfo.setTimestamp(node.getTimestamp());
           }
-          newPartitionInfos.putIfAbsent(node, partitionInfo);
+        } else {
+          partitionInfo = new PartitionAssignmentInfo();
         }
-        serverToPartitions = newPartitionInfos;
-        int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
-        int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
-        nodes.sort(new Comparator<ServerNode>() {
-          @Override
-          public int compare(ServerNode o1, ServerNode o2) {
-            PartitionAssignmentInfo partitionInfo1 = serverToPartitions.get(o1);
-            PartitionAssignmentInfo partitionInfo2 = serverToPartitions.get(o2);
-            double v1 = o1.getAvailableMemory() * 1.0 / (partitionInfo1.getPartitionNum() + assignPartitions);
-            double v2 = o2.getAvailableMemory() * 1.0 / (partitionInfo2.getPartitionNum() + assignPartitions);
-            return -Double.compare(v1, v2);
-          }
-        });
-
-        if (nodes.isEmpty() || nodes.size() < replica) {
-          throw new RuntimeException("There isn't enough shuffle servers");
-        }
-
-        int expectNum = clusterManager.getShuffleNodesMax();
-        if (nodes.size() < clusterManager.getShuffleNodesMax()) {
-          LOG.warn("Can't get expected servers [" + expectNum + "] and found 
only [" + nodes.size() + "]");
-          expectNum = nodes.size();
+        newPartitionInfos.putIfAbsent(node, partitionInfo);
+      }
+      serverToPartitions = newPartitionInfos;
+      int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
+      int assignPartitions = averagePartitions < 1 ? 1 : averagePartitions;
+      nodes.sort(new Comparator<ServerNode>() {
+        @Override
+        public int compare(ServerNode o1, ServerNode o2) {
+          PartitionAssignmentInfo partitionInfo1 = serverToPartitions.get(o1);
+          PartitionAssignmentInfo partitionInfo2 = serverToPartitions.get(o2);
+          double v1 = o1.getAvailableMemory() * 1.0 / (partitionInfo1.getPartitionNum() + assignPartitions);
+          double v2 = o2.getAvailableMemory() * 1.0 / (partitionInfo2.getPartitionNum() + assignPartitions);
+          return -Double.compare(v1, v2);
         }
-
-        List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
-        int idx = 0;
-        List<PartitionRange> ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
-        for (PartitionRange range : ranges) {
-          List<ServerNode> assignNodes = Lists.newArrayList();
-          for (int rc = 0; rc < replica; rc++) {
-            ServerNode node =  candidatesNodes.get(idx);
-            idx = CoordinatorUtils.nextIdx(idx,  candidatesNodes.size());
-            serverToPartitions.get(node).incrementPartitionNum();
-            assignNodes.add(node);
-          }
-          assignments.put(range, assignNodes);
+      });
+
+      if (nodes.isEmpty() || nodes.size() < replica) {
+        throw new RuntimeException("There isn't enough shuffle servers");
+      }
+
+      int expectNum = clusterManager.getShuffleNodesMax();
+      if (nodes.size() < clusterManager.getShuffleNodesMax()) {
+        LOG.warn("Can't get expected servers [" + expectNum + "] and found 
only [" + nodes.size() + "]");
+        expectNum = nodes.size();
+      }
+
+      List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
+      int idx = 0;
+      List<PartitionRange> ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
+      for (PartitionRange range : ranges) {
+        List<ServerNode> assignNodes = Lists.newArrayList();
+        for (int rc = 0; rc < replica; rc++) {
+          ServerNode node =  candidatesNodes.get(idx);
+          idx = CoordinatorUtils.nextIdx(idx,  candidatesNodes.size());
+          serverToPartitions.get(node).incrementPartitionNum();
+          assignNodes.add(node);
         }
+        assignments.put(range, assignNodes);
+      }
     }
     return new PartitionRangeAssignment(assignments);
   }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 79f7bd5..7c5beaf 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -416,8 +416,8 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         .setPartitionId(request.getPartitionId())
         .build();
     GetShuffleResultResponse rpcResponse = blockingStub
-      .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
-      .getShuffleResult(rpcRequest);
+        .withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS)
+        .getShuffleResult(rpcRequest);
     StatusCode statusCode = rpcResponse.getStatus();
 
     RssGetShuffleResultResponse response;
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index e6a1811..1a54ee2 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -165,18 +165,18 @@ public class LocalStorageChecker extends Checker {
         byte[] readData = new byte[1024];
         int readBytes = -1;
         try (FileInputStream fis = new FileInputStream(writeFile)) {
-            int hasReadBytes = 0;
-            do {
-              readBytes = fis.read(readData);
-              if (hasReadBytes < 1024) {
-                for (int i = 0; i < readBytes; i++) {
-                  if (data[hasReadBytes + i] != readData[i]) {
-                    return false;
-                  }
+          int hasReadBytes = 0;
+          do {
+            readBytes = fis.read(readData);
+            if (hasReadBytes < 1024) {
+              for (int i = 0; i < readBytes; i++) {
+                if (data[hasReadBytes + i] != readData[i]) {
+                  return false;
                 }
               }
-              hasReadBytes += readBytes;
-            } while (readBytes != -1);
+            }
+            hasReadBytes += readBytes;
+          } while (readBytes != -1);
         }
       } catch (Exception e) {
         LOG.error("Storage read and write error ", e);
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 99efc50..bc3ec39 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -290,8 +290,8 @@ public class ShuffleBufferManager {
 
   // flush the buffer with required map which is <appId -> shuffleId>
   public synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
-    for (Map.Entry<String, Map<Integer, RangeMap<Integer,
-        ShuffleBuffer>>> appIdToBuffers : bufferPool.entrySet()) {
+    for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
+        appIdToBuffers : bufferPool.entrySet()) {
       String appId = appIdToBuffers.getKey();
       if (requiredFlush.containsKey(appId)) {
        for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers :
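
Only the wrapping of the long generic type changes here. For reference, the
method walks a two-level pool keyed appId -> shuffleId and flushes just the
entries named in the request map; a hedged outline of that selective walk,
with the value type reduced to Object in place of Guava's
RangeMap<Integer, ShuffleBuffer>:

    import java.util.Map;
    import java.util.Set;

    public class SelectiveFlushSketch {
      static void flush(Map<String, Map<Integer, Object>> pool,
          Map<String, Set<Integer>> requiredFlush) {
        for (Map.Entry<String, Map<Integer, Object>> app : pool.entrySet()) {
          Set<Integer> shuffleIds = requiredFlush.get(app.getKey());
          if (shuffleIds == null) {
            continue;  // this app does not need flushing
          }
          for (Map.Entry<Integer, Object> shuffle : app.getValue().entrySet()) {
            if (shuffleIds.contains(shuffle.getKey())) {
              // flush the buffers of this (appId, shuffleId) here
            }
          }
        }
      }
    }
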
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index a49b12e..117c347 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -133,16 +133,16 @@ public class LocalStorage extends AbstractStorage {
 
   @Override
   public void updateWriteMetrics(StorageWriteMetrics metrics) {
-      updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
-          metrics.getDataSize(),
-          metrics.getPartitions());
+    updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
+        metrics.getDataSize(),
+        metrics.getPartitions());
   }
 
   @Override
   public void updateReadMetrics(StorageReadMetrics metrics) {
-      String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId());
-      prepareStartRead(shuffleKey);
-      updateShuffleLastReadTs(shuffleKey);
+    String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId());
+    prepareStartRead(shuffleKey);
+    updateShuffleLastReadTs(shuffleKey);
   }
 
   @Override
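
Both metric paths above key their state by a composite shuffle key built from
appId and shuffleId. A trivial sketch of such a key; the real
RssUtils.generateShuffleKey may use a different separator or format:

    public class ShuffleKeySketch {
      // Assumed shape only; not RssUtils' actual implementation.
      static String shuffleKey(String appId, int shuffleId) {
        return appId + "/" + shuffleId;
      }
    }
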
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 2505cd1..a27136b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -55,11 +55,11 @@ public class LocalStorageMeta {
        .filter(e -> (!checkRead || e.getValue().isStartRead.get()) && e.getValue().getNotUploadedSize() > 0)
         .collect(Collectors.toList());
 
-      shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String, ShuffleMeta> o2) -> {
-        long sz1 = o1.getValue().getSize().longValue();
-        long sz2 = o2.getValue().getSize().longValue();
-        return -Long.compare(sz1, sz2);
-      });
+    shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String, ShuffleMeta> o2) -> {
+      long sz1 = o1.getValue().getSize().longValue();
+      long sz2 = o2.getValue().getSize().longValue();
+      return -Long.compare(sz1, sz2);
+    });
 
     return shuffleMetaList
         .subList(0, Math.min(shuffleMetaList.size(), hint))
@@ -82,7 +82,7 @@ public class LocalStorageMeta {
       uploadedPartitionBitmap = shuffleMeta.uploadedPartitionBitmap.clone();
     }
     for (int partition : uploadedPartitionBitmap) {
-        partitionBitmap.remove(partition);
+      partitionBitmap.remove(partition);
     }
     return partitionBitmap;
   }
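
Beyond indentation, the comparator above sorts descending by negating
Long.compare, which is safe because Long.compare only returns the sign of the
comparison. The same intent can be written without the negation; a small
equivalent (entry values here are illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map.Entry;

    public class DescendingSortSketch {
      public static void main(String[] args) {
        List<Entry<String, Long>> metas = Arrays.asList(
            new SimpleEntry<>("a", 10L),
            new SimpleEntry<>("b", 30L),
            new SimpleEntry<>("c", 20L));
        // Descending order via a reversed key comparator, no negation needed.
        metas.sort(Comparator.comparingLong((Entry<String, Long> e) -> e.getValue()).reversed());
        System.out.println(metas);  // [b=30, c=20, a=10]
      }
    }
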
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 9819c01..f990c38 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -231,30 +231,30 @@ public class ComposedClientReadHandler implements ClientReadHandler
   @VisibleForTesting
   public String getReadBlokNumInfo() {
     long totalBlockNum = hotReadBlockNum + warmReadBlockNum
-      + coldReadBlockNum + frozenReadBlockNum;
+        + coldReadBlockNum + frozenReadBlockNum;
     return "Client read " + totalBlockNum + " blocks ["
-      + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
-      + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
+        + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
+        + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
   }
 
   @VisibleForTesting
   public String getReadLengthInfo() {
     long totalReadLength = hotReadLength + warmReadLength
-      + coldReadLength + frozenReadLength;
+        + coldReadLength + frozenReadLength;
     return "Client read " + totalReadLength + " bytes ["
-      + " hot:" + hotReadLength + " warm:" + warmReadLength
-      + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
+        + " hot:" + hotReadLength + " warm:" + warmReadLength
+        + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
   }
 
   @VisibleForTesting
   public String getReadUncompressLengthInfo() {
     long totalReadUncompressLength = hotReadUncompressLength + warmReadUncompressLength
-      + coldReadUncompressLength + frozenReadUncompressLength;
+        + coldReadUncompressLength + frozenReadUncompressLength;
     return "Client read " + totalReadUncompressLength + " uncompressed bytes ["
-      + " hot:" + hotReadUncompressLength
-      + " warm:" + warmReadUncompressLength
-      + " cold:" + coldReadUncompressLength
-      + " frozen:" + frozenReadUncompressLength + " ]";
+        + " hot:" + hotReadUncompressLength
+        + " warm:" + warmReadUncompressLength
+        + " cold:" + coldReadUncompressLength
+        + " frozen:" + frozenReadUncompressLength + " ]";
   }
 
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index 6bc11f1..ce7a515 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -39,12 +39,12 @@ public abstract class DataSkippableReadHandler extends AbstractClientReadHandler
   protected Roaring64NavigableMap processBlockIds;
 
   public DataSkippableReadHandler(
-    String appId,
-    int shuffleId,
-    int partitionId,
-    int readBufferSize,
-    Roaring64NavigableMap expectBlockIds,
-    Roaring64NavigableMap processBlockIds) {
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 7151d94..f0e7491 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -122,8 +122,8 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
     // init lazily like LocalFileClientRead
     if (readHandlers.isEmpty()) {
       String fullShufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
-        ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
-          shuffleId, partitionId, partitionNumPerRange, partitionNum));
+          ShuffleStorageUtils.getShuffleDataPathWithRange(appId,
+              shuffleId, partitionId, partitionNumPerRange, partitionNum));
       init(fullShufflePath);
     }
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 75dae67..955bd5f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -109,12 +109,12 @@ public class HdfsFileWriter implements Closeable {
     fsDataOutputStream.writeInt(partitionList.size());
     headerContentBuf.putInt(partitionList.size());
     for (int i = 0; i < partitionList.size(); i++) {
-        fsDataOutputStream.writeInt(partitionList.get(i));
-        fsDataOutputStream.writeLong(indexFileSizeList.get(i));
-        fsDataOutputStream.writeLong(dataFileSizeList.get(i));
-        headerContentBuf.putInt(partitionList.get(i));
-        headerContentBuf.putLong(indexFileSizeList.get(i));
-        headerContentBuf.putLong(dataFileSizeList.get(i));
+      fsDataOutputStream.writeInt(partitionList.get(i));
+      fsDataOutputStream.writeLong(indexFileSizeList.get(i));
+      fsDataOutputStream.writeLong(dataFileSizeList.get(i));
+      headerContentBuf.putInt(partitionList.get(i));
+      headerContentBuf.putLong(indexFileSizeList.get(i));
+      headerContentBuf.putLong(dataFileSizeList.get(i));
     }
     headerContentBuf.flip();
     fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
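
The loop above writes the index-file header twice in lockstep: once to the
output stream and once into a buffer whose CRC32 is appended last, so readers
can validate the header. A self-contained sketch of that layout, with
java.util.zip.CRC32 standing in for the project's ChecksumUtils:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;
    import java.util.zip.CRC32;

    public class IndexHeaderSketch {
      // Header: count, then (int partitionId, long indexSize, long dataSize)
      // per partition, then a CRC32 of everything written so far.
      static byte[] header(List<Integer> partitions,
          List<Long> indexSizes, List<Long> dataSizes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(partitions.size());
        for (int i = 0; i < partitions.size(); i++) {
          out.writeInt(partitions.get(i));
          out.writeLong(indexSizes.get(i));
          out.writeLong(dataSizes.get(i));
        }
        CRC32 crc = new CRC32();
        crc.update(bos.toByteArray());  // checksum covers the header content
        out.writeLong(crc.getValue());
        return bos.toByteArray();
      }
    }
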
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index f8a58f6..de54241 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -82,14 +82,14 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
     byte[] data = readShuffleData(shuffleDataSegment.getOffset(), expectedLength);
     if (data.length == 0) {
       LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {} 
from file {}.data",
-        expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, data.length, shuffleDataSegment, filePrefix);
       return null;
     }
 
     ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data, shuffleDataSegment.getBufferSegments());
     if (shuffleDataResult.isEmpty()) {
       LOG.warn("Shuffle data is empty, expected length {}, data length {}, 
segment {} in file {}.data",
-        expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, data.length, shuffleDataSegment, filePrefix);
       return null;
     }
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index 1c9c048..8cf1fe2 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -40,22 +40,22 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
   private long readUncompressLength = 0L;
 
   public LocalFileQuorumClientReadHandler(
-    String appId,
-    int shuffleId,
-    int partitionId,
-    int indexReadLimit,
-    int partitionNumPerRange,
-    int partitionNum,
-    int readBufferSize,
-    Roaring64NavigableMap expectBlockIds,
-    Roaring64NavigableMap processBlockIds,
-    List<ShuffleServerClient> shuffleServerClients) {
-      this.appId = appId;
-      this.shuffleId = shuffleId;
-      this.partitionId = partitionId;
-      this.readBufferSize = readBufferSize;
-      for (ShuffleServerClient client: shuffleServerClients) {
-        handlers.add(new LocalFileClientReadHandler(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int indexReadLimit,
+      int partitionNumPerRange,
+      int partitionNum,
+      int readBufferSize,
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      List<ShuffleServerClient> shuffleServerClients) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.partitionId = partitionId;
+    this.readBufferSize = readBufferSize;
+    for (ShuffleServerClient client: shuffleServerClients) {
+      handlers.add(new LocalFileClientReadHandler(
           appId,
           shuffleId,
           partitionId,
@@ -66,8 +66,8 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
           expectBlockIds,
           processBlockIds,
           client
-        ));
-      }
+      ));
+    }
   }
 
   @Override
@@ -85,7 +85,7 @@ public class LocalFileQuorumClientReadHandler extends AbstractClientReadHandler
     }
     if (!readSuccessful) {
       throw new RssException("Failed to read all replicas for appId[" + appId + "], shuffleId["
-        + shuffleId + "], partitionId[" + partitionId + "]");
+          + shuffleId + "], partitionId[" + partitionId + "]");
     }
     return result;
   }
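
The constructor above fans one logical read out to a handler per replica, and
readShuffleData (last hunk) returns the first replica that answers, throwing
only when every replica fails. A compact sketch of that failover shape; the
handler and exception types here are simplified stand-ins:

    import java.util.List;

    public class QuorumReadSketch {
      interface ReadHandler<T> {
        T read() throws Exception;
      }

      // Try replicas in order; first success wins, all failures -> error.
      static <T> T readAny(List<ReadHandler<T>> replicas) {
        for (ReadHandler<T> h : replicas) {
          try {
            return h.read();
          } catch (Exception e) {
            // log and fall through to the next replica
          }
        }
        throw new RuntimeException("Failed to read all replicas");
      }
    }
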
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 5f0db19..73bcc46 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -54,7 +54,7 @@ public class MemoryClientReadHandler extends AbstractClientReadHandler {
     ShuffleDataResult result = null;
 
     RssGetInMemoryShuffleDataRequest request = new RssGetInMemoryShuffleDataRequest(
-      appId,shuffleId, partitionId, lastBlockId, readBufferSize);
+        appId,shuffleId, partitionId, lastBlockId, readBufferSize);
 
     try {
       RssGetInMemoryShuffleDataResponse response =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
index a3e6a5b..5fbe8ec 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
@@ -45,8 +45,8 @@ public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
-      handlers.add(new MemoryClientReadHandler(
-          appId, shuffleId, partitionId, readBufferSize, client))
+        handlers.add(new MemoryClientReadHandler(
+            appId, shuffleId, partitionId, readBufferSize, client))
     );
   }
 
