[ https://issues.apache.org/jira/browse/SPARK-45769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058537#comment-18058537 ]

Kamal Kumar commented on SPARK-45769:
-------------------------------------

Was this ever resolved? We are seeing the same error with 3.5.5.

> data retrieval fails on executors with spark connect
> ----------------------------------------------------
>
>                 Key: SPARK-45769
>                 URL: https://issues.apache.org/jira/browse/SPARK-45769
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect
>    Affects Versions: 3.5.0
>            Reporter: Steven Ottens
>            Priority: Major
>
> We have an OpenShift cluster with Spark and JupyterHub, and we use 
> Spark Connect to access Spark from within Jupyter. This worked fine with 
> Spark 3.4.1. However, after upgrading to Spark 3.5.0 we could no longer 
> access any data in our Delta Tables through Spark. Initially I assumed it was 
> a bug in Delta: [https://github.com/delta-io/delta/issues/2235]
> The actual error is:
> {code:java}
> SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due 
> to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: 
> Lost task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4): 
> java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance 
> of org.apache.spark.rdd.MapPartitionsRDD{code}
> However, after further investigation I discovered that this is a regression in 
> Spark 3.5.0. The issue is similar to SPARK-36917; however, I am not using any 
> custom functions or any classes other than spark-connect, and this setup 
> worked in 3.4.1. The issue only occurs when remote executors are used in a 
> Kubernetes environment. Running a plain Spark Connect server, e.g.
> {code:java}
> ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0{code}
> doesn't produce the error.
> The issue occurs both in a full OpenShift cluster and in a tiny minikube 
> setup. The steps to reproduce below are based on the minikube setup.
> You need a minimal Spark 3.5.0 setup with 1 driver and at least 1 executor, 
> and use Python to access data through Spark. The query I used to test this is:
> {code:java}
> from pyspark.sql import SparkSession
> logFile = '/opt/spark/work-dir/data.csv'
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> df = spark.read.csv(logFile)
> df.count()
> {code}
> It doesn't matter whether the data is local or remote on S3 storage, nor 
> whether it is plain text, CSV or a Delta Table.
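> For reference, the S3 and Delta variants are of the same shape. The sketch below continues from the session above; the s3a:// paths are placeholders and the Delta read assumes the Delta Lake package is configured on the server, as in the original OpenShift setup:
> {code:java}
> # Same action against a CSV on S3 (placeholder bucket/path):
> spark.read.csv('s3a://some-bucket/data.csv').count()
>
> # ... and against a Delta Table (placeholder location); both fail with the
> # same ClassCastException once the count runs on remote executors.
> spark.read.format('delta').load('s3a://some-bucket/tables/example').count()
> {code}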
> h3. Steps to reproduce:
>  # Install minikube
>  # Create a service account 'spark'
> {code:java}
> kubectl create sa spark{code}
>  # Bind the 'edit' role to the service account
> {code:java}
> kubectl create rolebinding spark-edit \
>  --clusterrole=edit \
>  --serviceaccount=default:spark \
>  --namespace=default{code}
>  # Create a service for spark
> {code:java}
> kubectl create -f service.yml{code}
>  # Create a Spark Connect deployment with the default Spark Docker image: 
> [https://hub.docker.com/_/spark] (do change deployment.yml to point to 
> your Kubernetes API endpoint)
> {code:java}
> kubectl create -f deployment.yml{code}
>  # Add data to both the executor and the driver pods, e.g. log in to a 
> terminal on each pod and run on both pods:
> {code:java}
> touch data.csv
> echo id,name > data.csv
> echo 1,2 >> data.csv {code}
>  # Start a remote Spark session to access the newly created data. I logged in 
> on the driver pod and installed the necessary Python packages (an alternative 
> that connects from outside the cluster is sketched after this list):
> {code:java}
> python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow{code}
> Started a Python shell and executed:
> {code:java}
> from pyspark.sql import SparkSession
> logFile = '/opt/spark/work-dir/data.csv'
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> df = spark.read.csv(logFile)
> df.count() {code}
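> Alternatively (closer to how Jupyter reaches the server in the OpenShift setup, though the exact commands here are a sketch rather than what was tested), forward the Connect port to a workstation with kubectl port-forward svc/spark-connect 15002:15002 and run the same query against sc://localhost:15002:
> {code:java}
> from pyspark.sql import SparkSession
>
> # With the port-forward running, the Connect endpoint is reachable on localhost.
> spark = SparkSession.builder.remote('sc://localhost:15002').getOrCreate()
>
> # The path is resolved on the server side, so it must exist on the driver/executor pods.
> df = spark.read.csv('/opt/spark/work-dir/data.csv')
> df.count()  # fails with the ClassCastException as soon as remote executors run the task
> {code}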
> h3. Necessary files:
> service.yml:
> {code:java}
> apiVersion: v1
> kind: Service
> metadata:
>   labels:
>     app: spark-connect        
>   name: spark-connect
>   namespace: default
> spec:
>   ipFamilies:
>     - IPv4
>   ports:
>     - name: connect-grpc
>       protocol: TCP
>       port: 15002 # Port the service listens on.
>       targetPort: 15002 # Port on the backing pods to which the service forwards connections
>     - name: sparkui
>       protocol: TCP
>       port: 4040 # Port the service listens on.
>       targetPort: 4040 # Port on the backing pods to which the service forwards connections
>     - name: spark-rpc
>       protocol: TCP
>       port: 7078 # Port the service listens on.
>       targetPort: 7078 # Port on the backing pods to which the service forwards connections
>     - name: blockmanager
>       protocol: TCP
>       port: 7079 # Port the service listens on.
>       targetPort: 7079 # Port on the backing pods to which the service forwards connections
>   internalTrafficPolicy: Cluster
>   type: ClusterIP
>   ipFamilyPolicy: SingleStack
>   sessionAffinity: None
>   selector:
>     app: spark-connect {code}
> deployment.yml: (do replace the spark.master URL with the correct one for 
> your setup)
> {code:java}
> kind: Deployment
> apiVersion: apps/v1
> metadata:
>    name: spark-connect
>    namespace: default
>    uid: 3a1b448e-4594-47a9-95f6-a82ea4ac9341
>    resourceVersion: '6107'
>    generation: 23
>    creationTimestamp: '2023-10-31T13:35:46Z'
>    labels:
>      k8s-app: spark-connect
> spec:
>    replicas: 1
>    selector:
>      matchLabels:
>        k8s-app: spark-connect
>    template:
>      metadata:
>        name: spark-connect
>        creationTimestamp: null
>        labels:
>          k8s-app: spark-connect
>      spec:
>        serviceAccount: spark
>        containers:
>          - name: spark-connect
>            image: spark
>            command:
>              - /opt/entrypoint.sh
>              - driver
>            args:
>              - '--class'
>              - org.apache.spark.sql.connect.service.SparkConnectServer
>              - '--name'
>              - spark-connect
>              - '--conf'
>              - spark.driver.blockManager.port=7079
>              - '--conf'
>              - spark.driver.port=7078
>              - '--conf'
>              - spark.driver.host=spark-connect
>              - '--conf'
>              - spark.master=k8s://https://<Kubernetes API address, e.g. https://192.168.49.2:8443>
>              - '--conf'
>              - spark.kubernetes.namespace=default
>              - '--conf'
>              - spark.kubernetes.container.image=spark:latest
>              - '--conf'
>              - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp
>              - '--conf'
>              - spark.driver.extraJavaOptions=-Divy.home=/tmp             
>              - '--conf'
>              - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
>              - '--conf'
>              - spark.kubernetes.executor.label.app=spark-connect
>              
>              - '--conf'
>              - spark.executor.memory=1g
>              - '--conf'
>              - spark.executor.cores=1
>              - '--conf'
>              - spark.executor.instances=1
>              - '--conf'
>              - spark.kubernetes.executor.podNamePrefix=spark-connect
>              - '--packages'
>              - org.apache.spark:spark-connect_2.12:3.5.0
>            env:
>              - name: SPARK_DRIVER_BIND_ADDRESS
>                valueFrom:
>                  fieldRef:
>                    apiVersion: v1
>                    fieldPath: status.podIP
>            resources: {}
>            terminationMessagePath: /dev/termination-log
>            terminationMessagePolicy: File
>            imagePullPolicy: Always
>            securityContext:
>              privileged: true
>        restartPolicy: Always
>        terminationGracePeriodSeconds: 30
>        dnsPolicy: ClusterFirst
>        securityContext: {}
>        schedulerName: default-scheduler
>    strategy:
>      type: RollingUpdate
>      rollingUpdate:
>        maxUnavailable: 25%
>        maxSurge: 25%
>    revisionHistoryLimit: 10
>    progressDeadlineSeconds: 600 {code}
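> A quick client-side sanity check (a sketch; over Spark Connect, spark.version reports the server's Spark version) is to confirm that the PySpark client and the server image configured above are on the same 3.5.x release before running the failing count:
> {code:java}
> import pyspark
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> # For the setup above both are expected to be 3.5.0; a client/server
> # version mismatch is worth ruling out before digging further.
> print(pyspark.__version__)  # local client version
> print(spark.version)        # version reported by the Connect server
> {code}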


