Gleb Abroskin created SPARK-40230:
-------------------------------------
Summary: Executor connection issue in hybrid cloud deployment
Key: SPARK-40230
URL: https://issues.apache.org/jira/browse/SPARK-40230
Project: Spark
Issue Type: Bug
Components: Block Manager, Kubernetes
Affects Versions: 3.2.1
Environment: About the k8s setup:
* 6+ nodes in AWS
* 4 nodes in DC
Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:java}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home
spark-submit \
--master k8s://https://ifunny-ml-kubemaster.ash1.fun.co:6443 \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--conf spark.submit.deployMode=cluster \
--conf spark.kubernetes.namespace=ml \
--conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
--conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
--conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
--conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
--conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
--conf "spark.hadoop.fs.s3a.access.key=XXX" \
--conf "spark.hadoop.fs.s3a.secret.key=XXX" \
--conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
--conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
--conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
--conf spark.sql.shuffle.partitions=500 \
--num-executors 100 \
--driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--name k8s-pyspark-test \
main.py{code}
main.py is just pi.py from the examples, modified to work on 100 machines (this
is a way to make sure executors are deployed in both AWS & DC)
{code:java}
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("TRACE")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> float:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n)) {code}
Reporter: Gleb Abroskin
I understand that this issue is subtle and may be hard to debug, but I could
not find a problem in our infrastructure, so I suspect the cause is inside
Spark.
We deploy Spark applications in k8s, and everything works well when all driver
& executor pods are either in AWS or in our DC. When they are split between the
two datacenters, something strange happens. For example, here are the logs of
one of the executors inside the DC:
{code:java}
22/08/26 07:55:35 INFO TransportClientFactory: Successfully created connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps)
22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms
22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong. {code}
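The timing in the log above can be checked with a small sketch. It assumes the log timestamps use the yy/MM/dd HH:mm:ss layout shown and have only one-second resolution; the helper name quiet_period_ms is made up for illustration. The gap between the last request and the error equals the 120000 ms quoted in the message, which suggests that nothing at all arrived on the connection after the fetch request was sent.

```python
from datetime import datetime

# Timestamp layout as it appears in the executor log (assumed: yy/MM/dd HH:mm:ss).
LOG_TIME_FORMAT = "%y/%m/%d %H:%M:%S"


def quiet_period_ms(last_activity: str, timeout_error: str) -> int:
    """Milliseconds between the last logged activity and the timeout error."""
    start = datetime.strptime(last_activity, LOG_TIME_FORMAT)
    end = datetime.strptime(timeout_error, LOG_TIME_FORMAT)
    return int((end - start).total_seconds() * 1000)


# Timestamps copied from the log: last "Sending request" line and the ERROR line.
gap_ms = quiet_period_ms("22/08/26 07:55:35", "22/08/26 07:57:35")
print(gap_ms)
```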
The executor successfully creates the connection and sends the request, but the
connection is then assumed dead. Even stranger, the executor at IP
172.19.149.92 did send the response back, which I can confirm with the
following logs:
{code:java}
22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]]
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id 1644979023003
22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
--
22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626 {code}
A few suspicious details here:
* the connection to a pod is logged as /<IP>, while the connection to the
driver is logged as <POD_NAME>.<NAMESPACE>.svc/<IP>
* Task *-1024* releasing lock for broadcast_0_piece0
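To repeat the check above across many executors, the two logs can be matched on the StreamChunkId. The following is only a sketch: it assumes each log entry sits on a single line (as in the raw log files, not as wrapped in this email), and the function names chunk_ids and sent_but_never_received are made up for illustration. It reports the chunks that the requesting executor asked for and that the serving executor claims to have sent.

```python
import re

# Matches e.g. StreamChunkId[streamId=1644979023003,chunkIndex=0]
CHUNK_ID = re.compile(r"StreamChunkId\[streamId=(\d+),chunkIndex=(\d+)\]")


def chunk_ids(lines, marker):
    """Collect (streamId, chunkIndex) pairs from log lines that contain marker."""
    found = set()
    for line in lines:
        if marker in line:
            match = CHUNK_ID.search(line)
            if match:
                found.add((int(match.group(1)), int(match.group(2))))
    return found


def sent_but_never_received(requester_log, server_log):
    """Chunks requested by one executor that the other executor logged as sent."""
    requested = chunk_ids(requester_log, "Sending request")
    sent = chunk_ids(server_log, "Sent result")
    return requested & sent


# Log lines copied from this report, one entry per line.
requester_log = [
    "22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms",
    "22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms",
]
server_log = [
    "22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626",
]

lost = sent_but_never_received(requester_log, server_log)
print(lost)
```

Any pair in the result that is followed by a "has been quiet" error on the requesting side is a response that left one executor and never reached the other.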