This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ddf83843 [Improvement] Small refactor for code quality (#394)
ddf83843 is described below
commit ddf83843a9fbce8155d159815c351b16eb5a9704
Author: advancedxy <[email protected]>
AuthorDate: Mon Dec 12 00:01:05 2022 +0800
[Improvement] Small refactor for code quality (#394)
### What changes were proposed in this pull request?
Tweaks some wording, fixes some typos, and removes some dead code.
### Why are the changes needed?
Better code quality
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs.
---
.../spark/shuffle/reader/RssShuffleDataIterator.java | 6 ------
.../apache/spark/shuffle/writer/WriterBuffer.java | 2 +-
.../spark/shuffle/DelegationRssShuffleManager.java | 3 +--
.../spark/shuffle/writer/RssShuffleWriter.java | 9 ++++++---
.../uniffle/client/factory/ShuffleClientFactory.java | 4 ++--
.../uniffle/client/impl/ShuffleReadClientImpl.java | 4 +---
.../uniffle/client/impl/ShuffleWriteClientImpl.java | 4 ++--
.../org/apache/uniffle/client/util/ClientUtils.java | 4 ++--
.../common/filesystem/HadoopFilesystemProvider.java | 2 +-
.../apache/uniffle/common/metrics/GRPCMetrics.java | 20 ++++++++++----------
.../common/rpc/MonitoringServerTransportFilter.java | 6 +++---
.../apache/uniffle/coordinator/CoordinatorConf.java | 2 +-
.../uniffle/test/CoordinatorGrpcServerTest.java | 8 ++++----
.../org/apache/uniffle/test/CoordinatorGrpcTest.java | 4 ++--
.../client/factory/CoordinatorClientFactory.java | 2 +-
.../java/org/apache/uniffle/server/HealthCheck.java | 4 ++--
.../org/apache/uniffle/server/ShuffleServer.java | 6 +++---
.../apache/uniffle/server/ShuffleTaskManager.java | 2 +-
.../apache/uniffle/server/buffer/ShuffleBuffer.java | 2 +-
.../uniffle/server/buffer/ShuffleBufferManager.java | 6 +++---
.../uniffle/server/storage/HdfsStorageManager.java | 2 +-
.../uniffle/server/storage/LocalStorageManager.java | 2 +-
.../uniffle/server/storage/MultiStorageManager.java | 4 ++--
.../uniffle/server/storage/StorageManager.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 2 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 2 +-
.../server/storage/LocalStorageManagerTest.java | 2 +-
.../uniffle/storage/common/LocalStorageMeta.java | 4 ++--
.../uniffle/storage/util/ShuffleStorageUtils.java | 10 +++++-----
29 files changed, 62 insertions(+), 68 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 7dec34e8..dd01d517 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.reader;
import java.io.IOException;
import java.nio.ByteBuffer;
-import com.esotericsoftware.kryo.io.Input;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
@@ -54,7 +53,6 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
private long readTime = 0;
private long serializeTime = 0;
private long decompressTime = 0;
- private Input deserializationInput = null;
private DeserializationStream deserializationStream = null;
private ByteBufInputStream byteBufInputStream = null;
private long compressedBytesLength = 0;
@@ -95,13 +93,9 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
}
}
- if (deserializationInput != null) {
- deserializationInput.close();
- }
if (deserializationStream != null) {
deserializationStream.close();
}
- deserializationInput = null;
deserializationStream = null;
byteBufInputStream = null;
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
index a1427c71..22bc90f9 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java
@@ -53,7 +53,7 @@ public class WriterBuffer {
try {
System.arraycopy(recordBuffer, 0, buffer, nextOffset, length);
} catch (Exception e) {
- LOG.error("Unexpect exception for System.arraycopy, length[" + length +
"], nextOffset["
+ LOG.error("Unexpected exception for System.arraycopy, length[" + length
+ "], nextOffset["
+ nextOffset + "], bufferSize[" + bufferSize + "]");
throw e;
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 79016a6d..592c28a9 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -107,8 +107,7 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
}
private boolean tryAccessCluster() {
- String accessId = sparkConf.get(
- RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
+ String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
if (StringUtils.isEmpty(accessId)) {
LOG.warn("Access id key is empty");
return false;
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 5ea1a54f..4a6f4428 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -168,6 +168,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private void writeImpl(Iterator<Product2<K,V>> records) {
List<ShuffleBlockInfo> shuffleBlockInfos = null;
Set<Long> blockIds = Sets.newConcurrentHashSet();
+ boolean isCombine = shuffleDependency.mapSideCombine();
+ Function1 createCombiner = null;
+ if (isCombine) {
+ createCombiner = shuffleDependency.aggregator().get().createCombiner();
+ }
while (records.hasNext()) {
// Task should fast fail when sending data failed
checkIfBlocksFailed();
@@ -175,9 +180,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
Product2<K, V> record = records.next();
K key = record._1();
int partition = getPartition(key);
- boolean isCombine = shuffleDependency.mapSideCombine();
if (isCombine) {
- Function1 createCombiner =
shuffleDependency.aggregator().get().createCombiner();
Object c = createCombiner.apply(record._2());
shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c);
} else {
@@ -216,7 +219,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
shuffleBlockInfoList.forEach(sbi -> {
long blockId = sbi.getBlockId();
- // add blockId to set, check if it is send later
+ // add blockId to set, check if it is sent later
blockIds.add(blockId);
// update [partition, blockIds], it will be sent to shuffle server
int partitionId = sbi.getPartitionId();
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 547e6af0..caaa14c5 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -35,7 +35,7 @@ public class ShuffleClientFactory {
}
/**
- * Only for MR engine, which wont used to unregister to remote
shuffle-servers
+ * Only for MR engine, which won't be used to unregister from remote
shuffle-servers
*/
public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
@@ -49,7 +49,7 @@ public class ShuffleClientFactory {
String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize,
int dataCommitPoolSize, int unregisterThreadPoolSize, int
unregisterRequestTimeoutSec) {
- // If replica > replicaWrite, blocks maybe will be sended for 2 rounds.
+ // If replica > replicaWrite, blocks may be sent for 2 rounds.
// We need retry less times in this case for let the first round fail fast.
if (replicaSkipEnabled && replica > replicaWrite) {
retryMax = retryMax / 2;
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index eacefe78..333fe527 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -159,7 +159,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
return null;
}
- // if need request new data from shuffle server
+ // if the client needs to request new data from the shuffle server
if (bufferSegmentQueue.isEmpty()) {
if (read() <= 0) {
return null;
@@ -186,8 +186,6 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
long actualCrc = -1;
try {
long start = System.currentTimeMillis();
- copyTime.addAndGet(System.currentTimeMillis() - start);
- start = System.currentTimeMillis();
expectedCrc = bs.getCrc();
actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(),
bs.getLength());
crcCheckTime.addAndGet(System.currentTimeMillis() - start);
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 1d76a41a..60a775f0 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -115,7 +115,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
int replicaWrite,
int replicaRead,
boolean replicaSkipEnabled,
- int dataTranferPoolSize,
+ int dataTransferPoolSize,
int dataCommitPoolSize,
int unregisterThreadPoolSize,
int unregisterRequestTimeSec) {
@@ -129,7 +129,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
this.replicaWrite = replicaWrite;
this.replicaRead = replicaRead;
this.replicaSkipEnabled = replicaSkipEnabled;
- this.dataTransferPool = Executors.newFixedThreadPool(dataTranferPoolSize);
+ this.dataTransferPool = Executors.newFixedThreadPool(dataTransferPoolSize);
this.dataCommitPoolSize = dataCommitPoolSize;
this.unregisterThreadPoolSize = unregisterThreadPoolSize;
this.unregisterRequestTimeSec = unregisterRequestTimeSec;
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index 803d65bc..0bdf7cf0 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -30,7 +30,7 @@ import org.apache.uniffle.storage.util.StorageType;
public class ClientUtils {
- // BlockId is long and composed by partitionId, executorId and AtomicInteger
+ // BlockId is long and composed of partitionId, executorId and AtomicInteger.
// AtomicInteger is first 19 bit, max value is 2^19 - 1
// partitionId is next 24 bit, max value is 2^24 - 1
// taskAttemptId is rest of 20 bit, max value is 2^20 - 1
@@ -118,7 +118,7 @@ public class ClientUtils {
public static void validateTestModeConf(boolean testMode, String
storageType) {
if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
|| (StorageType.HDFS.name()).equals(storageType))) {
- throw new IllegalArgumentException("RSS storage type about LOCALFILE and
HDFS should be used in test mode, "
+ throw new IllegalArgumentException("LOCALFILE or HDFS storage type
should be used in test mode only, "
+ "because of the poor performance of these two types.");
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
index ada4d06d..96affc3b 100644
---
a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
+++
b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
@@ -32,7 +32,7 @@ import
org.apache.uniffle.common.security.SecurityContextFactory;
/**
* This HadoopFilesystemProvider will provide the only entrypoint to get the
hadoop filesystem whether
- * the dfs cluster is kerberized or not.
+ * the dfs cluster is kerberos enabled or not.
*/
public class HadoopFilesystemProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(HadoopFilesystemProvider.class);
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 7d8b4846..c693604d 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -33,10 +33,10 @@ public abstract class GRPCMetrics {
private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS =
"grpc_server_executor_active_threads";
public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY =
"grpcServerExecutorBlockingQueueSize";
private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE =
"grpc_server_executor_blocking_queue_size";
- public static final String GRCP_SERVER_CONNECTION_NUMBER_KEY =
"grpcServerConnectionNumber";
- private static final String GRCP_SERVER_CONNECTION_NUMBER =
"grpc_server_connection_number";
+ public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY =
"grpcServerConnectionNumber";
+ private static final String GRPC_SERVER_CONNECTION_NUMBER =
"grpc_server_connection_number";
- private boolean isRegister = false;
+ private boolean isRegistered = false;
protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
protected Map<String, Summary> transportTimeSummaryMap =
Maps.newConcurrentMap();
@@ -48,11 +48,11 @@ public abstract class GRPCMetrics {
public abstract void registerMetrics();
public void register(CollectorRegistry collectorRegistry) {
- if (!isRegister) {
+ if (!isRegistered) {
metricsManager = new MetricsManager(collectorRegistry);
registerGeneralMetrics();
registerMetrics();
- isRegister = true;
+ isRegistered = true;
}
}
@@ -66,13 +66,13 @@ public abstract class GRPCMetrics {
metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
);
gaugeMap.putIfAbsent(
- GRCP_SERVER_CONNECTION_NUMBER_KEY,
- metricsManager.addGauge(GRCP_SERVER_CONNECTION_NUMBER)
+ GRPC_SERVER_CONNECTION_NUMBER_KEY,
+ metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER)
);
}
public void setGauge(String tag, double value) {
- if (isRegister) {
+ if (isRegistered) {
Gauge gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.set(value);
@@ -81,7 +81,7 @@ public abstract class GRPCMetrics {
}
public void incCounter(String methodName) {
- if (isRegister) {
+ if (isRegistered) {
Gauge gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.inc();
@@ -96,7 +96,7 @@ public abstract class GRPCMetrics {
}
public void decCounter(String methodName) {
- if (isRegister) {
+ if (isRegistered) {
Gauge gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.dec();
diff --git
a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
index 62ad537a..085b13fd 100644
---
a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
+++
b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
@@ -24,7 +24,7 @@ import io.grpc.ServerTransportFilter;
import org.apache.uniffle.common.metrics.GRPCMetrics;
-import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
public class MonitoringServerTransportFilter extends ServerTransportFilter {
private final AtomicLong connectionSize = new AtomicLong(0);
@@ -35,12 +35,12 @@ public class MonitoringServerTransportFilter extends
ServerTransportFilter {
}
public Attributes transportReady(Attributes transportAttrs) {
- grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY,
connectionSize.incrementAndGet());
+ grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY,
connectionSize.incrementAndGet());
return super.transportReady(transportAttrs);
}
public void transportTerminated(Attributes transportAttrs) {
- grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY,
connectionSize.decrementAndGet());
+ grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY,
connectionSize.decrementAndGet());
super.transportTerminated(transportAttrs);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index bea53018..6f2b9413 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -33,7 +33,7 @@ import static
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrat
/**
* Configuration for Coordinator Service and rss-cluster, including service
port,
- * heartbeat interval and etc.
+ * heartbeat interval, etc.
*/
public class CoordinatorConf extends RssBaseConf {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 4b5016df..99abcee4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -30,7 +30,7 @@ import
org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
import org.apache.uniffle.proto.RssProtos;
-import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -65,14 +65,14 @@ public class CoordinatorGrpcServerTest {
grpcServer.start();
// case1: test the single one connection metric
- double connSize =
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+ double connSize =
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(0, connSize);
CoordinatorGrpcClient coordinatorGrpcClient = new
CoordinatorGrpcClient("localhost", 20001);
coordinatorGrpcClient.registerApplicationInfo(
new RssApplicationInfoRequest("testGrpcConnectionSize", 10000,
"user"));
- connSize =
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+ connSize =
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(1, connSize);
// case2: test the multiple connections
@@ -81,7 +81,7 @@ public class CoordinatorGrpcServerTest {
client1.registerApplicationInfo(new
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
client2.registerApplicationInfo(new
RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
- connSize =
grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+ connSize =
grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(3, connSize);
grpcServer.stop();
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 7573c616..4b20b40c 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -48,7 +48,7 @@ import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
-import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
+import static
org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -265,7 +265,7 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
assertEquals(oldValue + 1, newValue, 0.5);
double connectionSize = coordinators.get(0)
-
.getGrpcMetrics().getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
+
.getGrpcMetrics().getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertTrue(connectionSize > 0);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index b9f23058..fab74160 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -50,7 +50,7 @@ public class CoordinatorClientFactory {
LOG.info("Start to create coordinator clients from {}", coordinators);
List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
String[] coordinatorList = coordinators.trim().split(",");
- if (coordinatorList.length <= 0) {
+ if (coordinatorList.length == 0) {
String msg = "Invalid " + coordinators;
LOG.error(msg);
throw new RuntimeException(msg);
diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
index ac21024b..ecfa28b1 100644
--- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
+++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
@@ -30,8 +30,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * HealthCheck will check every server whether has the ability to process
shuffle data. Currently, we only support disk
- * checker. If enough disks don't have enough disk space, server will become
unhealthy, and only enough disks
+ * HealthCheck will check every server whether it has the ability to process
shuffle data. Currently, we only support
+ * disk checker. If enough disks don't have enough disk space, server will
become unhealthy, and only enough disks
* have enough disk space, server will become healthy again.
**/
public class HealthCheck {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 10d59f8a..56e1a68a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -179,9 +179,9 @@ public class ShuffleServer {
boolean healthCheckEnable =
shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE);
if (healthCheckEnable) {
- List<Checker> buildInCheckers = Lists.newArrayList();
- buildInCheckers.add(storageManager.getStorageChecker());
- healthCheck = new HealthCheck(isHealthy, shuffleServerConf,
buildInCheckers);
+ List<Checker> builtInCheckers = Lists.newArrayList();
+ builtInCheckers.add(storageManager.getStorageChecker());
+ healthCheck = new HealthCheck(isHealthy, shuffleServerConf,
builtInCheckers);
healthCheck.start();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 71371ae8..c197b525 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -465,7 +465,7 @@ public class ShuffleTaskManager {
LOG.info("Start check leak shuffle data");
try {
Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
- storageManager.checkAndClearLeakShuffleData(appIds);
+ storageManager.checkAndClearLeakedShuffleData(appIds);
LOG.info("Finish check leak shuffle data");
} catch (Exception e) {
LOG.warn("Error happened in checkLeakShuffleData", e);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 9b0a50a2..187d6d65 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -255,7 +255,7 @@ public class ShuffleBuffer {
try {
System.arraycopy(block.getData(), 0, data, offset, block.getLength());
} catch (Exception e) {
- LOG.error("Unexpect exception for System.arraycopy, length["
+ LOG.error("Unexpected exception for System.arraycopy, length["
+ block.getLength() + "], offset["
+ offset + "], dataLength[" + data.length + "]", e);
throw e;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ddad74b3..a5aa9973 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -103,7 +103,7 @@ public class ShuffleBufferManager {
public StatusCode cacheShuffleData(String appId, int shuffleId,
boolean isPreAllocated, ShufflePartitionedData spd) {
if (!isPreAllocated && isFull()) {
- LOG.warn("Got unexpect data, can't cache it because the space is full");
+ LOG.warn("Got unexpected data, can't cache it because the space is
full");
return StatusCode.NO_BUFFER;
}
@@ -182,7 +182,7 @@ public class ShuffleBufferManager {
void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
int shuffleId, int startPartition, int endPartition) {
- // When we use multistorage and trigger single buffer flush, the buffer
size should be bigger
+ // When we use multi storage and trigger single buffer flush, the buffer
size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
if (this.bufferFlushEnabled && buffer.getSize() >
this.bufferFlushThreshold) {
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
@@ -428,7 +428,7 @@ public class ShuffleBufferManager {
Map<String, Set<Integer>> pickedShuffle = Maps.newHashMap();
// The algorithm here is to flush data size > highWaterMark - lowWaterMark
- // the remain data in buffer maybe more than lowWaterMark
+ // the remaining data in buffer may be more than lowWaterMark
// because shuffle server is still receiving data, but it should be ok
long expectedFlushSize = highWaterMark - lowWaterMark;
long pickedFlushSize = 0L;
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 09c7dd03..516391cd 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -138,7 +138,7 @@ public class HdfsStorageManager extends
SingleStorageManager {
}
@Override
- public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+ public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
}
public HdfsStorage getStorageByAppId(String appId) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4e47a34f..ee099939 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -223,7 +223,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
@Override
- public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+ public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
Set<String> appIdsOnStorages = new HashSet<>();
for (LocalStorage localStorage : localStorages) {
if (!localStorage.isCorrupted()) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index ee53ff03..53039279 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -140,8 +140,8 @@ public class MultiStorageManager implements StorageManager {
}
@Override
- public void checkAndClearLeakShuffleData(Collection<String> appIds) {
- warmStorageManager.checkAndClearLeakShuffleData(appIds);
+ public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
+ warmStorageManager.checkAndClearLeakedShuffleData(appIds);
}
public void removeResources(PurgeEvent event) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 2ba7b4c5..2a487535 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -53,5 +53,5 @@ public interface StorageManager {
// todo: add an interface that check storage isHealthy
- void checkAndClearLeakShuffleData(Collection<String> appIds);
+ void checkAndClearLeakedShuffleData(Collection<String> appIds);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 8d27f0f8..3e8afaa9 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -258,7 +258,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
size = storage.getHandlerSize();
assertEquals(0, size);
// fs create a remoteStorage for appId2 before remove resources,
- // but thecache from appIdToStorages has removed, so we need to delete
this path in hdfs
+ // but the cache from appIdToStorages has been removed, so we need to
delete this path in hdfs
Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
assertTrue(fs.mkdirs(path));
storageManager.removeResources(
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 129806c3..31e48903 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleTaskManagerTest extends HdfsTestBase {
- private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
+ private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
@AfterAll
public static void tearDown() {
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index da6e5ae5..da536099 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -162,7 +162,7 @@ public class LocalStorageManagerTest {
localStorageManager = new LocalStorageManager(conf);
assertEquals(2, localStorageManager.getStorages().size());
- // case4: only have 1 candidates, but exceed the number of
rss.server.localstorage.initialize.max.fail.number
+ // case4: only have 1 candidate, but exceed the number of
rss.server.localstorage.initialize.max.fail.number
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList("/a/rss-data", "/tmp/rss-data-1"));
conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER,
0L);
try {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 07513981..cdfea458 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
/**
* Metadata has three dimensions from top to down including disk, shuffle,
partition.
- * And each dimensions contains two aspects, status data and indicator data.
+ * And each dimension contains two aspects, status data and indicator data.
* Disk status data contains writable flag, Shuffle status data contains
stable, uploading, deleting flag.
* Disk indicator data contains size, fileNum, shuffleNum, Shuffle indicator
contains size, partition list,
* uploaded partition list and uploaded size.
@@ -48,7 +48,7 @@ public class LocalStorageMeta {
// todo: add ut
public List<String> getSortedShuffleKeys(boolean checkRead, int hint) {
// Filter the unread shuffle is checkRead is true
- // Filter the remain size is 0
+ // Filter the remaining size is 0
List<Map.Entry<String, ShuffleMeta>> shuffleMetaList = shuffleMetaMap
.entrySet()
.stream()
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index 7deda234..71cca83c 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -78,7 +78,7 @@ public class ShuffleStorageUtils {
} else {
Collections.sort(segments);
long start = -1;
- long lastestPosition = -1;
+ long latestPosition = -1;
long skipThreshold = readBufferSize / 2;
long lastPosition = Long.MAX_VALUE;
List<BufferSegment> bufferSegments = Lists.newArrayList();
@@ -94,16 +94,16 @@ public class ShuffleStorageUtils {
bufferSegments = Lists.newArrayList();
start = segment.getOffset();
}
- lastestPosition = segment.getOffset() + segment.getLength();
+ latestPosition = segment.getOffset() + segment.getLength();
bufferSegments.add(new BufferSegment(segment.getBlockId(),
segment.getOffset() - start, segment.getLength(),
segment.getUncompressLength(), segment.getCrc(),
segment.getTaskAttemptId()));
- if (lastestPosition - start >= readBufferSize) {
+ if (latestPosition - start >= readBufferSize) {
dataFileSegments.add(new DataFileSegment(
- path, start, (int) (lastestPosition - start), bufferSegments));
+ path, start, (int) (latestPosition - start), bufferSegments));
start = -1;
}
- lastPosition = lastestPosition;
+ lastPosition = latestPosition;
}
if (start > -1) {
dataFileSegments.add(new DataFileSegment(path, start, (int)
(lastPosition - start), bufferSegments));