This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new fc8925029 [CELEBORN-1190] Apply error prone patch and suppress some problems
fc8925029 is described below
commit fc892502931960fe4d4173cda517477f0d1b17d4
Author: sychen <[email protected]>
AuthorDate: Wed Dec 20 20:54:18 2023 +0800
[CELEBORN-1190] Apply error prone patch and suppress some problems
### What changes were proposed in this pull request?
1. Fix violations of the MissingOverride, DefaultCharset, and UnnecessaryParentheses rules.
2. Exclude generated sources from Error Prone analysis, and turn off the FutureReturnValueIgnored,
TypeParameterUnusedInFormals, and UnusedVariable checks.
### Why are the changes needed?
```
./build/make-distribution.sh --release
```
Running this release build produces a lot of Error Prone WARNINGs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2177 from cxzl25/error_prone_patch.
Authored-by: sychen <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
(cherry picked from commit 7f653ce7d6c3b9361818ff8e863b7b852211a8b1)
Signed-off-by: Fu Chen <[email protected]>
---
.../celeborn/plugin/flink/RemoteBufferStreamReader.java | 1 +
.../plugin/flink/RemoteShuffleInputGateDelegation.java | 3 +--
.../celeborn/plugin/flink/buffer/PartitionSortedBuffer.java | 2 +-
.../plugin/flink/network/FlinkTransportClientFactory.java | 1 +
.../network/TransportFrameDecoderWithBufferSupplier.java | 1 +
.../plugin/flink/readclient/FlinkShuffleClientImpl.java | 1 +
.../celeborn/plugin/flink/RemoteShuffleInputGateFactory.java | 1 +
.../celeborn/plugin/flink/RemoteShuffleInputGateFactory.java | 1 +
.../celeborn/plugin/flink/RemoteShuffleInputGateFactory.java | 1 +
.../celeborn/plugin/flink/RemoteShuffleInputGateFactory.java | 1 +
.../celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java | 2 +-
.../org/apache/hadoop/mapred/CelebornSortBasedPusher.java | 2 ++
.../apache/spark/shuffle/celeborn/SparkShuffleManager.java | 1 +
.../sql/execution/columnar/CelebornColumnDictionary.java | 4 +++-
.../apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | 1 +
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 7 ++++---
.../apache/celeborn/client/read/WorkerPartitionReader.java | 3 +++
.../java/org/apache/celeborn/client/security/CryptoUtils.java | 3 ++-
.../java/org/apache/celeborn/client/compress/CodecSuiteJ.java | 4 ++--
.../org/apache/celeborn/common/network/protocol/PushData.java | 2 +-
.../celeborn/common/network/protocol/PushDataHandShake.java | 2 +-
.../apache/celeborn/common/network/protocol/RegionFinish.java | 2 +-
.../apache/celeborn/common/network/protocol/RegionStart.java | 2 +-
.../celeborn/common/network/protocol/StreamChunkSlice.java | 1 +
.../celeborn/common/network/sasl/SecretRegistryImpl.java | 1 +
.../celeborn/common/network/sasl/CelebornSaslSuiteJ.java | 1 +
.../deploy/master/clustermeta/SingleMasterMetaManager.java | 1 +
.../service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java | 11 ++++++++---
pom.xml | 2 +-
.../celeborn/server/common/service/config/SystemConfig.java | 1 +
.../celeborn/service/deploy/worker/shuffledb/DBIterator.java | 1 +
.../celeborn/service/deploy/worker/storage/FileWriter.java | 3 +++
.../service/deploy/worker/storage/MapPartitionFileWriter.java | 4 +++-
.../deploy/worker/storage/ReducePartitionFileWriter.java | 2 ++
34 files changed, 56 insertions(+), 20 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
index 51dadf9f1..640ea8a25 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteBufferStreamReader.java
@@ -113,6 +113,7 @@ public class RemoteBufferStreamReader extends
CreditListener {
return isOpened;
}
+ @Override
public void notifyAvailableCredits(int numCredits) {
if (!closed) {
bufferStream.addCredit(
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 4427b8673..e0499dc83 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -154,8 +154,7 @@ public class RemoteShuffleInputGateDelegation {
try {
String appUniqueId =
- ((RemoteShuffleDescriptor)
(gateDescriptor.getShuffleDescriptors()[0]))
- .getCelebornAppId();
+ ((RemoteShuffleDescriptor)
gateDescriptor.getShuffleDescriptors()[0]).getCelebornAppId();
this.shuffleClient =
FlinkShuffleClientImpl.get(
appUniqueId,
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
index 0c2a0a2bb..88b5338d6 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
@@ -368,7 +368,7 @@ public class PartitionSortedBuffer implements SortBuffer {
}
private int getSegmentOffsetFromPointer(long value) {
- return (int) (value);
+ return (int) value;
}
@Override
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
index 68e418a54..b3cfc09ea 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
@@ -71,6 +71,7 @@ public class FlinkTransportClientFactory extends
TransportClientFactory {
return null;
}
+ @Override
public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
return createClient(
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
index 84ec43aa5..9140b6b23 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
@@ -167,6 +167,7 @@ public class TransportFrameDecoderWithBufferSupplier
extends ChannelInboundHandl
}
}
+ @Override
public void channelRead(ChannelHandlerContext ctx, Object data) {
io.netty.buffer.ByteBuf nettyBuf = (io.netty.buffer.ByteBuf) data;
try {
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index c7ed33b33..5e2974547 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -557,6 +557,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
this.dataClientFactory = dataClientFactory;
}
+ @Override
@VisibleForTesting
public TransportClientFactory getDataClientFactory() {
return flinkTransportClientFactory;
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 7e53e2585..f5e2b2938 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
super(conf, networkBufferPool, networkBufferSize);
}
+ @Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,
diff --git
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index a8cf5e645..de0ddab7c 100644
---
a/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -36,6 +36,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
super(conf, networkBufferPool, networkBufferSize);
}
+ @Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,
diff --git
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index d13613023..b2ac441ae 100644
---
a/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
}
// For testing.
+ @Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,
diff --git
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index d13613023..b2ac441ae 100644
---
a/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.18/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -37,6 +37,7 @@ public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGat
}
// For testing.
+ @Override
protected RemoteShuffleInputGate createInputGate(
String owningTaskName,
int gateIndex,
diff --git
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
index f6d06acdd..43958e680 100644
---
a/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
+++
b/client-mr/mr/src/main/java/org/apache/celeborn/mapreduce/v2/app/MRAppMasterWithCeleborn.java
@@ -135,7 +135,7 @@ public class MRAppMasterWithCeleborn extends MRAppMaster {
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(
- (new CallerContext.Builder("mr_app_master_with_celeborn_" +
applicationAttemptId))
+ new CallerContext.Builder("mr_app_master_with_celeborn_" +
applicationAttemptId)
.build());
}
diff --git
a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
index 7aa3857bd..4692a58cf 100644
---
a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
+++
b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java
@@ -295,6 +295,7 @@ public class CelebornSortBasedPusher<K, V> extends
OutputStream {
}
}
+ @Override
public void flush() {
logger.info("Sort based pusher called flush");
try {
@@ -305,6 +306,7 @@ public class CelebornSortBasedPusher<K, V> extends
OutputStream {
}
}
+ @Override
public void close() {
flush();
try {
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 35939dd5c..5b68db0f8 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -268,6 +268,7 @@ public class SparkShuffleManager implements ShuffleManager {
}
}
+ @Override
public <K, C> ShuffleReader<K, C> getReader(
ShuffleHandle handle, int startPartition, int endPartition, TaskContext
context) {
if (handle instanceof CelebornShuffleHandle) {
diff --git
a/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java
b/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java
index 6354bac41..c264b257b 100644
---
a/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java
+++
b/client-spark/spark-3-columnar-shuffle/src/main/java/org/apache/spark/sql/execution/columnar/CelebornColumnDictionary.java
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.columnar;
+import java.nio.charset.StandardCharsets;
+
import org.apache.spark.sql.execution.vectorized.Dictionary;
public class CelebornColumnDictionary implements Dictionary {
@@ -58,6 +60,6 @@ public class CelebornColumnDictionary implements Dictionary {
@Override
public byte[] decodeToBinary(int id) {
- return stringDictionary[id].getBytes();
+ return stringDictionary[id].getBytes(StandardCharsets.UTF_8);
}
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 7b30101f7..a7c494c03 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -407,6 +407,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
+ // Added in SPARK-32917, for Spark 3.2 and above
public long[] getPartitionLengths() {
throw new UnsupportedOperationException(
"Celeborn is not compatible with Spark push mode, please set
spark.shuffle.push.enabled to false");
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d83e4283a..bc60c9fd1 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -504,7 +504,7 @@ public class ShuffleClientImpl extends ShuffleClient {
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}
- @VisibleForTesting
+ @Override
public PartitionLocation registerMapPartitionTask(
int shuffleId, int numMappers, int mapId, int attemptId, int
partitionId) throws IOException {
logger.info(
@@ -1277,6 +1277,7 @@ public class ShuffleClientImpl extends ShuffleClient {
false);
}
+ @Override
public void pushMergedData(int shuffleId, int mapId, int attemptId) throws
IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = pushStates.get(mapKey);
@@ -1791,8 +1792,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private boolean connectFail(String message) {
return (message.startsWith("Connection from ") && message.endsWith("
closed"))
- || (message.equals("Connection reset by peer"))
- || (message.startsWith("Failed to send RPC "));
+ || message.equals("Connection reset by peer")
+ || message.startsWith("Failed to send RPC ");
}
@VisibleForTesting
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index 080643814..a00f02476 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -134,10 +134,12 @@ public class WorkerPartitionReader implements
PartitionReader {
ShuffleClient.incrementTotalReadCounter();
}
+ @Override
public boolean hasNext() {
return returnedChunks < streamHandler.getNumChunks();
}
+ @Override
public ByteBuf next() throws IOException, InterruptedException {
checkException();
if (chunkIndex < streamHandler.getNumChunks()) {
@@ -160,6 +162,7 @@ public class WorkerPartitionReader implements
PartitionReader {
return chunk;
}
+ @Override
public void close() {
synchronized (this) {
closed = true;
diff --git
a/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java
b/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java
index 5ce4672d6..95bebd919 100644
--- a/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java
+++ b/client/src/main/java/org/apache/celeborn/client/security/CryptoUtils.java
@@ -18,6 +18,7 @@
package org.apache.celeborn.client.security;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Optional;
import java.util.Properties;
@@ -50,7 +51,7 @@ public class CryptoUtils {
CryptoRandomFactory.getCryptoRandom(new Properties()).nextBytes(iv);
} catch (GeneralSecurityException e) {
logger.warn("Failed to create crypto Initialization Vector", e);
- iv = "1234567890123456".getBytes();
+ iv = "1234567890123456".getBytes(StandardCharsets.UTF_8);
}
long initialIVFinish = System.nanoTime();
long initialIVTime = TimeUnit.NANOSECONDS.toMillis(initialIVFinish -
initialIVStart);
diff --git
a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
index 53ba4ccd4..07ba142e4 100644
--- a/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/compress/CodecSuiteJ.java
@@ -31,7 +31,7 @@ public class CodecSuiteJ {
@Test
public void testLz4Codec() {
- int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+ int blockSize = new CelebornConf().clientPushBufferMaxSize();
Lz4Compressor lz4Compressor = new Lz4Compressor(blockSize);
byte[] data =
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
int oriLength = data.length;
@@ -49,7 +49,7 @@ public class CodecSuiteJ {
@Test
public void testZstdCodec() {
for (int level = -5; level <= 22; level++) {
- int blockSize = (new CelebornConf()).clientPushBufferMaxSize();
+ int blockSize = new CelebornConf().clientPushBufferMaxSize();
ZstdCompressor zstdCompressor = new ZstdCompressor(blockSize, level);
byte[] data =
RandomStringUtils.random(1024).getBytes(StandardCharsets.UTF_8);
int oriLength = data.length;
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java
index f8765d5a2..cf94b44c8 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java
@@ -99,7 +99,7 @@ public final class PushData extends RequestMessage {
return requestId == o.requestId
&& mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
- && partitionUniqueId.equals((o.partitionUniqueId))
+ && partitionUniqueId.equals(o.partitionUniqueId)
&& super.equals(o);
}
return false;
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
index 1f6745d72..d0e3bda17 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
@@ -95,7 +95,7 @@ public final class PushDataHandShake extends RequestMessage {
PushDataHandShake o = (PushDataHandShake) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
- && partitionUniqueId.equals((o.partitionUniqueId))
+ && partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& numPartitions == o.numPartitions
&& bufferSize == o.bufferSize
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
index 0d64bdcdd..34e1e66df 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
@@ -80,7 +80,7 @@ public final class RegionFinish extends RequestMessage {
RegionFinish o = (RegionFinish) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
- && partitionUniqueId.equals((o.partitionUniqueId))
+ && partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& super.equals(o);
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
index f12c4a010..918a38fe4 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
@@ -98,7 +98,7 @@ public final class RegionStart extends RequestMessage {
RegionStart o = (RegionStart) other;
return mode == o.mode
&& shuffleKey.equals(o.shuffleKey)
- && partitionUniqueId.equals((o.partitionUniqueId))
+ && partitionUniqueId.equals(o.partitionUniqueId)
&& attemptId == o.attemptId
&& currentRegionIndex == o.currentRegionIndex
&& isBroadcast == o.isBroadcast
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java
index b0129a42b..abbbb2f93 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java
@@ -53,6 +53,7 @@ public final class StreamChunkSlice implements Encodable {
return 20;
}
+ @Override
public void encode(ByteBuf buffer) {
buffer.writeLong(streamId);
buffer.writeInt(chunkIndex);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
index 402fc3a27..92b33bc47 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java
@@ -41,6 +41,7 @@ public class SecretRegistryImpl implements SecretRegistry {
secrets.remove(appId);
}
+ @Override
public boolean isRegistered(String appId) {
return secrets.containsKey(appId);
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java
index 582988e1d..10ed9851e 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/sasl/CelebornSaslSuiteJ.java
@@ -195,6 +195,7 @@ public class CelebornSaslSuiteJ {
}
}
+ @Override
public void close() {
if (client != null) {
client.close();
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index c901341c5..2ddd1ec06 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -138,6 +138,7 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
updateMetaByReportWorkerUnavailable(failedNodes);
}
+ @Override
public void handleUpdatePartitionSize() {
updatePartitionSize();
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 6862b5a11..7140cc6c6 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -21,8 +21,10 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -55,7 +57,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
conf.set(CelebornConf.CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED().key(),
"true");
File mapFile = File.createTempFile("testResolve1", ".txt");
- FileWriter mapFileWriter = new FileWriter(mapFile);
+ Writer mapFileWriter = Files.newBufferedWriter(mapFile.toPath(),
StandardCharsets.UTF_8);
mapFileWriter.write(
"host1 /default/rack1\nhost2 /default/rack1\nhost3 /default/rack1\n"
+ "host4 /default/rack2\nhost5 /default/rack2\nhost6
/default/rack2\n");
@@ -82,6 +84,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
Consumer<PartitionLocation> assertCustomer =
new Consumer<PartitionLocation>() {
+ @Override
public void accept(PartitionLocation location) {
Assert.assertNotEquals(
resolver.resolve(location.getHost()).getNetworkLocation(),
@@ -118,6 +121,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
Consumer<PartitionLocation> assertConsumer =
new Consumer<PartitionLocation>() {
+ @Override
public void accept(PartitionLocation location) {
Assert.assertEquals(
NetworkTopology.DEFAULT_RACK,
@@ -141,6 +145,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
workers.forEach(
new Consumer<WorkerInfo>() {
+ @Override
public void accept(WorkerInfo workerInfo) {
workerInfo.networkLocation_$eq(
resolver.resolve(workerInfo.host()).getNetworkLocation());
@@ -416,7 +421,7 @@ public class SlotsAllocatorRackAwareSuiteJ {
int expected =
(int)
Math.ceil(
- ((double) (numPartitions)
+ ((double) numPartitions
/ totalHosts
* (1
+ ((double) (maxHostsPerRack - secondMaxHostsPerRack
+ 1))
diff --git a/pom.xml b/pom.xml
index d17fbece7..585ac36e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -929,7 +929,7 @@
<fork>true</fork>
<compilerArgs>
<arg>-XDcompilePolicy=simple</arg>
- <arg>-Xplugin:ErrorProne</arg>
+ <arg>-Xplugin:ErrorProne
-XepExcludedPaths:.*/target/generated-sources/protobuf/.*
-Xep:FutureReturnValueIgnored:OFF -Xep:TypeParameterUnusedInFormals:OFF
-Xep:UnusedVariable:OFF</arg>
</compilerArgs>
<annotationProcessorPaths>
<path>
diff --git
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
index 812414150..82b5714b8 100644
---
a/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
+++
b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java
@@ -41,6 +41,7 @@ public class SystemConfig extends DynamicConfig {
return null;
}
+ @Override
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
index 351b773df..15864fa54 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
@@ -27,6 +27,7 @@ public interface DBIterator extends
Iterator<Map.Entry<byte[], byte[]>>, Closeab
/** Position at the first entry in the source whose `key` is at target. */
void seek(byte[] key);
+ @Override
default void remove() {
throw new UnsupportedOperationException();
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index ca38a49c4..3cba81cba 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -401,15 +401,18 @@ public abstract class FileWriter implements
DeviceObserver {
}
}
+ @Override
public int hashCode() {
return fileInfo.getFilePath().hashCode();
}
+ @Override
public boolean equals(Object obj) {
return (obj instanceof FileWriter)
&& fileInfo.getFilePath().equals(((FileWriter)
obj).fileInfo.getFilePath());
}
+ @Override
public String toString() {
return fileInfo.getFilePath();
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 2b306f025..42e21c926 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -91,6 +91,7 @@ public final class MapPartitionFileWriter extends FileWriter {
}
}
+ @Override
public void write(ByteBuf data) throws IOException {
data.markReaderIndex();
int partitionId = data.readInt();
@@ -155,13 +156,14 @@ public final class MapPartitionFileWriter extends
FileWriter {
deleted = true;
} else {
StorageManager.hadoopFs()
- .create(new
Path(Utils.getWriteSuccessFilePath((fileInfo.getIndexPath()))))
+ .create(new
Path(Utils.getWriteSuccessFilePath(fileInfo.getIndexPath())))
.close();
}
}
});
}
+ @Override
public synchronized void destroy(IOException ioException) {
destroyIndex();
super.destroy(ioException);
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index d9b89849b..768a69492 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -62,6 +62,7 @@ public final class ReducePartitionFileWriter extends
FileWriter {
this.nextBoundary = this.shuffleChunkSize;
}
+ @Override
protected void flush(boolean finalFlush) throws IOException {
super.flush(finalFlush);
maybeSetChunkOffsets(finalFlush);
@@ -87,6 +88,7 @@ public final class ReducePartitionFileWriter extends
FileWriter {
return fileInfo.getLastChunkOffset() == fileInfo.getFileLength();
}
+ @Override
public synchronized long close() throws IOException {
return super.close(
() -> {