This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fae96f73c [CELEBORN-686] Include ConnectException when exclude worker 
for fetch
fae96f73c is described below

commit fae96f73cb316219e1318967e6f7c3b9051dcd04
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 16 16:21:57 2023 +0800

    [CELEBORN-686] Include ConnectException when exclude worker for fetch
    
    ### What changes were proposed in this pull request?
    Treat connection failures (e.g. `java.net.ConnectException`) as a critical
    cause when excluding a worker for fetch.
    
    ### Why are the changes needed?
    Currently, `RssInputStream.isCriticalCause` does not treat a
    `ConnectException` as a critical cause, so a failure like the following is
    not covered:
    ```
    java.io.IOException: Failed to connect to /192.168.1.17:46197
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:232)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:176)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:114)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:183)
            at 
org.apache.celeborn.client.read.WorkerPartitionReader.<init>(WorkerPartitionReader.java:103)
            at 
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.createReader(RssInputStream.java:399)
            at 
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.createReaderWithRetry(RssInputStream.java:301)
            at 
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.moveToNextReader(RssInputStream.java:229)
            at 
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.<init>(RssInputStream.java:178)
            at 
org.apache.celeborn.client.read.RssInputStream.create(RssInputStream.java:63)
            at 
org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1599)
            at 
org.apache.spark.shuffle.celeborn.RssShuffleReader.$anonfun$read$1(RssShuffleReader.scala:88)
            at 
org.apache.spark.shuffle.celeborn.RssShuffleReader.$anonfun$read$1$adapted(RssShuffleReader.scala:79)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
            at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.sort_addToSorter_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.sort_doSort_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.hasNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.smj_findNextJoinRows_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.hasNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:973)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.fastWrite0(HashBasedShuffleWriter.java:251)
            at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:180)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:133)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: /192.168.1.17:46197
    Caused by: java.net.ConnectException: Connection refused
            at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
            at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
            at 
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested manually.
    
    Closes #1598 from waitinfuture/686.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../java/org/apache/celeborn/client/read/RssInputStream.java | 12 ++++++------
 .../common/network/client/TransportClientFactory.java        |  5 +++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 9711266c7..5f88aef9c 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -268,20 +268,20 @@ public abstract class RssInputStream extends InputStream {
     }
 
     private boolean isCriticalCause(Exception e) {
-      boolean isConnectTimeout =
-          e instanceof IOException
-              && e.getMessage() != null
-              && e.getMessage().startsWith("Connecting to")
-              && e.getMessage().contains("timed out");
       boolean rpcTimeout =
           e instanceof IOException
               && e.getCause() != null
               && e.getCause() instanceof TimeoutException;
+      boolean connectException =
+          e instanceof CelebornIOException
+              && e.getMessage() != null
+              && (e.getMessage().startsWith("Connecting to")
+                  || e.getMessage().startsWith("Failed to"));
       boolean fetchChunkTimeout =
           e instanceof CelebornIOException
               && e.getCause() != null
               && e.getCause() instanceof IOException;
-      return isConnectTimeout || rpcTimeout || fetchChunkTimeout;
+      return connectException || rpcTimeout || fetchChunkTimeout;
     }
 
     private PartitionReader createReaderWithRetry(PartitionLocation location) 
throws IOException {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index a90c2632d..d8ba62911 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -33,6 +33,7 @@ import io.netty.channel.socket.SocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.network.TransportContext;
 import org.apache.celeborn.common.network.server.TransportChannelHandler;
 import org.apache.celeborn.common.network.util.*;
@@ -226,10 +227,10 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     ChannelFuture cf = bootstrap.connect(address);
     if (!cf.await(conf.connectTimeoutMs())) {
-      throw new IOException(
+      throw new CelebornIOException(
           String.format("Connecting to %s timed out (%s ms)", address, 
conf.connectTimeoutMs()));
     } else if (cf.cause() != null) {
-      throw new IOException(String.format("Failed to connect to %s", address), 
cf.cause());
+      throw new CelebornIOException(String.format("Failed to connect to %s", 
address), cf.cause());
     }
 
     TransportClient client = clientRef.get();

Reply via email to