Steven Ottens created SPARK-45769:
-------------------------------------
Summary: data retrieval fails on executors with spark connect
Key: SPARK-45769
URL: https://issues.apache.org/jira/browse/SPARK-45769
Project: Spark
Issue Type: Bug
Components: Connect
Affects Versions: 3.5.0
Reporter: Steven Ottens
We have an OpenShift cluster with Spark and JupyterHub, and we use Spark Connect
to access Spark from within Jupyter. This worked fine with Spark 3.4.1; however,
after upgrading to Spark 3.5.0 we were no longer able to access any data in our
Delta tables through Spark. Initially I assumed it was a bug in Delta:
[https://github.com/delta-io/delta/issues/2235]
The actual error is:
{code:java}
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to
stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4):
java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of
org.apache.spark.rdd.MapPartitionsRDD{code}
After further investigation I discovered that this is a regression in Spark
3.5.0. The issue is similar to SPARK-36917; however, I am not using any custom
functions or any classes other than spark-connect, and this setup worked in
3.4.1. (A SerializedLambda that fails to resolve to a scala.Function3 on the
executor typically points at a classloader or serialization mismatch between
driver and executors.) The issue only occurs when remote executors are used in
a Kubernetes environment. Running a plain Spark Connect server, e.g.
{code:java}
./sbin/start-connect-server.sh --packages
org.apache.spark:spark-connect_2.12:3.5.0{code}
doesn't produce the error.
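For reference, this is the kind of client-side check I mean; a minimal sketch,
assuming the server started above is listening on the default port 15002 on
localhost and that a /tmp/data.csv exists there (both are examples, not from my
cluster setup):
{code:java}
# Sanity check against the locally started Connect server; the CSV path
# is an example only. sc://localhost implies the default port 15002.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote('sc://localhost').getOrCreate()
df = spark.read.csv('/tmp/data.csv')
print(df.count())  # completes normally here; no ClassCastException
spark.stop()
{code}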
The issue occurs both in a full OpenShift cluster and in a tiny minikube setup.
The steps to reproduce below are based on the minikube setup.
You need a minimal Spark 3.5.0 setup with one driver and at least one executor,
and Python to access data through Spark. The query I used to test this is:
{code:java}
from pyspark.sql import SparkSession
logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count()
{code}
It doesn't matter whether the data is local or remote on S3 storage, nor
whether it is plain text, CSV, or a Delta table.
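For illustration, the same count against remote data fails identically; a
sketch with hypothetical bucket and table paths (the server would also need the
matching hadoop-aws and delta-spark packages plus credentials):
{code:java}
# Same failure mode regardless of source; names below are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()

# CSV on S3 (assumes S3 credentials and hadoop-aws on the server)
spark.read.csv('s3a://some-bucket/data.csv').count()

# Delta table (assumes delta-spark on the server)
spark.read.format('delta').load('s3a://some-bucket/some-table').count()
{code}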
h3. Steps to reproduce:
# Install minikube
# Create a service account 'spark'
{code:java}
kubectl create sa spark{code}
# Bind the 'edit' role to the service account
{code:java}
kubectl create rolebinding spark-edit \
--clusterrole=edit \
--serviceaccount=default:spark \
--namespace=default{code}
# Create a service for spark
{code:java}
kubectl create -f service.yml{code}
# Create a Spark-Connect deployment with the default Spark docker image:
[https://hub.docker.com/_/spark] (change deployment.yml to point to your
Kubernetes API endpoint)
{code:java}
kubectl create -f deployment.yml{code}
# Add data to both the executor and the driver pods, e.g. log in to a terminal
on each pod and run:
{code:java}
touch data.csv
echo id,name > data.csv
echo 1,2 >> data.csv {code}
# Start a Spark Connect session to access the newly created data. I logged in
on the driver pod and installed the necessary Python packages:
{code:java}
python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow{code}
Then I started a Python shell and executed:
{code:java}
from pyspark.sql import SparkSession
logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count() {code}
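For completeness, the failure can also be asserted programmatically; a minimal
sketch, assuming PySpark 3.5's pyspark.errors.exceptions.connect module:
{code:java}
# Expect df.count() == 1; on 3.5.0 with remote executors it raises instead.
from pyspark.sql import SparkSession
from pyspark.errors.exceptions.connect import SparkConnectGrpcException

spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv('/opt/spark/work-dir/data.csv')
try:
    print(df.count())
except SparkConnectGrpcException as e:
    # On 3.5.0 this carries the ClassCastException quoted above.
    print('count() failed:', e)
{code}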
h3. Necessary files:
service.yml:
{code:java}
apiVersion: v1
kind: Service
metadata:
  labels:
    app: spark-connect
  name: spark-connect
  namespace: default
spec:
  ipFamilies:
    - IPv4
  ports:
    - name: connect-grpc
      protocol: TCP
      port: 15002       # Port the service listens on.
      targetPort: 15002 # Port on the backing pods to which the service forwards connections.
    - name: sparkui
      protocol: TCP
      port: 4040        # Port the service listens on.
      targetPort: 4040  # Port on the backing pods to which the service forwards connections.
    - name: spark-rpc
      protocol: TCP
      port: 7078        # Port the service listens on.
      targetPort: 7078  # Port on the backing pods to which the service forwards connections.
    - name: blockmanager
      protocol: TCP
      port: 7079        # Port the service listens on.
      targetPort: 7079  # Port on the backing pods to which the service forwards connections.
  internalTrafficPolicy: Cluster
  type: ClusterIP
  ipFamilyPolicy: SingleStack
  sessionAffinity: None
  selector:
    app: spark-connect {code}
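As a quick sanity check, the service ports can be probed from any pod in the
namespace; a small sketch using only the Python standard library (it assumes
the service resolves under its name spark-connect):
{code:java}
# Probe the four ports defined in service.yml above.
import socket

for name, port in [('connect-grpc', 15002), ('sparkui', 4040),
                   ('spark-rpc', 7078), ('blockmanager', 7079)]:
    try:
        with socket.create_connection(('spark-connect', port), timeout=2):
            print(f'{name} ({port}): reachable')
    except OSError as e:
        print(f'{name} ({port}): {e}')
{code}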
deployment.yml (replace the spark.master URL with the correct one for your
setup):
{code:java}
kind: Deployment
apiVersion: apps/v1
metadata:
  name: spark-connect
  namespace: default
  labels:
    k8s-app: spark-connect
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: spark-connect
  template:
    metadata:
      name: spark-connect
      labels:
        k8s-app: spark-connect
    spec:
      serviceAccount: spark
      containers:
        - name: spark-connect
          image: spark
          command:
            - /opt/entrypoint.sh
            - driver
          args:
            - '--class'
            - org.apache.spark.sql.connect.service.SparkConnectServer
            - '--name'
            - spark-connect
            - '--conf'
            - spark.driver.blockManager.port=7079
            - '--conf'
            - spark.driver.port=7078
            - '--conf'
            - spark.driver.host=spark-connect
            - '--conf'
            - spark.master=k8s://<Kubernetes API endpoint, e.g. https://192.168.49.2:8443>
            - '--conf'
            - spark.kubernetes.namespace=default
            - '--conf'
            - spark.kubernetes.container.image=spark:latest
            - '--conf'
            # Both ivy options must share one spark.driver.extraJavaOptions entry;
            # repeating the key would make the second --conf override the first.
            - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
            - '--conf'
            - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
            - '--conf'
            - spark.kubernetes.executor.label.app=spark-connect
            - '--conf'
            - spark.executor.memory=1g
            - '--conf'
            - spark.executor.cores=1
            - '--conf'
            - spark.executor.instances=1
            - '--conf'
            - spark.kubernetes.executor.podNamePrefix=spark-connect
            - '--packages'
            - org.apache.spark:spark-connect_2.12:3.5.0
          env:
            - name: SPARK_DRIVER_BIND_ADDRESS
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: Always
          securityContext:
            privileged: true
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600 {code}
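Once deployed, I check that both the driver and the executor pod are running
before connecting; a sketch shelling out to kubectl (the labels come from the
manifest above and from spark.kubernetes.executor.label.app):
{code:java}
# List driver and executor pods created by the deployment above.
import subprocess

for selector in ['k8s-app=spark-connect',  # driver pod (Deployment label)
                 'app=spark-connect']:     # executor pods (Spark-set label)
    result = subprocess.run(
        ['kubectl', 'get', 'pods', '-l', selector, '-n', 'default'],
        capture_output=True, text=True)
    print(result.stdout or result.stderr)
{code}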