This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b24f86778 [CELEBORN-1510] Partial task unable to switch to the replica
b24f86778 is described below

commit b24f8677844279a1882ad946e05e5483baec1aa3
Author: zhangzhao.08 <[email protected]>
AuthorDate: Tue Dec 17 15:13:14 2024 +0800

    [CELEBORN-1510] Partial task unable to switch to the replica
    
    ### What changes were proposed in this pull request?
    
    Fix the case where a task with attemptNumber % 2 == 1 is unable to switch to the replica.
    
    ### Why are the changes needed?
    
    When attemptNumber % 2 == 1, createReader reads from location.peer. In 
some circumstances this leads to unexpected behavior. For example, when an 
exception occurs while fetching data and the reader needs to fall back to the 
peer, this logic switches back to the abnormal node again.
    
    <img width="1626" alt="image" 
src="https://github.com/user-attachments/assets/21c50953-db0f-4717-9b91-e3aeae16ece2">
    <img width="1652" alt="image" 
src="https://github.com/user-attachments/assets/7430786c-26a4-4b3b-be68-f8bdf780c58c">
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Internal test.
    
    Closes #2626 from zhaostu4/switch_replica.
    
    Authored-by: zhangzhao.08 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/client/read/CelebornInputStream.java  | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 670eda47f..2b02da2ba 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -317,6 +317,11 @@ public abstract class CelebornInputStream extends 
InputStream {
 
     private PartitionReader createReaderWithRetry(
         PartitionLocation location, PbStreamHandler pbStreamHandler) throws 
IOException {
+      // For the first time, the location will be selected according to 
attemptNumber
+      if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && 
location.hasPeer()) {
+        location = location.getPeer();
+        logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
+      }
       Exception lastException = null;
       while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
         try {
@@ -412,12 +417,6 @@ public abstract class CelebornInputStream extends 
InputStream {
         int fetchChunkRetryCnt,
         int fetchChunkMaxRetry)
         throws IOException, InterruptedException {
-      if (!location.hasPeer()) {
-        logger.debug("Partition {} has only one partition replica.", location);
-      } else if (pbStreamHandler == null && attemptNumber % 2 == 1) {
-        location = location.getPeer();
-        logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
-      }
       logger.debug("Create reader for location {}", location);
 
       StorageInfo storageInfo = location.getStorageInfo();

Reply via email to