GitHub user sdaberdaku created a discussion: Proposal: contribute `PySparkOnK8sOperator` + `@task.pyspark_on_k8s` to `apache.spark` provider
# Proposal: contribute `PySparkOnK8sOperator` + `@task.pyspark_on_k8s` to `apache.spark` provider

## Summary

I maintain a third-party provider, [`apache-airflow-providers-pysparkonk8s`](https://github.com/sdaberdaku/apache-airflow-providers-pysparkonk8s), that adds two ways to run PySpark code as Airflow tasks on a Kubernetes-hosted Airflow deployment:

- **`PySparkOnK8sOperator`**: a `PythonOperator` subclass that initializes a `SparkSession` and injects it into the user's Python callable as a `spark` kwarg.
- **`@task.pyspark_on_k8s`**: a decorator that is the TaskFlow API equivalent.

I would like to contribute this code upstream and merge it into the existing `providers/apache/spark/` provider. Before I open a draft PR, I want to (a) confirm that `apache.spark` is the right home, and (b) find a committer interested in reviewing it.

## What it does

The provider initializes the Spark cluster in one of three modes, configured via dataclasses:

- **`client` (default)**: the Spark driver runs **inside the Airflow worker pod** executing the task. Executor pods are provisioned through the Kubernetes API. The worker pod's resource requests/limits are dynamically mutated via Airflow's `executor_config["pod_override"]` mechanism to match the configured Spark driver resources.
- **`local`**: a single-JVM driver+executor inside the worker pod. For dev/testing.
- **`connect`**: connects to an existing Spark Connect cluster.

Because the driver runs inside the worker pod, user code has native access to Airflow Variables, Connections, and XComs from inside the Spark callable, without needing `spark-submit` or a sidecar.

The operator also auto-generates pod affinity rules per task run (using a fresh UUID label) so that executors prefer, in order:

1. the driver's node,
2. other executors' nodes,
3. the driver's availability zone,
4. other executors' availability zones.

Because each run gets its own label, concurrent PySpark tasks don't interfere with each other's pod placement.

## Why not just `SparkSubmitOperator`?
The existing `apache.spark` provider's `SparkSubmitOperator` shells out to `spark-submit` and runs the driver in a separate process or pod. That works, but:

- There is no TaskFlow / `@task` integration: you cannot return a value from PySpark code and have it flow as an XCom.
- There is no automatic worker-pod resource mutation: the Airflow worker remains a thin shell while the driver runs elsewhere, doubling the pod count per task.
- There is no automatic per-task executor pod affinity.
- Airflow Connections/Variables/XComs aren't accessible from inside the Spark code without manual plumbing.

The proposed operator addresses these points specifically for the KubernetesExecutor deployment topology, which is increasingly common.

## Minimal usage example

```python
from airflow.decorators import task
from pyspark.sql import SparkSession


@task.pyspark_on_k8s
def my_task(spark: SparkSession) -> int:
    # `spark` is injected by the decorator; the return value is pushed as an XCom.
    df = spark.range(1_000_000)
    return df.count()
```

```python
from pyspark.sql import SparkSession

from airflow.providers.pysparkonk8s.config import SparkDriverConf, SparkExecutorConf
from airflow.providers.pysparkonk8s.operators import PySparkOnK8sOperator
from airflow.providers.pysparkonk8s.resources import CPU, Memory


def my_callable(spark: SparkSession) -> int:
    # The operator injects the initialized SparkSession as the `spark` kwarg.
    return spark.range(1_000_000).count()


PySparkOnK8sOperator(
    task_id="spark_task",
    python_callable=my_callable,
    # Driver resources; in client mode these also size the Airflow worker pod.
    spark_driver_conf=SparkDriverConf(cores=CPU.cores(2), memory=Memory.gibibytes(4)),
    spark_executor_conf=SparkExecutorConf(instances=4, cores=CPU.cores(2), memory=Memory.gibibytes(8)),
)
```

## Scope of the contribution

What I'd contribute, in `providers/apache/spark/`:

- `operators/pyspark_on_k8s.py`: operator
- `decorators/pyspark_on_k8s.py`: decorator
- `config/`: `SparkBaseConf`, `SparkDriverConf`, `SparkExecutorConf`, `SparkDeployMode`, `Sentinel`
- `resources/`: `CPU` and `Memory` value types with K8s↔JVM conversion
- Unit tests (the existing repo has unit tests under `tests/`)
- An operator-level docs page

Additionally, in the official Airflow Helm chart (`chart/`):

- An **opt-in** values flag that, when enabled, renders a `Role` + `RoleBinding` granting the worker `ServiceAccount` the permissions required to create/list/watch/delete executor `Pod`s, `ConfigMap`s, and `PersistentVolumeClaim`s in the worker's namespace. This is the RBAC currently shipped by my third-party `pysparkonk8s-addon` chart; folding it into the official chart removes a manual install step for users.

Cross-provider dependency: the code imports from `cncf.kubernetes` (for `PodGenerator` and `create_pod_id`). I'd declare this in `provider.yaml` rather than vendoring helpers.

## Open questions for maintainers

1. **Placement.** Is `providers/apache/spark/` the right home? Alternatives I considered: `providers/cncf/kubernetes/` (since it's K8s-deployment-specific), or a new `providers/apache/spark-on-k8s/` sub-provider. I lean toward folding it into `apache.spark`, but defer to maintainers.
2. **Naming.** The current class name is `PySparkOnK8sOperator`. I'm open to suggestions if `apache.spark` has naming conventions I should follow.
3. **Sponsor.** Per `ACCEPTING_PROVIDERS.rst`, new community providers require committer sponsorship. Since I'm proposing to merge into an existing provider rather than create a new one, my reading is that standard PR review applies and no separate sponsorship is required. Please correct me if I have this wrong.
4. **Cross-provider import policy.** Is importing from `cncf.kubernetes` fine if declared in `provider.yaml`, or is there a preferred pattern?
5. **Helm chart RBAC addition.** Would chart maintainers accept an opt-in `Role` + `RoleBinding` for executor-pod management, gated behind a values flag and disabled by default? I'd open this as a separate PR scoped to `chart/` after the provider PR lands, but want to flag it now since it's part of the overall donation.
6. **Reviewer.** Is anyone on the `apache.spark`, `cncf.kubernetes`, or Helm chart side interested in reviewing? I'm happy to do all the restructuring work; I just want to make sure a reviewer is willing to engage before I invest the time.
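To make the `client` mode described under "What it does" more concrete, here is a minimal sketch of the Spark configuration such a session might need when the driver lives in the Airflow worker pod. It is not the provider's code: `ClientModeSettings` and `client_mode_conf` are hypothetical names, and the sketch assumes the worker pod's IP and name are known (for example via the Kubernetes Downward API). The configuration keys themselves are standard Spark-on-Kubernetes properties.

```python
from dataclasses import dataclass

# Hypothetical sketch, not the pysparkonk8s implementation.


@dataclass(frozen=True)
class ClientModeSettings:
    """Illustrative inputs for a client-mode session (hypothetical field names)."""

    namespace: str
    image: str
    driver_host: str  # IP of the Airflow worker pod that hosts the driver
    driver_pod_name: str  # name of that worker pod
    executor_instances: int
    executor_cores: int
    executor_memory: str  # JVM-style size string, e.g. "8g"


def client_mode_conf(s: ClientModeSettings) -> dict[str, str]:
    """Translate the settings into standard Spark-on-Kubernetes config keys."""
    return {
        # In-cluster API server; Spark requires the "k8s://" prefix on the master URL.
        "spark.master": "k8s://https://kubernetes.default.svc",
        # Client mode: the driver is the current process (the Airflow worker pod).
        "spark.submit.deployMode": "client",
        # Executors connect back to the driver at this address.
        "spark.driver.host": s.driver_host,
        # Lets the driver own its executor pods so Kubernetes can garbage-collect them.
        "spark.kubernetes.driver.pod.name": s.driver_pod_name,
        "spark.kubernetes.namespace": s.namespace,
        "spark.kubernetes.container.image": s.image,
        "spark.executor.instances": str(s.executor_instances),
        "spark.executor.cores": str(s.executor_cores),
        "spark.executor.memory": s.executor_memory,
    }
```

The resulting pairs could be applied with repeated `SparkSession.builder.config(key, value)` calls before `getOrCreate()`. Keeping the mapping as a pure function makes it easy to unit-test without starting a JVM.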
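The four-level affinity preference listed under "What it does" maps naturally onto Kubernetes `preferredDuringSchedulingIgnoredDuringExecution` pod-affinity terms. The sketch below is one way to express it and is not the provider's actual logic. It assumes: a hypothetical label key `example.com/spark-run-id` carrying the per-run UUID; that the worker pod is labelled `spark-role=driver` plus the run label (e.g. through `pod_override`); that executors receive the run label via `spark.kubernetes.executor.label.<name>` (Spark itself labels them `spark-role=executor`); and illustrative weights of 100/75/50/25.

```python
import uuid

# Hypothetical sketch; label key and weights are illustrative assumptions.
HOSTNAME = "kubernetes.io/hostname"
ZONE = "topology.kubernetes.io/zone"


def new_run_id() -> str:
    """Fresh per-run label value (32 hex chars, within the 63-char label limit)."""
    return uuid.uuid4().hex


def executor_affinity(run_label_key: str, run_id: str) -> dict:
    """Preferred pod-affinity rules for the executor pods of one task run.

    The scheduler adds up the weights of every term a node satisfies, so
    larger weights express stronger preferences.
    """

    def term(weight: int, role: str, topology_key: str) -> dict:
        return {
            "weight": weight,
            "podAffinityTerm": {
                # Only pods of *this* run match, so concurrent runs don't attract each other.
                "labelSelector": {"matchLabels": {run_label_key: run_id, "spark-role": role}},
                "topologyKey": topology_key,
            },
        }

    return {
        "podAffinity": {
            "preferredDuringSchedulingIgnoredDuringExecution": [
                term(100, "driver", HOSTNAME),  # 1. the driver's node
                term(75, "executor", HOSTNAME),  # 2. other executors' nodes
                term(50, "driver", ZONE),  # 3. the driver's availability zone
                term(25, "executor", ZONE),  # 4. other executors' availability zones
            ]
        }
    }
```

Since the terms are only preferences, executors still schedule when no node satisfies them; the dict could be placed in an executor pod template referenced by `spark.kubernetes.executor.podTemplateFile`.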
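The `resources/` value types and the worker-pod resource mutation go together: driver cores and memory have to be rendered once as JVM/Spark settings and once as Kubernetes quantities. The following is a hypothetical minimal re-implementation for illustration only, not the provider's `CPU`/`Memory` classes. It assumes memory is tracked in MiB and CPU in millicores, and it sizes the pod as heap plus Spark's default-style overhead (`max(384 MiB, factor * memory)`); the overhead factor the provider actually uses is an assumption here.

```python
from dataclasses import dataclass

# Hypothetical sketch of K8s <-> JVM resource conversion; not the provider's code.


@dataclass(frozen=True)
class Memory:
    mebibytes: int

    @classmethod
    def gibibytes(cls, n: int) -> "Memory":
        return cls(n * 1024)

    def to_jvm(self) -> str:
        # Spark/JVM size strings use binary units: "m" = MiB, "g" = GiB.
        if self.mebibytes % 1024 == 0:
            return f"{self.mebibytes // 1024}g"
        return f"{self.mebibytes}m"

    def to_k8s(self) -> str:
        # Kubernetes binary-suffix quantities.
        if self.mebibytes % 1024 == 0:
            return f"{self.mebibytes // 1024}Gi"
        return f"{self.mebibytes}Mi"


@dataclass(frozen=True)
class CPU:
    millicores: int

    @classmethod
    def cores(cls, n: int) -> "CPU":
        return cls(n * 1000)

    def to_k8s(self) -> str:
        return f"{self.millicores}m"

    def to_spark_cores(self) -> int:
        # spark.driver.cores / spark.executor.cores take whole cores; round up.
        return -(-self.millicores // 1000)


def driver_container_resources(cores: CPU, memory: Memory, overhead_factor: float = 0.10) -> dict:
    """Requests/limits for the worker container that hosts the driver (assumed formula)."""
    overhead = max(384, int(memory.mebibytes * overhead_factor))
    total = Memory(memory.mebibytes + overhead)
    return {
        "requests": {"cpu": cores.to_k8s(), "memory": total.to_k8s()},
        "limits": {"memory": total.to_k8s()},
    }
```

In an actual `executor_config["pod_override"]`, these values would be wrapped in `k8s.V1ResourceRequirements` on a container named `base`, which is the container name the KubernetesExecutor merges overrides into.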
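For the opt-in Helm chart RBAC, the proposal names the resources (`Pod`s, `ConfigMap`s, `PersistentVolumeClaim`s) and verbs (create/list/watch/delete). As a rough sketch of what the rendered objects might contain, the function below builds the `Role` and `RoleBinding` as Python dicts. The object name `spark-executor-manager` is hypothetical, and the verb list simply mirrors the proposal text; real deployments may need additional verbs such as `get`.

```python
# Hypothetical sketch of the rendered RBAC objects; the real chart would use Helm templates.
RBAC_API = "rbac.authorization.k8s.io/v1"


def executor_rbac(
    namespace: str,
    service_account: str,
    name: str = "spark-executor-manager",
    verbs: tuple[str, ...] = ("create", "list", "watch", "delete"),
) -> list[dict]:
    """Namespaced Role + RoleBinding letting the worker ServiceAccount manage executor resources."""
    role = {
        "apiVersion": RBAC_API,
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group (pods, configmaps, PVCs)
                "resources": ["pods", "configmaps", "persistentvolumeclaims"],
                "verbs": list(verbs),
            }
        ],
    }
    binding = {
        "apiVersion": RBAC_API,
        "kind": "RoleBinding",
        "metadata": {"name": name, "namespace": namespace},
        # roleRef takes the API group only, without the version suffix.
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": name},
        "subjects": [{"kind": "ServiceAccount", "name": service_account, "namespace": namespace}],
    }
    return [role, binding]
```

Passing the result to PyYAML's `yaml.safe_dump_all` would yield the two manifests; scoping them to the worker's namespace (a `Role` rather than a `ClusterRole`) keeps the permission grant as narrow as the proposal describes.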
## About me

I'm the sole maintainer of the existing third-party provider. I've signed the ASF ICLA (or will before opening the PR). The project has been in production use for a few years now. I'm comfortable giving up release control by moving to Airflow's release cadence, and I'm happy to maintain the contributed code long-term as a regular contributor.

Happy to demo a working DAG, share resource-mutation traces, or walk through the affinity logic if useful. Thanks for considering this.

GitHub link: https://github.com/apache/airflow/discussions/67165
