[ 
https://issues.apache.org/jira/browse/SPARK-40230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Abroskin updated SPARK-40230:
----------------------------------
    Environment: 
About the k8s setup:
 * 6+ nodes in AWS
 * 4 nodes in DC

Spark 3.2.1 + spark-hadoop-cloud 3.2.1
{code:java}
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home spark-submit \
  --master k8s://https://kubemaster:6443 \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --conf spark.submit.deployMode=cluster \
  --conf spark.kubernetes.namespace=ml \
  --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \
  --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \
  --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \
  --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \
  --conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \
  --conf "spark.hadoop.fs.s3a.access.key=XXX" \
  --conf "spark.hadoop.fs.s3a.secret.key=XXX" \
  --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \
  --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \
  --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \
  --conf spark.sql.shuffle.partitions=500 \
  --num-executors 100 \
  --driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \
  --name k8s-pyspark-test \
  main.py{code}
main.py is just pi.py from the examples, modified to run across 100 executors
(this is a way to make sure executors are deployed in both AWS & the DC):
{code:java}
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("TRACE")

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100
    n = 10000000 * partitions

    def f(_: int) -> int:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n)){code}
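The Monte Carlo logic above can be sanity-checked locally, without a cluster. A minimal sketch using only the Python standard library (names such as pi_estimate are mine, not from the reproducer):

```python
from random import random, seed
from operator import add
from functools import reduce

def f(_):
    # Sample a point uniformly in the square [-1, 1] x [-1, 1].
    x = random() * 2 - 1
    y = random() * 2 - 1
    # Count the point if it falls inside the unit circle.
    return 1 if x ** 2 + y ** 2 <= 1 else 0

seed(42)  # fixed seed so the check is repeatable
n = 200_000
count = reduce(add, map(f, range(n)))
# The circle/square area ratio is pi/4, hence the factor of 4.
pi_estimate = 4.0 * count / n
```

With n = 200_000 the estimate lands within a few hundredths of pi, which is enough to confirm the arithmetic before scaling out to 100 partitions on the cluster.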



> Executor connection issue in hybrid cloud deployment
> ----------------------------------------------------
>
>                 Key: SPARK-40230
>                 URL: https://issues.apache.org/jira/browse/SPARK-40230
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Kubernetes
>    Affects Versions: 3.2.1
>            Reporter: Gleb Abroskin
>            Priority: Major
>
> I understand that the issue is quite subtle and might be hard to debug; still, I
> was not able to find any issue with our infrastructure, so I suspect the problem
> lies somewhere inside Spark.
> We deploy the Spark application in k8s and everything works well if all the
> driver & executor pods are either in AWS or in our DC. But when they are split
> between the datacenters, something strange happens. For example, here are the
> logs of one of the executors inside the DC:
> {code:java}
> 22/08/26 07:55:35 INFO TransportClientFactory: Successfully created connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps)
> 22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms
> 22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414
> 22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms
> 22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong. {code}
> The executor successfully creates a connection & sends the request, but the
> connection is assumed dead. Even stranger, the executor at IP 172.19.149.92
> has sent the response back, which I can confirm with the following logs:
> {code:java}
> 22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]]
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0]
> 22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id 1644979023003
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
> --
> 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0
> 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626 {code}
> A few suspicious details here:
>  * a connection to an executor pod is logged as /<IP>, while a connection to the driver is logged as <POD_NAME>.<NAMESPACE>.svc/<IP>
>  * Task *-1024* releasing lock for broadcast_0_piece0
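The error message above suggests adjusting spark.shuffle.io.connectionTimeout. Raising it would only postpone the "connection is dead" verdict rather than fix the cross-DC delivery problem, but to rule out slow links, here is a sketch of how the timeouts could be raised (the app name is hypothetical; the config keys are standard Spark network settings):

```python
from pyspark.sql import SparkSession

# Sketch only: lengthen the idle-connection and general network timeouts
# to see whether the fetch eventually completes over the cross-DC link.
spark = (
    SparkSession.builder
    .appName("timeout-experiment")  # hypothetical name
    .config("spark.shuffle.io.connectionTimeout", "600s")
    .config("spark.network.timeout", "600s")
    .getOrCreate()
)
```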



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
