mahmoudyusofi opened a new issue, #3511: URL: https://github.com/apache/celeborn/issues/3511
### What is the bug(with logs or screenshots)?

I am trying to exclude workers on failure using the documented property `celeborn.client.fetch.excludeWorkerOnFailure.enabled`, to avoid wasting 30 seconds (3 retries, each timing out after 10 seconds) per shuffle read operation. The property works in the sense that the failed worker is added to the excluded list; however, both the code and the logs show that Celeborn still tries to connect to the excluded worker anyway. See https://github.com/apache/celeborn/blob/770c07431b241b2fcd802c15f10e25aa6f648703/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L484, where a connection is attempted before switching to the peer, without first checking whether the worker is excluded (a rough sketch of the ordering I have in mind is included at the end of this issue, after the logs).

### How to reproduce the bug?

1. Deploy Celeborn to Kubernetes via Helm, setting worker replicas to 3.
2. Run a Spark application that performs some long shuffle operations, using Celeborn as the shuffle service.
3. Configure `celeborn.client.fetch.excludeWorkerOnFailure.enabled=true` and `celeborn.client.push.replicate.enabled=true` (the client configuration I used is sketched right after the logs below).
4. Mid-execution, scale the Celeborn workers down to 2 replicas using the `kubectl scale` command.
5. Observe the Spark executor logs when the reducer tasks start running.

The following logs appear multiple times, and the application becomes much slower (about 30 seconds added per shuffle read operation):

```
25/10/19 11:03:46 WARN TransportClientFactory: Retry create client, times 1/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
org.apache.celeborn.common.exception.CelebornIOException: Connecting to /10.244.0.28:34549 timed out (10000 ms)
    at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
    at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
25/10/19 11:03:51 INFO TransportClientFactory: Found inactive connection to /10.244.0.28:34549, creating a new one.
25/10/19 11:04:01 WARN TransportClientFactory: Retry create client, times 2/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
org.apache.celeborn.common.exception.CelebornIOException: Connecting to /10.244.0.28:34549 timed out (10000 ms)
    at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
    at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
25/10/19 11:04:06 INFO TransportClientFactory: Found inactive connection to /10.244.0.28:34549, creating a new one.
25/10/19 11:04:16 WARN TransportClientFactory: Retry create client, times 3/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
org.apache.celeborn.common.exception.CelebornIOException: Connecting to /10.244.0.28:34549 timed out (10000 ms)
    at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
    at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
    at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
25/10/19 11:04:16 WARN CelebornShuffleReader: Failed to create client for spark-2055e9b332bd46da9da59d51be5b3c45-1-393 from host: 10.244.0.28:34549. Shuffle reader will try its replica if exists.
25/10/19 11:04:16 INFO CelebornShuffleReader: BatchOpenStream for 1 cost 40026ms
25/10/19 11:04:16 WARN CelebornInputStream: CreatePartitionReader failed 1/6 times for location PartitionLocation[ id-epoch:393-0 host-rpcPort-pushPort-fetchPort-replicatePort:10.244.0.28-40759-35523-34549-40859 mode:PRIMARY peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.244.2.27-34003-42229-45749-46535) storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=, fileSize=20715, chunkOffsets=[0, 20715]} mapIdBitMap:null], change to peer
org.apache.celeborn.common.exception.CelebornIOException: Fetch data from excluded worker! PartitionLocation[ id-epoch:393-0 host-rpcPort-pushPort-fetchPort-replicatePort:10.244.0.28-40759-35523-34549-40859 mode:PRIMARY peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.244.2.27-34003-42229-45749-46535) storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=, fileSize=20715, chunkOffsets=[0, 20715]} mapIdBitMap:null]
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.createReaderWithRetry(CelebornInputStream.java:437)
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.createReaderWithRetry(CelebornInputStream.java:424)
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.moveToNextReader(CelebornInputStream.java:387)
    at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.<init>(CelebornInputStream.java:327)
    at org.apache.celeborn.client.read.CelebornInputStream.create(CelebornInputStream.java:108)
    at org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1961)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.org$apache$spark$shuffle$celeborn$CelebornShuffleReader$$createInputStream$1(CelebornShuffleReader.scala:341)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$3.run(CelebornShuffleReader.scala:358)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
```
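For reference, the client configuration from step 3 was set roughly as in the sketch below. This is a minimal illustration only: it assumes the usual `spark.` prefix that the Celeborn Spark client expects for these keys, and the shuffle manager class and master endpoint values are placeholders for whatever the actual deployment uses.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ExcludeWorkerRepro {
  public static void main(String[] args) {
    // Illustrative configuration only; endpoint and shuffle manager values are placeholders.
    SparkConf conf = new SparkConf()
        .setAppName("celeborn-exclude-worker-repro")
        .set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
        .set("spark.celeborn.master.endpoints", "celeborn-master-0.celeborn-master-svc:9097")
        // Properties from step 3 of the reproduction, with the spark. prefix added for Spark.
        .set("spark.celeborn.client.fetch.excludeWorkerOnFailure.enabled", "true")
        .set("spark.celeborn.client.push.replicate.enabled", "true");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // ... run a job with long shuffle stages here, then scale the Celeborn workers down mid-execution ...
    spark.stop();
  }
}
```

The same keys can of course be passed as `--conf` options to `spark-submit`; the programmatic form is only shown to keep the example self-contained.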

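Regarding the code path linked above: as the logs show, the ~40 seconds reported by `BatchOpenStream for 1 cost 40026ms` are spent in `TransportClientFactory.createClient` retries against the downed worker, and only afterwards does `CelebornInputStream` raise `Fetch data from excluded worker!` and switch to the peer. Below is a rough, hand-written sketch of the ordering I would expect instead, i.e. consulting the exclusion list before opening a connection. The class, field, and method names (`ExcludedWorkerCheckSketch`, `fetchExcludedWorkers`, `ConnectionFactory`, etc.) are illustrative assumptions, not the real Celeborn internals.

```java
import java.io.IOException;
import java.util.Set;

// Hypothetical sketch only: illustrates checking the exclusion list *before* opening a
// connection, so an excluded primary fails fast and the reader can switch to its peer.
public class ExcludedWorkerCheckSketch {

  /** Abstraction over the transport layer; the real client may retry and time out here. */
  interface ConnectionFactory {
    AutoCloseable connect(String host, int fetchPort) throws IOException;
  }

  private final Set<String> fetchExcludedWorkers; // entries like "10.244.0.28:34549"
  private final ConnectionFactory connectionFactory;

  public ExcludedWorkerCheckSketch(Set<String> fetchExcludedWorkers, ConnectionFactory connectionFactory) {
    this.fetchExcludedWorkers = fetchExcludedWorkers;
    this.connectionFactory = connectionFactory;
  }

  /** Open a reader connection to the primary, failing fast if that worker is already excluded. */
  public AutoCloseable openReader(String host, int fetchPort) throws IOException {
    String hostAndFetchPort = host + ":" + fetchPort;
    if (fetchExcludedWorkers.contains(hostAndFetchPort)) {
      // Fail fast so the caller can fall back to the replica peer immediately, instead of
      // spending 3 x 10 s of connection retries against a worker that is known to be gone.
      throw new IOException("Worker " + hostAndFetchPort + " is excluded, switching to peer");
    }
    return connectionFactory.connect(host, fetchPort);
  }
}
```

This is only meant to illustrate the ordering; I have not checked where such a check would best live in the actual code.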