[
https://issues.apache.org/jira/browse/SPARK-45769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058537#comment-18058537
]
Kamal Kumar commented on SPARK-45769:
-------------------------------------
Was this ever resolved? We are seeing the same issue with 3.5.5.
> data retrieval fails on executors with spark connect
> ----------------------------------------------------
>
> Key: SPARK-45769
> URL: https://issues.apache.org/jira/browse/SPARK-45769
> Project: Spark
> Issue Type: Bug
> Components: Connect
> Affects Versions: 3.5.0
> Reporter: Steven Ottens
> Priority: Major
>
> We have an OpenShift cluster with Spark and JupyterHub, and we use
> Spark Connect to access Spark from within Jupyter. This worked fine with
> Spark 3.4.1. However, after upgrading to Spark 3.5.0 we were no longer able to
> access any data in our Delta tables through Spark. Initially I assumed it was
> a bug in Delta: [https://github.com/delta-io/delta/issues/2235]
> The actual error is:
> {code:java}
> SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due
> to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4):
> java.lang.ClassCastException: cannot assign instance of
> java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
> of org.apache.spark.rdd.MapPartitionsRDD{code}
> However, after further investigation I discovered that this is a regression in
> Spark 3.5.0. The issue is similar to SPARK-36917, but I am not using any
> custom functions or any classes other than spark-connect, and this setup
> used to work in 3.4.1. The issue only occurs when remote executors are used
> in a Kubernetes environment. Running a plain Spark Connect server, e.g.
> {code:java}
> ./sbin/start-connect-server.sh --packages
> org.apache.spark:spark-connect_2.12:3.5.0{code}
> doesn't produce the error.
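> For comparison, a minimal sketch of that control case (the hostname and file path are assumptions; the server here is the plain, non-Kubernetes Spark Connect server started above):
> {code:java}
> from pyspark.sql import SparkSession
>
> # Assumed: the client runs on the same host as the plain Spark Connect server
> spark = SparkSession.builder.remote('sc://localhost').getOrCreate()
>
> # The same kind of count that fails on the Kubernetes setup completes here
> spark.read.csv('/opt/spark/work-dir/data.csv').count()
> {code}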
> The issue occurs both in a full OpenShift cluster and in a tiny minikube
> setup. The steps to reproduce below are based on the minikube setup.
> You need a minimal Spark 3.5.0 setup with 1 driver and at least 1
> executor, and use Python to access data through Spark. The query I used to
> test this is:
> {code:java}
> from pyspark.sql import SparkSession
> logFile = '/opt/spark/work-dir/data.csv'
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> df = spark.read.csv(logFile)
> df.count()
> {code}
> It doesn't matter whether the data is local or remote on S3 storage, nor
> whether the data is plain text, CSV, or a Delta table.
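> To illustrate, a hedged sketch of the variants that all hit the same ClassCastException (the S3 and Delta locations are hypothetical, and those cases additionally need the matching hadoop-aws and Delta packages on the server):
> {code:java}
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
>
> # Plain text and CSV on the pods' local filesystem (path taken from the steps below)
> spark.read.text('/opt/spark/work-dir/data.csv').count()
> spark.read.csv('/opt/spark/work-dir/data.csv').count()
>
> # Remote data on S3 and a Delta table (hypothetical locations)
> spark.read.csv('s3a://some-bucket/data.csv').count()
> spark.read.format('delta').load('s3a://some-bucket/some-delta-table').count()
> {code}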
> h3. Steps to reproduce:
> # Install minikube
> # Create a service account 'spark'
> {code:java}
> kubectl create sa spark{code}
> # Bind the 'edit' role to the service account
> {code:java}
> kubectl create rolebinding spark-edit \
> --clusterrole=edit \
> --serviceaccount=default:spark \
> --namespace=default{code}
> # Create a service for spark
> {code:java}
> kubectl create -f service.yml{code}
> # Create a Spark Connect deployment with the default Spark Docker image:
> [https://hub.docker.com/_/spark] (do change deployment.yml to point to
> your Kubernetes API endpoint)
> {code:java}
> kubectl create -f deployment.yml{code}
> # Add data to both the executor and the driver pods, e.g. log in to the
> terminal of each pod and run:
> {code:java}
> touch data.csv
> echo id,name > data.csv
> echo 1,2 >> data.csv {code}
> # Start a Spark Connect remote session to access the newly created data. I
> logged in on the driver pod and installed the necessary Python packages:
> {code:java}
> python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow{code}
> Then I started a Python shell and executed the query below (a sketch of the
> expected failure follows these steps):
> {code:java}
> from pyspark.sql import SparkSession
> logFile = '/opt/spark/work-dir/data.csv'
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> df = spark.read.csv(logFile)
> df.count() {code}
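> On the broken 3.5.0 setup the count fails as in the error above; a sketch of what that looks like from the client side (the exception import is the PySpark 3.5 path and may differ in other versions):
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.errors import SparkConnectGrpcException
>
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
> df = spark.read.csv('/opt/spark/work-dir/data.csv')
>
> try:
>     df.count()
> except SparkConnectGrpcException as e:
>     # The message wraps the executor-side java.lang.ClassCastException about
>     # SerializedLambda not being assignable to MapPartitionsRDD.f
>     print(type(e).__name__, str(e)[:300])
> {code}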
> h3. Necessary files:
> service.yml:
> {code:java}
> apiVersion: v1
> kind: Service
> metadata:
>   labels:
>     app: spark-connect
>   name: spark-connect
>   namespace: default
> spec:
>   ipFamilies:
>     - IPv4
>   ports:
>     - name: connect-grpc
>       protocol: TCP
>       port: 15002       # Port the service listens on.
>       targetPort: 15002 # Port on the backing pods to which the service forwards connections.
>     - name: sparkui
>       protocol: TCP
>       port: 4040        # Port the service listens on.
>       targetPort: 4040  # Port on the backing pods to which the service forwards connections.
>     - name: spark-rpc
>       protocol: TCP
>       port: 7078        # Port the service listens on.
>       targetPort: 7078  # Port on the backing pods to which the service forwards connections.
>     - name: blockmanager
>       protocol: TCP
>       port: 7079        # Port the service listens on.
>       targetPort: 7079  # Port on the backing pods to which the service forwards connections.
>   internalTrafficPolicy: Cluster
>   type: ClusterIP
>   ipFamilyPolicy: SingleStack
>   sessionAffinity: None
>   selector:
>     app: spark-connect {code}
> deployment.yml: (do replace the spark.master URL with the correct one for
> your setup)
> {code:java}
> kind: Deployment
> apiVersion: apps/v1
> metadata:
>   name: spark-connect
>   namespace: default
>   uid: 3a1b448e-4594-47a9-95f6-a82ea4ac9341
>   resourceVersion: '6107'
>   generation: 23
>   creationTimestamp: '2023-10-31T13:35:46Z'
>   labels:
>     k8s-app: spark-connect
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       k8s-app: spark-connect
>   template:
>     metadata:
>       name: spark-connect
>       creationTimestamp: null
>       labels:
>         k8s-app: spark-connect
>     spec:
>       serviceAccount: spark
>       containers:
>         - name: spark-connect
>           image: spark
>           command:
>             - /opt/entrypoint.sh
>             - driver
>           args:
>             - '--class'
>             - org.apache.spark.sql.connect.service.SparkConnectServer
>             - '--name'
>             - spark-connect
>             - '--conf'
>             - spark.driver.blockManager.port=7079
>             - '--conf'
>             - spark.driver.port=7078
>             - '--conf'
>             - spark.driver.host=spark-connect
>             - '--conf'
>             - spark.master=k8s://https://<Kubernetes API address, e.g. https://192.168.49.2:8443>
>             - '--conf'
>             - spark.kubernetes.namespace=default
>             - '--conf'
>             - spark.kubernetes.container.image=spark:latest
>             - '--conf'
>             - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp
>             - '--conf'
>             - spark.driver.extraJavaOptions=-Divy.home=/tmp
>             - '--conf'
>             - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
>             - '--conf'
>             - spark.kubernetes.executor.label.app=spark-connect
>             - '--conf'
>             - spark.executor.memory=1g
>             - '--conf'
>             - spark.executor.cores=1
>             - '--conf'
>             - spark.executor.instances=1
>             - '--conf'
>             - spark.kubernetes.executor.podNamePrefix=spark-connect
>             - '--packages'
>             - org.apache.spark:spark-connect_2.12:3.5.0
>           env:
>             - name: SPARK_DRIVER_BIND_ADDRESS
>               valueFrom:
>                 fieldRef:
>                   apiVersion: v1
>                   fieldPath: status.podIP
>           resources: {}
>           terminationMessagePath: /dev/termination-log
>           terminationMessagePolicy: File
>           imagePullPolicy: Always
>           securityContext:
>             privileged: true
>       restartPolicy: Always
>       terminationGracePeriodSeconds: 30
>       dnsPolicy: ClusterFirst
>       securityContext: {}
>       schedulerName: default-scheduler
>   strategy:
>     type: RollingUpdate
>     rollingUpdate:
>       maxUnavailable: 25%
>       maxSurge: 25%
>   revisionHistoryLimit: 10
>   progressDeadlineSeconds: 600 {code}
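> Once the deployment is up, a quick sanity check from the client side (a sketch, not part of the original reproduction; a client/server version mismatch can cause similar serialization errors, although here both are 3.5.0):
> {code:java}
> import pyspark
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
>
> # Compare the client-side PySpark version with the version reported by the server
> print('client:', pyspark.__version__)
> print('server:', spark.version)
> {code}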