This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new feaf1393a [CELEBORN-686] Include ConnectException when exclude worker for fetch
feaf1393a is described below
commit feaf1393a67e023c1a4902c5777120a6ac5f7781
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 16 16:21:57 2023 +0800
[CELEBORN-686] Include ConnectException when exclude worker for fetch
### What changes were proposed in this pull request?
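Treat connection failures (`ConnectException`) as a critical cause when deciding whether to exclude a worker for fetch. To support this, `TransportClientFactory` now throws `CelebornIOException` for both connect timeouts and connection failures. `RssInputStream.isCriticalCause` matches either message ("Connecting to ..." or "Failed to ...").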
### Why are the changes needed?
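Currently, `RssInputStream.isCriticalCause` does not treat a `ConnectException` as a critical cause. Its only connection check matches connect timeouts (an `IOException` whose message starts with "Connecting to" and contains "timed out"). As a result, a worker that refuses connections, as in the stack trace below, is not excluded for fetch: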
```
java.io.IOException: Failed to connect to /192.168.1.17:46197
at
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:232)
at
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:176)
at
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:114)
at
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:183)
at
org.apache.celeborn.client.read.WorkerPartitionReader.<init>(WorkerPartitionReader.java:103)
at
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.createReader(RssInputStream.java:399)
at
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.createReaderWithRetry(RssInputStream.java:301)
at
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.moveToNextReader(RssInputStream.java:229)
at
org.apache.celeborn.client.read.RssInputStream$RssInputStreamImpl.<init>(RssInputStream.java:178)
at
org.apache.celeborn.client.read.RssInputStream.create(RssInputStream.java:63)
at
org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1599)
at
org.apache.spark.shuffle.celeborn.RssShuffleReader.$anonfun$read$1(RssShuffleReader.scala:88)
at
org.apache.spark.shuffle.celeborn.RssShuffleReader.$anonfun$read$1$adapted(RssShuffleReader.scala:79)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.sort_addToSorter_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.sort_doSort_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.hasNext(Unknown
Source)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.smj_findNextJoinRows_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage18.hasNext(Unknown
Source)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:973)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.fastWrite0(HashBasedShuffleWriter.java:251)
at
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:180)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by:
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: /192.168.1.17:46197
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
at
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested manually.
Closes #1598 from waitinfuture/686.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit fae96f73cb316219e1318967e6f7c3b9051dcd04)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../java/org/apache/celeborn/client/read/RssInputStream.java | 12 ++++++------
.../common/network/client/TransportClientFactory.java | 5 +++--
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 9711266c7..5f88aef9c 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -268,20 +268,20 @@ public abstract class RssInputStream extends InputStream {
}
private boolean isCriticalCause(Exception e) {
- boolean isConnectTimeout =
- e instanceof IOException
- && e.getMessage() != null
- && e.getMessage().startsWith("Connecting to")
- && e.getMessage().contains("timed out");
boolean rpcTimeout =
e instanceof IOException
&& e.getCause() != null
&& e.getCause() instanceof TimeoutException;
+ boolean connectException =
+ e instanceof CelebornIOException
+ && e.getMessage() != null
+ && (e.getMessage().startsWith("Connecting to")
+ || e.getMessage().startsWith("Failed to"));
boolean fetchChunkTimeout =
e instanceof CelebornIOException
&& e.getCause() != null
&& e.getCause() instanceof IOException;
- return isConnectTimeout || rpcTimeout || fetchChunkTimeout;
+ return connectException || rpcTimeout || fetchChunkTimeout;
}
private PartitionReader createReaderWithRetry(PartitionLocation location)
throws IOException {
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index a90c2632d..d8ba62911 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -33,6 +33,7 @@ import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.server.TransportChannelHandler;
import org.apache.celeborn.common.network.util.*;
@@ -226,10 +227,10 @@ public class TransportClientFactory implements Closeable {
// Connect to the remote server
ChannelFuture cf = bootstrap.connect(address);
if (!cf.await(conf.connectTimeoutMs())) {
- throw new IOException(
+ throw new CelebornIOException(
String.format("Connecting to %s timed out (%s ms)", address,
conf.connectTimeoutMs()));
} else if (cf.cause() != null) {
- throw new IOException(String.format("Failed to connect to %s", address),
cf.cause());
+ throw new CelebornIOException(String.format("Failed to connect to %s",
address), cf.cause());
}
TransportClient client = clientRef.get();