Gleb Abroskin created SPARK-40230:
-------------------------------------

             Summary: Executor connection issue in hybrid cloud deployment
                 Key: SPARK-40230
                 URL: https://issues.apache.org/jira/browse/SPARK-40230
             Project: Spark
          Issue Type: Bug
          Components: Block Manager, Kubernetes
    Affects Versions: 3.2.1
         Environment: About the k8s setup:
 * 6+ nodes in AWS
 * 4 nodes in DC

Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:java}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home spark-submit \
  --master k8s://https://ifunny-ml-kubemaster.ash1.fun.co:6443 \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --conf spark.submit.deployMode=cluster \
  --conf spark.kubernetes.namespace=ml \
  --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
  --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
  --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
  --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
  --conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
  --conf "spark.hadoop.fs.s3a.access.key=XXX" \
  --conf "spark.hadoop.fs.s3a.secret.key=XXX" \
  --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
  --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
  --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
  --conf spark.sql.shuffle.partitions=500 \
  --num-executors 100 \
  --driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --name k8s-pyspark-test \
  main.py{code}
main.py is just pi.py from the Spark examples, modified to run across 100 
executors (a way to make sure executors are deployed in both AWS and the DC):
{code:java}
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

    # Emit TRACE logs so the network layer's request/response lines are visible.
    spark.sparkContext.setLogLevel("TRACE")

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> int:
        # Sample a point in [-1, 1) x [-1, 1); return 1 if it lies inside the unit circle.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n)) {code}
            Reporter: Gleb Abroskin


I understand that this issue is quite subtle and may be hard to debug. Still, I 
could not find any problem with our infrastructure, so I suspect the cause is 
inside Spark itself.

We deploy Spark applications in k8s, and everything works well when all driver 
and executor pods are either in AWS or in our DC. When they are split between 
the two datacenters, something strange happens. For example, here are the logs 
of one of the executors inside the DC:
{code:java}
22/08/26 07:55:35 INFO TransportClientFactory: Successfully created connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps)
22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms
22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong. {code}
The executor successfully creates the connection and sends the request, but the 
connection is later assumed dead. Even stranger, the executor at IP 
172.19.149.92 did send the response back, which I can confirm with the 
following logs:
{code:java}
22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]]
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id 1644979023003
22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
--
22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626 {code}
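The comparison of the two log excerpts can also be done mechanically. The following is only a rough sketch, not part of Spark: the helper names (chunk_events, lost_responses, quiet_gap_seconds) are made up for illustration, the sample lines are copied from the excerpts above, and it assumes that a client running at TRACE level would log a line containing "ChunkFetchSuccess" if the response had actually arrived.

```python
import re
from datetime import datetime

TS_FORMAT = "%y/%m/%d %H:%M:%S"
# "<date> <time> <LEVEL> <Logger>: <message>", as in the excerpts above.
LINE_RE = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\w+): (.*)$")
CHUNK_RE = re.compile(r"StreamChunkId\[streamId=(\d+),\s*chunkIndex=(\d+)\]")

# Sample lines copied from the two executors' logs (unwrapped).
CLIENT_LOG = """\
22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests.
"""
SERVER_LOG = """\
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
--
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626
"""


def chunk_events(log_text):
    """Group log messages by (stream_id, chunk_index)."""
    events = {}
    for line in log_text.splitlines():
        match = LINE_RE.match(line)
        if not match:
            continue  # skip grep separators ("--") and anything unparseable
        message = match.group(4)
        for stream_id, chunk_index in CHUNK_RE.findall(message):
            events.setdefault((int(stream_id), int(chunk_index)), []).append(message)
    return events


def lost_responses(client_log, server_log):
    """Chunks the client requested and the server sent, but the client never logged."""
    client, server = chunk_events(client_log), chunk_events(server_log)
    lost = []
    for key, messages in client.items():
        requested = any("Sending request" in m for m in messages)
        # Assumption: a received response would show up as a ChunkFetchSuccess line.
        received = any("ChunkFetchSuccess" in m for m in messages)
        sent = any("Sent result ChunkFetchSuccess" in m for m in server.get(key, []))
        if requested and sent and not received:
            lost.append(key)
    return lost


def quiet_gap_seconds(client_log):
    """Seconds between the last 'Sending request' line and the 'quiet' error, or None."""
    last_send = None
    for line in client_log.splitlines():
        match = LINE_RE.match(line)
        if not match:
            continue
        timestamp = datetime.strptime(match.group(1), TS_FORMAT)
        if "Sending request" in match.group(4):
            last_send = timestamp
        elif "has been quiet for" in match.group(4) and last_send is not None:
            return (timestamp - last_send).total_seconds()
    return None
```

Calling lost_responses(CLIENT_LOG, SERVER_LOG) lists the chunk ids that were answered on one side but never observed on the other, and quiet_gap_seconds(CLIENT_LOG) can be compared against the timeout reported in the error line.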
A few suspicious details here:
 * the connection to a pod looks like /<IP>, while the connection to the driver 
looks like <POD_NAME>.<NAMESPACE>.svc/<IP>
 * Task *-1024* releasing lock for broadcast_0_piece0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
