This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e305c3a5 [CELEBORN-1673][FOLLOWUP] Shouldn't ignore InterruptedException when client retry
5e305c3a5 is described below

commit 5e305c3a5a5bc6d42bfdd6b467f807a1baf75627
Author: lvshuang.xjs <[email protected]>
AuthorDate: Fri Jun 6 10:36:05 2025 -0700

    [CELEBORN-1673][FOLLOWUP] Shouldn't ignore InterruptedException when client retry
    
    ### What changes were proposed in this pull request?
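    When creating a client is interrupted, restore the thread's interrupt status and rethrow the
    InterruptedException immediately, instead of counting it as a failed attempt and retrying.
    Also replace Uninterruptibles.sleepUninterruptibly with Thread.sleep for the wait between
    retries, so that an interrupt during the back-off is no longer ignored.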
    
    ### Why are the changes needed?
    Speculative tasks are interrupted when another attempt of the task succeeds. Interrupting a
    speculative task can cause the client to retry creating a connection. Ignoring the
    InterruptedException, including during the wait between retries, can prevent the task from
    being killed promptly.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed GitHub Actions (GA) CI.
    
    Closes #3314 from RexXiong/CELEBORN-1673-FOLLOWUP.
    
    Authored-by: lvshuang.xjs <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../common/network/client/TransportClientFactory.java         | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index fe261f03f..bd666365b 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -31,7 +31,6 @@ import java.util.function.Supplier;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.*;
@@ -159,6 +158,10 @@ public class TransportClientFactory implements Closeable {
       try {
         return createClient(remoteHost, remotePort, partitionId, 
supplier.get());
       } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw e;
+        }
         numTries++;
         logger.warn(
             "Retry create client, times {}/{} with error: {}",
@@ -166,15 +169,11 @@ public class TransportClientFactory implements Closeable {
             maxClientConnectRetries,
             e.getMessage(),
             e);
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
         if (numTries == maxClientConnectRetries) {
           throw e;
         }
 
-        Uninterruptibles.sleepUninterruptibly(
-            maxClientConnectRetryWaitTimeMs, TimeUnit.MILLISECONDS);
+        Thread.sleep(maxClientConnectRetryWaitTimeMs);
       }
     }
 

Reply via email to