Steven Ottens created SPARK-45769:
-------------------------------------

             Summary: data retrieval fails on executors with spark connect
                 Key: SPARK-45769
                 URL: https://issues.apache.org/jira/browse/SPARK-45769
             Project: Spark
          Issue Type: Bug
          Components: Connect
    Affects Versions: 3.5.0
            Reporter: Steven Ottens


We have an OpenShift cluster with Spark and JupyterHub, and we use Spark Connect to access Spark from within Jupyter. This worked fine with Spark 3.4.1. However, after upgrading to Spark 3.5.0 we were no longer able to access any data in our Delta Tables through Spark. Initially I assumed it was a bug in Delta: 
[https://github.com/delta-io/delta/issues/2235]

The actual error is
{code:java}
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to 
stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost 
task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4): 
java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of 
org.apache.spark.rdd.MapPartitionsRDD{code}
However, after further investigation I discovered that this is a regression in Spark 3.5.0. The issue is similar to SPARK-36917; however, I am not using any custom functions or any classes other than spark-connect, and this setup used to work in 3.4.1. The issue only occurs when remote executors are used in a Kubernetes environment. Running a plain Spark Connect server locally, e.g.
{code:java}
./sbin/start-connect-server.sh --packages 
org.apache.spark:spark-connect_2.12:3.5.0{code}
doesn't produce the error.
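
For completeness, such a locally started server can be exercised from the Python client like this (the sc://localhost:15002 address assumes the default Spark Connect port; the CSV path is only an example):
{code:java}
from pyspark.sql import SparkSession

# Connect to the locally started Spark Connect server (default port 15002)
spark = SparkSession.builder.remote('sc://localhost:15002').getOrCreate()
spark.read.csv('/opt/spark/work-dir/data.csv').count(){code}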

The issue occurs both in a full OpenShift cluster and in a tiny minikube setup. The steps to reproduce below are based on the minikube setup.

You need a minimal Spark 3.5.0 setup with 1 driver and at least 1 executor, and you need to use Python to access data through Spark. The query I used to test this is:
{code:java}
from pyspark.sql import SparkSession
logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count(){code}
However, it does not matter whether the data is local or remote on S3 storage, nor whether it is plain text, CSV, or a Delta Table.
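
The same query pattern can be pointed at remote data to confirm this; the s3a paths below are placeholders rather than the actual locations I used:
{code:java}
# Same query pattern against remote data; <bucket> is a placeholder
df = spark.read.csv('s3a://<bucket>/data.csv')
df.count()

# Or against a Delta Table (requires the Delta Lake package on the server)
df = spark.read.format('delta').load('s3a://<bucket>/delta-table')
df.count(){code}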
h3. Steps to reproduce:
 # Install minikube
 # Create a service account 'spark'
{code:java}
kubectl create sa spark{code}

 # Bind the 'edit' role to the service account

{code:java}
kubectl create rolebinding spark-edit \
 --clusterrole=edit \
 --serviceaccount=default:spark \
 --namespace=default{code}
 # Create a service for spark

{code:java}
kubectl create -f service.yml{code}
 # Create a Spark-Connect deployment with the default Spark Docker image: [https://hub.docker.com/_/spark] (do change deployment.yml to point to your Kubernetes API endpoint)

{code:java}
kubectl create -f deployment.yml{code}
 # Add data to both the executor and the driver pods, e.g. log in to a terminal on each pod and run on both pods:

{code:java}
touch data.csv
echo id,name > data.csv
echo 1,2 >> data.csv {code}
 # Start a Spark Connect remote session to access the newly created data. I logged in on the driver pod and installed the necessary Python packages (see the sketch after these steps for an alternative that connects from outside the cluster):

{code:java}
python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow{code}
Then I started a Python shell and executed:
{code:java}
from pyspark.sql import SparkSession
logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count() {code}
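
Alternatively (a suggestion, not part of the original reproduction), the client can also run outside the cluster by port-forwarding the Spark Connect service and pointing the remote URL at localhost:
{code:java}
# Forward the Spark Connect gRPC port of the service to the local machine
kubectl port-forward svc/spark-connect 15002:15002

# Then, in a local Python shell with pyspark installed:
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.remote('sc://localhost:15002').getOrCreate(){code}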
h3. Necessary files:

service.yml:
{code:java}
apiVersion: v1
kind: Service
metadata:
  labels:
    app: spark-connect        
  name: spark-connect
  namespace: default
spec:
  ipFamilies:
    - IPv4
  ports:
    - name: connect-grpc
      protocol: TCP
      port: 15002 # Port the service listens on.
      targetPort: 15002 # Port on the backing pods to which the service forwards connections
    - name: sparkui
      protocol: TCP
      port: 4040 # Port the service listens on.
      targetPort: 4040 # Port on the backing pods to which the service forwards connections
    - name: spark-rpc
      protocol: TCP
      port: 7078 # Port the service listens on.
      targetPort: 7078 # Port on the backing pods to which the service forwards connections
    - name: blockmanager
      protocol: TCP
      port: 7079 # Port the service listens on.
      targetPort: 7079 # Port on the backing pods to which the service forwards connections
  internalTrafficPolicy: Cluster
  type: ClusterIP
  ipFamilyPolicy: SingleStack
  sessionAffinity: None
  selector:
    app: spark-connect {code}
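
To check that the service exists and which pods its selector currently matches, something like the following can be used (a verification step I would suggest; not part of the original setup):
{code:java}
kubectl get svc spark-connect
kubectl get endpoints spark-connect  # pods matched by the selector app=spark-connect{code}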
deployment.yml: (do replace the spark.master URL with the correct one for your 
setup)
{code:java}
kind: Deployment
apiVersion: apps/v1
metadata:
   name: spark-connect
   namespace: default
   labels:
     k8s-app: spark-connect
spec:
   replicas: 1
   selector:
     matchLabels:
       k8s-app: spark-connect
   template:
     metadata:
       name: spark-connect
       creationTimestamp: null
       labels:
         k8s-app: spark-connect
     spec:
       serviceAccount: spark
       containers:
         - name: spark-connect
           image: spark
           command:
             - /opt/entrypoint.sh
             - driver
           args:
             - '--class'
             - org.apache.spark.sql.connect.service.SparkConnectServer
             - '--name'
             - spark-connect
             - '--conf'
             - spark.driver.blockManager.port=7079
             - '--conf'
             - spark.driver.port=7078
             - '--conf'
             - spark.driver.host=spark-connect
             - '--conf'
             - spark.master=k8s://https://<Kubernetes API address, e.g. 192.168.49.2:8443>
             - '--conf'
             - spark.kubernetes.namespace=default
             - '--conf'
             - spark.kubernetes.container.image=spark:latest
             - '--conf'
             - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
             - '--conf'
             - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
             - '--conf'
             - spark.kubernetes.executor.label.app=spark-connect
             
             - '--conf'
             - spark.executor.memory=1g
             - '--conf'
             - spark.executor.cores=1
             - '--conf'
             - spark.executor.instances=1
             - '--conf'
             - spark.kubernetes.executor.podNamePrefix=spark-connect
             - '--packages'
             - org.apache.spark:spark-connect_2.12:3.5.0
           env:
             - name: SPARK_DRIVER_BIND_ADDRESS
               valueFrom:
                 fieldRef:
                   apiVersion: v1
                   fieldPath: status.podIP
           resources: {}
           terminationMessagePath: /dev/termination-log
           terminationMessagePolicy: File
           imagePullPolicy: Always
           securityContext:
             privileged: true
       restartPolicy: Always
       terminationGracePeriodSeconds: 30
       dnsPolicy: ClusterFirst
       securityContext: {}
       schedulerName: default-scheduler
   strategy:
     type: RollingUpdate
     rollingUpdate:
       maxUnavailable: 25%
       maxSurge: 25%
   revisionHistoryLimit: 10
   progressDeadlineSeconds: 600 {code}
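
The Kubernetes API address for spark.master can be looked up, and the resulting pods inspected, with standard kubectl commands (suggestions for verification, not part of the original report):
{code:java}
# Find the API server address to use in spark.master (k8s://https://<address>)
kubectl cluster-info

# After deploying, check that the driver and executor pods are up and inspect their logs
kubectl get pods
kubectl logs deploy/spark-connect   # Spark Connect server / driver log
kubectl logs -l app=spark-connect   # executor pods (label set via spark.kubernetes.executor.label.app){code}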


