This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new ac7cee376 [CELEBORN-1673][FOLLOWUP] Shouldn't ignore 
InterruptedException when client retry
ac7cee376 is described below

commit ac7cee376f2e1dc32edd2495f2800f3dac16b3e6
Author: lvshuang.xjs <[email protected]>
AuthorDate: Fri Jun 6 10:36:05 2025 -0700

    [CELEBORN-1673][FOLLOWUP] Shouldn't ignore InterruptedException when client 
retry
    
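    ### What changes were proposed in this pull request?
    Stop ignoring InterruptedException during client connection retries in
    TransportClientFactory. An InterruptedException is now rethrown immediately,
    after the thread's interrupt status is restored, instead of being counted as a
    failed attempt and retried. In addition, Uninterruptibles.sleepUninterruptibly
    is replaced with Thread.sleep, so the wait between retries can itself be
    interrupted.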
    
    ### Why are the changes needed?
    Speculative tasks are interrupted when another attempt of the same task
    succeeds. Interrupting a speculative task can trigger client connection
    retries. If the InterruptedException is ignored during those retries, the
    task may not be killed promptly.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed the existing GitHub Actions (GA) CI checks.
    
    Closes #3314 from RexXiong/CELEBORN-1673-FOLLOWUP.
    
    Authored-by: lvshuang.xjs <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 5e305c3a5a5bc6d42bfdd6b467f807a1baf75627)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../common/network/client/TransportClientFactory.java         | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index fe261f03f..bd666365b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -31,7 +31,6 @@ import java.util.function.Supplier;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.*;
@@ -159,6 +158,10 @@ public class TransportClientFactory implements Closeable {
       try {
         return createClient(remoteHost, remotePort, partitionId, 
supplier.get());
       } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw e;
+        }
         numTries++;
         logger.warn(
             "Retry create client, times {}/{} with error: {}",
@@ -166,15 +169,11 @@ public class TransportClientFactory implements Closeable {
             maxClientConnectRetries,
             e.getMessage(),
             e);
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
         if (numTries == maxClientConnectRetries) {
           throw e;
         }
 
-        Uninterruptibles.sleepUninterruptibly(
-            maxClientConnectRetryWaitTimeMs, TimeUnit.MILLISECONDS);
+        Thread.sleep(maxClientConnectRetryWaitTimeMs);
       }
     }
 

Reply via email to