This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f2357bf75 [CELEBORN-671] Add hasPeer method to PartitionLocation
f2357bf75 is described below

commit f2357bf75cb06232b738130b90dd31a678287234
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Jun 14 10:29:16 2023 +0800

    [CELEBORN-671] Add hasPeer method to PartitionLocation
    
    ### What changes were proposed in this pull request?
    Add hasPeer method to PartitionLocation
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1583 from AngersZhuuuu/CELEBORN-671.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../java/org/apache/celeborn/client/ShuffleClientImpl.java     |  5 ++---
 .../java/org/apache/celeborn/client/read/RssInputStream.java   | 10 +++++-----
 .../scala/org/apache/celeborn/client/WorkerStatusTracker.scala |  8 ++++----
 .../org/apache/celeborn/client/commit/CommitHandler.scala      |  4 ++--
 .../org/apache/celeborn/common/protocol/PartitionLocation.java |  4 ++++
 .../scala/org/apache/celeborn/common/util/PbSerDeUtils.scala   |  2 +-
 .../celeborn/service/deploy/worker/PushDataHandler.scala       |  8 ++++----
 7 files changed, 22 insertions(+), 19 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 611e1dd24..a2a381dea 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -187,8 +187,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     if (blacklist.contains(location.hostAndPushPort())) {
       wrappedCallback.onFailure(new 
CelebornIOException(StatusCode.PUSH_DATA_MASTER_BLACKLISTED));
       return true;
-    } else if (location.getPeer() != null
-        && blacklist.contains(location.getPeer().hostAndPushPort())) {
+    } else if (location.hasPeer() && 
blacklist.contains(location.getPeer().hostAndPushPort())) {
       wrappedCallback.onFailure(new 
CelebornIOException(StatusCode.PUSH_DATA_SLAVE_BLACKLISTED));
       return true;
     } else {
@@ -362,7 +361,7 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private String genAddressPair(PartitionLocation loc) {
     String addressPair;
-    if (loc.getPeer() != null) {
+    if (loc.hasPeer()) {
       addressPair = loc.hostAndPushPort() + "-" + 
loc.getPeer().hostAndPushPort();
     } else {
       addressPair = loc.hostAndPushPort();
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 98f82a01d..9711266c7 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -185,7 +185,7 @@ public abstract class RssInputStream extends InputStream {
         return false;
       }
       RoaringBitmap bitmap = location.getMapIdBitMap();
-      if (bitmap == null && location.getPeer() != null) {
+      if (bitmap == null && location.hasPeer()) {
         bitmap = location.getPeer().getMapIdBitMap();
       }
       for (int i = startMapIndex; i < endMapIndex; i++) {
@@ -294,7 +294,7 @@ public abstract class RssInputStream extends InputStream {
         } catch (Exception e) {
           excludeFailedLocation(location, e);
           fetchChunkRetryCnt++;
-          if (location.getPeer() != null) {
+          if (location.hasPeer()) {
             // fetchChunkRetryCnt % 2 == 0 means both replicas have been tried,
             // so sleep before next try.
             if (fetchChunkRetryCnt % 2 == 0) {
@@ -342,7 +342,7 @@ public abstract class RssInputStream extends InputStream {
                     + currentReader.getLocation(),
                 e);
           } else {
-            if (currentReader.getLocation().getPeer() != null) {
+            if (currentReader.getLocation().hasPeer()) {
               logger.warn(
                   "Fetch chunk failed {}/{} times for location {}, change to 
peer",
                   fetchChunkRetryCnt,
@@ -374,10 +374,10 @@ public abstract class RssInputStream extends InputStream {
     private PartitionReader createReader(
         PartitionLocation location, int fetchChunkRetryCnt, int 
fetchChunkMaxRetry)
         throws IOException, InterruptedException {
-      if (location.getPeer() == null) {
+      if (!location.hasPeer()) {
         logger.debug("Partition {} has only one partition replica.", location);
       }
-      if (location.getPeer() != null && attemptNumber % 2 == 1) {
+      if (location.hasPeer() && attemptNumber % 2 == 1) {
         location = location.getPeer();
         logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
       }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 1ec8dd3f7..05443b322 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -73,26 +73,26 @@ class WorkerStatusTracker(
         case StatusCode.PUSH_DATA_WRITE_FAIL_MASTER =>
           blacklistWorker(oldPartition, StatusCode.PUSH_DATA_WRITE_FAIL_MASTER)
         case StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE
-            if oldPartition.getPeer != null && 
conf.clientBlacklistSlaveEnabled =>
+            if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
           blacklistWorker(oldPartition.getPeer, 
StatusCode.PUSH_DATA_WRITE_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER =>
           blacklistWorker(oldPartition, 
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER)
         case StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE
-            if oldPartition.getPeer != null && 
conf.clientBlacklistSlaveEnabled =>
+            if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
           blacklistWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER =>
           blacklistWorker(oldPartition, 
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER)
         case StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE
-            if oldPartition.getPeer != null && 
conf.clientBlacklistSlaveEnabled =>
+            if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
           blacklistWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE)
         case StatusCode.PUSH_DATA_TIMEOUT_MASTER =>
           blacklistWorker(oldPartition, StatusCode.PUSH_DATA_TIMEOUT_MASTER)
         case StatusCode.PUSH_DATA_TIMEOUT_SLAVE
-            if oldPartition.getPeer != null && 
conf.clientBlacklistSlaveEnabled =>
+            if oldPartition.hasPeer && conf.clientBlacklistSlaveEnabled =>
           blacklistWorker(
             oldPartition.getPeer,
             StatusCode.PUSH_DATA_TIMEOUT_SLAVE)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index a42c8b2af..b88af9723 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -94,7 +94,7 @@ abstract class CommitHandler(
       shuffleCommittedInfo.unhandledPartitionLocations.clear()
       currentBatch.foreach { partitionLocation =>
         shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
-        if (partitionLocation.getPeer != null) {
+        if (partitionLocation.hasPeer) {
           
shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation.getPeer)
         }
       }
@@ -103,7 +103,7 @@ abstract class CommitHandler(
         logDebug(s"Commit current batch HARD_SPLIT partitions for $shuffleId: 
" +
           s"${currentBatch.map(_.getUniqueId).mkString("[", ",", "]")}")
         val workerToRequests = currentBatch.flatMap { partitionLocation =>
-          if (partitionLocation.getPeer != null) {
+          if (partitionLocation.hasPeer) {
             Seq(partitionLocation, partitionLocation.getPeer)
           } else {
             Seq(partitionLocation)
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 76486d45c..c13693504 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -234,6 +234,10 @@ public class PartitionLocation implements Serializable {
     this.peer = peer;
   }
 
+  public boolean hasPeer() {
+    return peer != null;
+  }
+
   public String getUniqueId() {
     return id + "-" + epoch;
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 3011a33cc..e6e7a1e6f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -270,7 +270,7 @@ object PbSerDeUtils {
       .setReplicatePort(location.getReplicatePort)
       .setStorageInfo(StorageInfo.toPb(location.getStorageInfo))
       .setMapIdBitmap(Utils.roaringBitmapToByteString(location.getMapIdBitMap))
-    if (location.getPeer != null) {
+    if (location.hasPeer) {
       val peerBuilder = PbPartitionLocation.newBuilder
       if (location.getPeer.getMode eq Mode.MASTER) {
         peerBuilder.setMode(PbPartitionLocation.Mode.Master)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index f97fad93f..8b21e7d82 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -160,7 +160,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       }
 
     // Fetch real batchId from body will add more cost and no meaning for 
replicate.
-    val doReplicate = location != null && location.getPeer != null && isMaster
+    val doReplicate = location != null && location.hasPeer && isMaster
     val softSplit = new AtomicBoolean(false)
 
     if (location == null) {
@@ -419,7 +419,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
 
     // Fetch real batchId from body will add more cost and no meaning for 
replicate.
     val doReplicate =
-      partitionIdToLocations.head._2 != null && 
partitionIdToLocations.head._2.getPeer != null && isMaster
+      partitionIdToLocations.head._2 != null && 
partitionIdToLocations.head._2.hasPeer && isMaster
 
     // find FileWriters responsible for the data
     partitionIdToLocations.foreach { case (id, loc) =>
@@ -770,7 +770,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     fileWriter.incrementPendingWrites()
 
     // for master, send data to slave
-    if (location.getPeer != null && isMaster) {
+    if (location.hasPeer && isMaster) {
       // to do
       wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
     } else {
@@ -893,7 +893,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
           fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
       }
       // for master, send data to slave
-      if (location.getPeer != null && isMaster) {
+      if (location.hasPeer && isMaster) {
         // TODO replica
         wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
       } else {

Reply via email to