[
https://issues.apache.org/jira/browse/SPARK-40230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gleb Abroskin updated SPARK-40230:
----------------------------------
Environment:
About the k8s setup:
* 6+ nodes in AWS
* 4 nodes in DC
Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:java}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home
spark-submit \
--master k8s://https://kubemaster:6443 \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--conf spark.submit.deployMode=cluster \
--conf spark.kubernetes.namespace=ml \
--conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
--conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
--conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
--conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
--conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
--conf "spark.hadoop.fs.s3a.access.key=XXX" \
--conf "spark.hadoop.fs.s3a.secret.key=XXX" \
--conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
--conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
--conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
--conf spark.sql.shuffle.partitions=500 \
--num-executors 100 \
--driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--name k8s-pyspark-test \
main.py{code}
main.py is just pi.py from the examples, modified to work on 100 machines (this
is a way to make sure executors are deployed in both AWS & DC)
{code:python}
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("TRACE")

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> int:
        # Sample a point in the square [-1, 1] x [-1, 1]; return 1 if it lies inside the unit circle.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n)) {code}
was:
About the k8s setup:
* 6+ nodes in AWS
* 4 nodes in DC
Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:java}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home
spark-submit \
--master k8s://https://ifunny-ml-kubemaster.ash1.fun.co:6443 \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--conf spark.submit.deployMode=cluster \
--conf spark.kubernetes.namespace=ml \
--conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
--conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
--conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
--conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
--conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
--conf "spark.hadoop.fs.s3a.access.key=XXX" \
--conf "spark.hadoop.fs.s3a.secret.key=XXX" \
--conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
--conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
--conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
--conf spark.sql.shuffle.partitions=500 \
--num-executors 100 \
--driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
--name k8s-pyspark-test \
main.py{code}
main.py is just pi.py from the examples, modified to work on 100 machines (this
is a way to make sure executors are deployed in both AWS & DC)
{code:python}
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("TRACE")

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> int:
        # Sample a point in the square [-1, 1] x [-1, 1]; return 1 if it lies inside the unit circle.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n)) {code}
> Executor connection issue in hybrid cloud deployment
> ----------------------------------------------------
>
> Key: SPARK-40230
> URL: https://issues.apache.org/jira/browse/SPARK-40230
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Kubernetes
> Affects Versions: 3.2.1
> Environment: About the k8s setup:
> * 6+ nodes in AWS
> * 4 nodes in DC
> Spark 3.2.1 + spark-hadoop-cloud 3.2.1
> {code:java}
> JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home
> spark-submit \
> --master k8s://https://kubemaster:6443 \
> --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
> --conf spark.submit.deployMode=cluster \
> --conf spark.kubernetes.namespace=ml \
> --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
> --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
> --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
> --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
> --conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
> --conf "spark.hadoop.fs.s3a.access.key=XXX" \
> --conf "spark.hadoop.fs.s3a.secret.key=XXX" \
> --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
> --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
> --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
> --conf spark.sql.shuffle.partitions=500 \
> --num-executors 100 \
> --driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
> --name k8s-pyspark-test \
> main.py{code}
> main.py is just pi.py from the examples, modified to work on 100 machines
> (this is a way to make sure executors are deployed in both AWS & DC)
> {code:python}
> import sys
> from random import random
> from operator import add
>
> from pyspark.sql import SparkSession
>
> if __name__ == "__main__":
>     spark = SparkSession \
>         .builder \
>         .appName("PythonPi") \
>         .getOrCreate()
>     spark.sparkContext.setLogLevel("TRACE")
>
>     partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
>     n = 10000000 * partitions
>
>     def f(_: int) -> int:
>         # Sample a point in the square [-1, 1] x [-1, 1]; return 1 if it lies inside the unit circle.
>         x = random() * 2 - 1
>         y = random() * 2 - 1
>         return 1 if x ** 2 + y ** 2 <= 1 else 0
>
>     count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
>     print("Pi is roughly %f" % (4.0 * count / n)) {code}
> Reporter: Gleb Abroskin
> Priority: Major
>
> I understand that the issue is quite subtle and might be hard to debug. Still,
> I was not able to find an issue with our infra, so I guess the problem is
> somewhere inside Spark.
> We deploy a Spark application in k8s and everything works well if all the
> driver & executor pods are either in AWS or in our DC. When they are split
> between datacenters, however, something strange happens. For example, here
> are the logs of one of the executors inside the DC:
> {code:java}
> 22/08/26 07:55:35 INFO TransportClientFactory: Successfully created
> connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps)
> 22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request RPC
> 4860401977118244334 to /172.19.149.92:39414 took 3 ms
> 22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to
> /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request
> StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414
> took 0 ms
> 22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to
> /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding
> requests. Assuming connection is dead; please adjust
> spark.shuffle.io.connectionTimeout if this is wrong. {code}
> The executor successfully creates the connection and sends the request, but
> the connection is later assumed dead. Even stranger, the executor at IP
> 172.19.149.92 did send the response back, which I can confirm with the
> following logs:
> {code:java}
> 22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest:
> ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]]
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from
> /172.19.123.197:37626 to fetch block
> StreamChunkId[streamId=1644979023003,chunkIndex=0]
> 22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id
> 1644979023003
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for
> broadcast_0_piece0
> --
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for
> broadcast_0_piece0
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result
> ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a]
> to client /172.19.123.197:37626 {code}
> A few suspicious details here:
> * the connection to a pod looks like /<IP>, while the connection to the
> driver looks like <POD_NAME>.<NAMESPACE>.svc/<IP>
> * Task *-1024* releasing lock for broadcast_0_piece0
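
The comparison of the two log excerpts above (the requester's "Sending request StreamChunkId[...]" against the serving executor's "Sent result ChunkFetchSuccess[...]") can be repeated for every chunk with a small script. The following is only a sketch: the regular expressions are derived solely from the log lines quoted in this issue, the function names are hypothetical, and it assumes each log entry sits on a single line (unwrapped, unlike in this e-mail) and that both pods' clocks are roughly in sync.

```python
import re
from datetime import datetime

# Patterns based only on the TRACE lines quoted in this issue (an assumption:
# other Spark versions or log layouts may format these entries differently).
TS = r"(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})"
CHUNK = r"StreamChunkId\[streamId=(\d+),\s*chunkIndex=(\d+)\]"
REQUEST_RE = re.compile(TS + r" TRACE TransportClient: Sending request " + CHUNK)
RESULT_RE = re.compile(
    TS + r" TRACE ChunkFetchRequestHandler: Sent result "
    r"ChunkFetchSuccess\[streamChunkId=" + CHUNK
)


def parse_ts(text):
    """Parse the 'yy/MM/dd HH:mm:ss' timestamp used in the quoted logs."""
    return datetime.strptime(text, "%y/%m/%d %H:%M:%S")


def collect(lines, pattern):
    """Map (stream_id, chunk_index) to the timestamp of the first matching entry."""
    found = {}
    for line in lines:
        match = pattern.search(line)
        if match:
            key = (int(match.group(2)), int(match.group(3)))
            found.setdefault(key, parse_ts(match.group(1)))
    return found


def correlate(requester_lines, server_lines):
    """For each requested chunk, return (time requested, time result sent or None)."""
    requests = collect(requester_lines, REQUEST_RE)
    results = collect(server_lines, RESULT_RE)
    return {key: (sent, results.get(key)) for key, sent in requests.items()}
```

A chunk whose second value is a timestamp was answered by the serving executor according to its own log; if the requester still reports the connection as dead for that chunk, the response was most likely lost somewhere between the two pods. A `None` means the serving executor never logged a result for that chunk.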