This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f2357bf75 [CELEBORN-671] Add hasPeer method to PartitionLocation
f2357bf75 is described below
commit f2357bf75cb06232b738130b90dd31a678287234
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Jun 14 10:29:16 2023 +0800
[CELEBORN-671] Add hasPeer method to PartitionLocation
### What changes were proposed in this pull request?
Add a `hasPeer()` method to `PartitionLocation` and use it in place of the repeated `getPeer() != null` checks.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1583 from AngersZhuuuu/CELEBORN-671.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 5 ++---
.../java/org/apache/celeborn/client/read/RssInputStream.java | 10 +++++-----
.../scala/org/apache/celeborn/client/WorkerStatusTracker.scala | 8 ++++----
.../org/apache/celeborn/client/commit/CommitHandler.scala | 4 ++--
.../org/apache/celeborn/common/protocol/PartitionLocation.java | 4 ++++
.../scala/org/apache/celeborn/common/util/PbSerDeUtils.scala | 2 +-
.../celeborn/service/deploy/worker/PushDataHandler.scala | 8 ++++----
7 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 611e1dd24..a2a381dea 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -187,8 +187,7 @@ public class ShuffleClientImpl extends ShuffleClient {
if (blacklist.contains(location.hostAndPushPort())) {
wrappedCallback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_MASTER_BLACKLISTED));
return true;
- } else if (location.getPeer() != null
- && blacklist.contains(location.getPeer().hostAndPushPort())) {
+ } else if (location.hasPeer() && blacklist.contains(location.getPeer().hostAndPushPort())) {
wrappedCallback.onFailure(new CelebornIOException(StatusCode.PUSH_DATA_SLAVE_BLACKLISTED));
return true;
} else {
@@ -362,7 +361,7 @@ public class ShuffleClientImpl extends ShuffleClient {
private String genAddressPair(PartitionLocation loc) {
String addressPair;
- if (loc.getPeer() != null) {
+ if (loc.hasPeer()) {
addressPair = loc.hostAndPushPort() + "-" + loc.getPeer().hostAndPushPort();
} else {
addressPair = loc.hostAndPushPort();
diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 98f82a01d..9711266c7 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -185,7 +185,7 @@ public abstract class RssInputStream extends InputStream {
return false;
}
RoaringBitmap bitmap = location.getMapIdBitMap();
- if (bitmap == null && location.getPeer() != null) {
+ if (bitmap == null && location.hasPeer()) {
bitmap = location.getPeer().getMapIdBitMap();
}
for (int i = startMapIndex; i < endMapIndex; i++) {
@@ -294,7 +294,7 @@ public abstract class RssInputStream extends InputStream {
} catch (Exception e) {
excludeFailedLocation(location, e);
fetchChunkRetryCnt++;
- if (location.getPeer() != null) {
+ if (location.hasPeer()) {
// fetchChunkRetryCnt % 2 == 0 means both replicas have been tried,
// so sleep before next try.
if (fetchChunkRetryCnt % 2 == 0) {
@@ -342,7 +342,7 @@ public abstract class RssInputStream extends InputStream {
+ currentReader.getLocation(),
e);
} else {
- if (currentReader.getLocation().getPeer() != null) {
+ if (currentReader.getLocation().hasPeer()) {
logger.warn(
"Fetch chunk failed {}/{} times for location {}, change to peer",
fetchChunkRetryCnt,
@@ -374,10 +374,10 @@ public abstract class RssInputStream extends InputStream {
private PartitionReader createReader(
PartitionLocation location, int fetchChunkRetryCnt, int fetchChunkMaxRetry)
throws IOException, InterruptedException {
- if (location.getPeer() == null) {
+ if (!location.hasPeer()) {
logger.debug("Partition {} has only one partition replica.", location);
}
- if (location.getPeer() != null && attemptNumber % 2 == 1) {
+ if (location.hasPeer() && attemptNumber % 2 == 1) {
location = location.getPeer();
logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 1ec8dd3f7..05443b322 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -73,26 +73,26 @@ class WorkerStatusTracker(
case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
blacklistWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
- if oldPartition.getPeer != null && conf.clientBlacklistSlaveEnabled =>
+ if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
blacklistWorker(oldPartition.getPeer, StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
blacklistWorker(oldPartition, StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
- if oldPartition.getPeer != null && conf.clientBlacklistSlaveEnabled =>
+ if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
blacklistWorker(
oldPartition.getPeer,
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
blacklistWorker(oldPartition, StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
- if oldPartition.getPeer != null && conf.clientBlacklistSlaveEnabled =>
+ if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
blacklistWorker(
oldPartition.getPeer,
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
blacklistWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
- if oldPartition.getPeer != null && conf.clientBlacklistSlaveEnabled =>
+ if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
blacklistWorker(
oldPartition.getPeer,
StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index a42c8b2af..b88af9723 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -94,7 +94,7 @@ abstract class CommitHandler(
shuffleCommittedInfo.unhandledPartitionLocations.clear()
currentBatch.foreach { partitionLocation =>
shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
- if (partitionLocation.getPeer != null) {
+ if (partitionLocation.hasPeer) {
shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation.getPeer)
}
}
@@ -103,7 +103,7 @@ abstract class CommitHandler(
logDebug(s"Commit current batch HARD_SPLIT partitions for $shuffleId: " +
s"${currentBatch.map(_.getUniqueId).mkString("[", ",", "]")}")
val workerToRequests = currentBatch.flatMap { partitionLocation =>
- if (partitionLocation.getPeer != null) {
+ if (partitionLocation.hasPeer) {
Seq(partitionLocation, partitionLocation.getPeer)
} else {
Seq(partitionLocation)
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 76486d45c..c13693504 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -234,6 +234,10 @@ public class PartitionLocation implements Serializable {
this.peer = peer;
}
+ public boolean hasPeer() {
+ return peer != null;
+ }
+
public String getUniqueId() {
return id + "-" + epoch;
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 3011a33cc..e6e7a1e6f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -270,7 +270,7 @@ object PbSerDeUtils {
.setReplicatePort(location.getReplicatePort)
.setStorageInfo(StorageInfo.toPb(location.getStorageInfo))
.setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap))
- if (location.getPeer != null) {
+ if (location.hasPeer) {
val peerBuilder = PbPartitionLocation.newBuilder
if (location.getPeer.getMode eq Mode.MASTER) {
peerBuilder.setMode(PbPartitionLocation.Mode.Master)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index f97fad93f..8b21e7d82 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -160,7 +160,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
}
// Fetch real batchId from body will add more cost and no meaning for replicate.
- val doReplicate = location != null && location.getPeer != null && isMaster
+ val doReplicate = location != null && location.hasPeer && isMaster
val softSplit = new AtomicBoolean(false)
if (location == null) {
@@ -419,7 +419,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
// Fetch real batchId from body will add more cost and no meaning for replicate.
val doReplicate =
- partitionIdToLocations.head._2 != null && partitionIdToLocations.head._2.getPeer != null && isMaster
+ partitionIdToLocations.head._2 != null && partitionIdToLocations.head._2.hasPeer && isMaster
// find FileWriters responsible for the data
partitionIdToLocations.foreach { case (id, loc) =>
@@ -770,7 +770,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
fileWriter.incrementPendingWrites()
// for master, send data to slave
- if (location.getPeer != null && isMaster) {
+ if (location.hasPeer && isMaster) {
// to do
wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
} else {
@@ -893,7 +893,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
}
// for master, send data to slave
- if (location.getPeer != null && isMaster) {
+ if (location.hasPeer && isMaster) {
// TODO replica
wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
} else {