mahmoudyusofi opened a new issue, #3511:
URL: https://github.com/apache/celeborn/issues/3511

   ### What is the bug (with logs or screenshots)?
   I am trying to exclude workers on failure by setting the documented 
property `celeborn.client.fetch.excludeWorkerOnFailure.enabled`, to avoid 
wasting 30 seconds (3 retries, each timing out after 10 seconds) per shuffle 
read operation.
   
   The property works in that it adds the worker to the excluded list. 
However, both the code and the logs show that Celeborn still tries to connect 
to the excluded worker.
   
   See 
https://github.com/apache/celeborn/blob/770c07431b241b2fcd802c15f10e25aa6f648703/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L484
   where a connection attempt is made before switching to the peer, without 
first checking whether the worker is excluded.
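
   To illustrate the behavior I expected, here is a minimal sketch of an exclusion check that runs before any connection attempt. It is not Celeborn's code: the class `ExcludedWorkerGuard`, the `Location` type, and the method names are hypothetical and exist only for this example. The assumption is that the client keeps a set of excluded worker addresses and that each location may have a peer replica.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ExcludedWorkerGuard {

    // Hypothetical stand-in for a partition location with an optional peer replica.
    public static final class Location {
        public final String hostAndFetchPort;
        public final Location peer; // null when there is no replica

        public Location(String hostAndFetchPort, Location peer) {
            this.hostAndFetchPort = hostAndFetchPort;
            this.peer = peer;
        }
    }

    // Addresses of workers that previously failed (assumed to be shared client state).
    private final Set<String> excluded = ConcurrentHashMap.newKeySet();

    public void exclude(String hostAndFetchPort) {
        excluded.add(hostAndFetchPort);
    }

    // Returns the first non-excluded location (primary, then peer),
    // so no connection attempt is ever made to an excluded worker.
    public Optional<Location> pickReadable(Location primary) {
        if (!excluded.contains(primary.hostAndFetchPort)) {
            return Optional.of(primary);
        }
        Location peer = primary.peer;
        if (peer != null && !excluded.contains(peer.hostAndFetchPort)) {
            return Optional.of(peer);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Location peer = new Location("10.244.2.27:45749", null);
        Location primary = new Location("10.244.0.28:34549", peer);

        ExcludedWorkerGuard guard = new ExcludedWorkerGuard();
        guard.exclude(primary.hostAndFetchPort);

        // With the primary excluded, the peer is chosen without a connect timeout.
        guard.pickReadable(primary)
             .ifPresent(loc -> System.out.println("read from " + loc.hostAndFetchPort));
    }
}
```

   With a check like this in front of the client creation, the reader would go straight to the peer instead of waiting for three connect timeouts on a worker that is already known to be gone.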
   
   ### How to reproduce the bug?
   1. Deploy Celeborn to Kubernetes via Helm, setting worker replicas to 3
   2. Run a Spark application that performs some long shuffle operations, 
using Celeborn as its shuffle service
   3. Configure `celeborn.client.fetch.excludeWorkerOnFailure.enabled=true` and 
`celeborn.client.push.replicate.enabled=true`
   4. Mid-execution, scale the Celeborn workers down to 2 replicas using the 
`kubectl scale` command
   5. Observe the worker logs when reducer tasks start running
   
   The following logs appear multiple times, and the application is much 
slower (about 30 seconds added per shuffle read operation):
   
   ```
   25/10/19 11:03:46 WARN TransportClientFactory: Retry create client, times 
1/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
   org.apache.celeborn.common.exception.CelebornIOException: Connecting to 
/10.244.0.28:34549 timed out (10000 ms)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
        at 
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
        at 
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   25/10/19 11:03:51 INFO TransportClientFactory: Found inactive connection to 
/10.244.0.28:34549, creating a new one.
   25/10/19 11:04:01 WARN TransportClientFactory: Retry create client, times 
2/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
   org.apache.celeborn.common.exception.CelebornIOException: Connecting to 
/10.244.0.28:34549 timed out (10000 ms)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
        at 
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
        at 
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   25/10/19 11:04:06 INFO TransportClientFactory: Found inactive connection to 
/10.244.0.28:34549, creating a new one.
   25/10/19 11:04:16 WARN TransportClientFactory: Retry create client, times 
3/3 with error: Connecting to /10.244.0.28:34549 timed out (10000 ms)
   org.apache.celeborn.common.exception.CelebornIOException: Connecting to 
/10.244.0.28:34549 timed out (10000 ms)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:314)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:252)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:159)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:147)
        at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:259)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$6(CelebornShuffleReader.scala:214)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.makeOpenStreamList$1(CelebornShuffleReader.scala:207)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$9(CelebornShuffleReader.scala:260)
        at 
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:242)
        at 
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:232)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   25/10/19 11:04:16 WARN CelebornShuffleReader: Failed to create client for 
spark-2055e9b332bd46da9da59d51be5b3c45-1-393 from host: 10.244.0.28:34549. 
Shuffle reader will try its replica if exists.
   25/10/19 11:04:16 INFO CelebornShuffleReader: BatchOpenStream for 1 cost 
40026ms
   25/10/19 11:04:16 WARN CelebornInputStream: CreatePartitionReader failed 1/6 
times for location PartitionLocation[
     id-epoch:393-0
     
host-rpcPort-pushPort-fetchPort-replicatePort:10.244.0.28-40759-35523-34549-40859
     mode:PRIMARY
     
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.244.2.27-34003-42229-45749-46535)
     storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, 
filePath=, fileSize=20715, chunkOffsets=[0, 20715]}
     mapIdBitMap:null], change to peer
   org.apache.celeborn.common.exception.CelebornIOException: Fetch data from 
excluded worker! PartitionLocation[
     id-epoch:393-0
     
host-rpcPort-pushPort-fetchPort-replicatePort:10.244.0.28-40759-35523-34549-40859
     mode:PRIMARY
     
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.244.2.27-34003-42229-45749-46535)
     storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, 
filePath=, fileSize=20715, chunkOffsets=[0, 20715]}
     mapIdBitMap:null]
        at 
org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.createReaderWithRetry(CelebornInputStream.java:437)
        at 
org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.createReaderWithRetry(CelebornInputStream.java:424)
        at 
org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.moveToNextReader(CelebornInputStream.java:387)
        at 
org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.<init>(CelebornInputStream.java:327)
        at 
org.apache.celeborn.client.read.CelebornInputStream.create(CelebornInputStream.java:108)
        at 
org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1961)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.org$apache$spark$shuffle$celeborn$CelebornShuffleReader$$createInputStream$1(CelebornShuffleReader.scala:341)
        at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$3.run(CelebornShuffleReader.scala:358)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   ```
   

