GitHub user sdaberdaku created a discussion: Proposal: contribute `PySparkOnK8sOperator` + `@task.pyspark_on_k8s` to `apache.spark` provider

# Proposal: contribute `PySparkOnK8sOperator` + `@task.pyspark_on_k8s` to `apache.spark` provider

## Summary

I maintain a third-party provider, 
[`apache-airflow-providers-pysparkonk8s`](https://github.com/sdaberdaku/apache-airflow-providers-pysparkonk8s),
 that adds two ways to run PySpark code as Airflow tasks on a Kubernetes-hosted 
Airflow deployment:

- **`PySparkOnK8sOperator`** — a `PythonOperator` subclass that initializes a 
`SparkSession` and injects it into the user's Python callable as a `spark` 
kwarg.
- **`@task.pyspark_on_k8s`** decorator — the TaskFlow-API equivalent.
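
Both entry points follow the same injection pattern, which can be sketched in plain Python. This is a minimal illustration, not the provider's code. `inject_spark` and `session_factory` are hypothetical names, and stopping the session once the callable finishes is an assumption about the session lifecycle.

```python
import functools
from typing import Any, Callable, Protocol


class SessionLike(Protocol):
    """Anything with a stop() method, e.g. a pyspark.sql.SparkSession."""

    def stop(self) -> None: ...


def inject_spark(session_factory: Callable[[], SessionLike]) -> Callable:
    """Hypothetical decorator: build a session, pass it as `spark`, always stop it."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            spark = session_factory()
            try:
                # The callable's return value is passed through unchanged,
                # which is what lets a TaskFlow task push it as an XCom.
                return func(*args, spark=spark, **kwargs)
            finally:
                # Assumed lifecycle: release the session even if the callable raises.
                spark.stop()

        return wrapper

    return decorator
```

In a real operator the factory would wrap something like `SparkSession.builder.getOrCreate()`; because `spark` is passed by keyword, other arguments to the decorated function should also be passed by keyword.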

I would like to contribute this code upstream and merge it into the existing 
`providers/apache/spark/` provider. Before I open a draft PR, I want to (a) 
confirm `apache.spark` is the right home, and (b) find a committer interested 
in reviewing.

## What it does

The provider initializes the Spark cluster in one of three modes, configured 
via dataclasses:

- **`client` (default)** — Spark Driver runs **inside the Airflow worker pod** 
executing the task. Executor pods are provisioned through the Kubernetes API. 
The worker pod's resource requests/limits are dynamically mutated via Airflow's 
`executor_config["pod_override"]` mechanism to match the configured Spark 
driver resources.
- **`local`** — driver and executor share a single JVM inside the worker pod; intended for development and testing.
- **`connect`** — connects to an existing Spark Connect cluster.
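
One way to picture the three modes is as different Spark configurations applied to `SparkSession.builder` before `getOrCreate()`. The sketch below is an assumption about how such a mapping could look, not the provider's implementation: `spark_conf_for_mode` and its parameters are hypothetical, while the configuration keys are standard Spark settings (`spark.remote` requires Spark Connect, available from Spark 3.4).

```python
from typing import Dict, Optional


def spark_conf_for_mode(
    mode: str,
    *,
    pod_name: Optional[str] = None,
    pod_ip: Optional[str] = None,
    namespace: str = "default",
    image: Optional[str] = None,
    executor_instances: int = 2,
    connect_url: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical mapping from deploy mode to Spark configuration."""
    if mode == "local":
        # Driver and executor share one JVM inside the worker pod.
        return {"spark.master": "local[*]"}
    if mode == "client":
        if not (pod_name and pod_ip and image):
            raise ValueError("client mode needs pod_name, pod_ip and image")
        return {
            # In-cluster API server address; executor pods are created through it.
            "spark.master": "k8s://https://kubernetes.default.svc",
            "spark.submit.deployMode": "client",
            # Executors connect back to the driver running in this worker pod.
            "spark.driver.host": pod_ip,
            # Makes the worker pod the owner of its executor pods, so they are
            # garbage-collected when it goes away.
            "spark.kubernetes.driver.pod.name": pod_name,
            "spark.kubernetes.namespace": namespace,
            "spark.kubernetes.container.image": image,
            "spark.executor.instances": str(executor_instances),
        }
    if mode == "connect":
        if not connect_url:
            raise ValueError("connect mode needs connect_url, e.g. sc://host:15002")
        # Spark Connect: the driver lives in the remote cluster, not in the worker pod.
        return {"spark.remote": connect_url}
    raise ValueError(f"unknown mode: {mode!r}")
```

Each entry could be applied with `builder.config(key, value)` in a loop (for `connect`, equivalently `SparkSession.builder.remote(url)`); how the worker pod's name and IP are discovered is outside this sketch.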

Because the driver runs inside the worker pod, user code has native access 
to Airflow Variables, Connections, and XComs from inside the Spark callable, 
without needing `spark-submit` or a sidecar.

The operator also auto-generates pod affinity rules per task run (using a fresh 
UUID label) so that:
1. executors prefer the driver's node,
2. then other executors' nodes,
3. then the driver's availability zone,
4. then other executors' availability zones.

Because the label is unique to each task run, the affinity rules only match 
that run's own pods, so concurrent PySpark tasks don't interfere with each 
other's pod placement.
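
The four-level preference above maps naturally onto Kubernetes inter-pod affinity with weighted `preferredDuringSchedulingIgnoredDuringExecution` terms. The sketch below builds that structure as a plain dict; it is an assumption about the shape of the generated rules, not the provider's code. `RUN_LABEL`, the weights, and the `spark-role: driver` label on the worker pod are hypothetical choices (Spark itself labels executor pods `spark-role: executor`). Note that the scheduler sums the weights of every preferred term a node satisfies and combines them with its other scores, so the ordering is a strong preference rather than a guarantee.

```python
import uuid
from typing import Any, Dict, List

RUN_LABEL = "example.com/spark-run-id"  # hypothetical label key
HOSTNAME = "kubernetes.io/hostname"      # well-known node label
ZONE = "topology.kubernetes.io/zone"     # well-known zone label


def _term(weight: int, labels: Dict[str, str], topology_key: str) -> Dict[str, Any]:
    """One weighted pod-affinity preference."""
    return {
        "weight": weight,
        "podAffinityTerm": {
            "labelSelector": {"matchLabels": labels},
            "topologyKey": topology_key,
        },
    }


def executor_affinity(run_id: str) -> Dict[str, Any]:
    """Affinity for executor pods of one task run (hypothetical sketch)."""
    driver = {RUN_LABEL: run_id, "spark-role": "driver"}
    executors = {RUN_LABEL: run_id, "spark-role": "executor"}
    # Higher weight = stronger preference; Kubernetes accepts weights 1-100.
    preferred: List[Dict[str, Any]] = [
        _term(100, driver, HOSTNAME),    # 1. the driver's node
        _term(75, executors, HOSTNAME),  # 2. nodes of sibling executors
        _term(50, driver, ZONE),         # 3. the driver's zone
        _term(25, executors, ZONE),      # 4. zones of sibling executors
    ]
    return {"podAffinity": {"preferredDuringSchedulingIgnoredDuringExecution": preferred}}


# A fresh UUID per task run, so selectors never match another run's pods.
affinity = executor_affinity(str(uuid.uuid4()))
```

For this to work, the executors would need the run label too (Spark supports `spark.kubernetes.executor.label.[LabelName]`), and the affinity could be delivered through an executor pod template (`spark.kubernetes.executor.podTemplateFile`).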

## Why not just `SparkSubmitOperator`?

The existing `apache.spark` provider's `SparkSubmitOperator` shells out to 
`spark-submit` and runs the driver in a separate process or pod. That works, 
but:

- No TaskFlow / `@task` integration — you cannot return a value from PySpark 
code and have it flow as XCom.
- No automatic worker-pod resource mutation — the Airflow worker remains a thin 
shell while the driver runs elsewhere, doubling the pod count per task.
- No automatic per-task executor pod affinity.
- Airflow Connections/Variables/XComs aren't accessible from inside the Spark 
code without manual plumbing.
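
To make the worker-pod resource mutation concrete: in `client` mode the driver JVM shares the worker container, so that container's requests and limits have to cover the driver's heap plus non-heap overhead. The helper below is a hypothetical sketch, not the provider's code. It assumes the overhead rule Spark applies to its own driver pods, `max(factor * memory, 384 MiB)`; Spark documents a default factor of 0.10 for JVM jobs and 0.40 for non-JVM jobs on Kubernetes, and the actual operator may size things differently.

```python
from typing import Dict

MIN_OVERHEAD_MIB = 384  # minimum non-heap overhead, as in Spark's driver-pod sizing


def driver_pod_resources(
    cores: int, memory_mib: int, overhead_factor: float = 0.10
) -> Dict[str, Dict[str, str]]:
    """Requests/limits for a worker container hosting an in-process Spark driver."""
    # Overhead = max(factor * heap, 384 MiB), truncated to whole MiB.
    overhead_mib = max(int(overhead_factor * memory_mib), MIN_OVERHEAD_MIB)
    memory = f"{memory_mib + overhead_mib}Mi"
    cpu = str(cores)
    return {
        "requests": {"cpu": cpu, "memory": memory},
        # Limits equal to requests: a design choice in this sketch.
        "limits": {"cpu": cpu, "memory": memory},
    }
```

For a 2-core, 4 GiB driver this yields `4505Mi` (4096 + 409). The dict could be passed as `k8s.V1ResourceRequirements(**resources)` on the container named `base` of a `k8s.V1Pod` set as `executor_config["pod_override"]`, since Airflow's `pod_override` convention targets that container. Equal limits and requests give the Guaranteed QoS class only when every container in the pod does the same.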

The proposed operator addresses these gaps specifically for deployments where 
each Airflow task runs in its own Kubernetes pod (e.g. with 
`KubernetesExecutor`), a topology that is increasingly common.

## Minimal usage example

```python
from airflow.decorators import task
from pyspark.sql import SparkSession

@task.pyspark_on_k8s
def my_task(spark: SparkSession) -> int:
    # `spark` is injected by the decorator; the return value is pushed as an XCom.
    df = spark.range(1_000_000)
    return df.count()
```

```python
from pyspark.sql import SparkSession

from airflow.providers.pysparkonk8s.config import SparkDriverConf, SparkExecutorConf
from airflow.providers.pysparkonk8s.operators import PySparkOnK8sOperator
from airflow.providers.pysparkonk8s.resources import CPU, Memory


def my_callable(spark: SparkSession) -> int:
    # The operator passes the initialized session as the `spark` kwarg.
    return spark.range(1_000_000).count()


PySparkOnK8sOperator(
    task_id="spark_task",
    python_callable=my_callable,
    spark_driver_conf=SparkDriverConf(cores=CPU.cores(2), memory=Memory.gibibytes(4)),
    spark_executor_conf=SparkExecutorConf(
        instances=4, cores=CPU.cores(2), memory=Memory.gibibytes(8)
    ),
)
```

## Scope of the contribution

What I'd contribute, in `providers/apache/spark/`:

- `operators/pyspark_on_k8s.py` — operator
- `decorators/pyspark_on_k8s.py` — decorator
- `config/` — `SparkBaseConf`, `SparkDriverConf`, `SparkExecutorConf`, 
`SparkDeployMode`, `Sentinel`
- `resources/` — `CPU` and `Memory` value types with K8s↔JVM conversion
- Unit tests (the existing repo has unit tests under `tests/`)
- Operator-level docs page
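
The "K8s↔JVM conversion" mentioned above is mostly a matter of units. Kubernetes memory quantities use either decimal (`G`) or binary (`Gi`) suffixes, while JVM/Spark memory strings (`4g`, `512m`) are always binary. The functions below are a hypothetical sketch of that conversion for memory only, not the provider's `Memory` type.

```python
import re
from decimal import Decimal

# Kubernetes memory suffixes: decimal (SI) and binary (IEC).
_K8S_MEMORY_UNITS = {
    "": 1,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
}
# JVM/Spark memory strings use binary units: 1k = 1024 bytes.
_JVM_UNITS = [("t", 2**40), ("g", 2**30), ("m", 2**20), ("k", 2**10)]
_QUANTITY_RE = re.compile(r"^(\d+(?:\.\d+)?)([A-Za-z]*)$")


def k8s_memory_to_bytes(quantity: str) -> int:
    """Parse a Kubernetes memory quantity such as '4Gi', '1.5Gi' or '4G'."""
    match = _QUANTITY_RE.match(quantity.strip())
    if not match or match.group(2) not in _K8S_MEMORY_UNITS:
        raise ValueError(f"unsupported memory quantity: {quantity!r}")
    number, suffix = match.groups()
    # Decimal avoids float rounding for values like '1.5Gi'; int() drops fractional bytes.
    return int(Decimal(number) * _K8S_MEMORY_UNITS[suffix])


def k8s_memory_to_jvm(quantity: str) -> str:
    """Convert a Kubernetes quantity to a JVM/Spark memory string ('4Gi' -> '4g')."""
    n_bytes = k8s_memory_to_bytes(quantity)
    for suffix, size in _JVM_UNITS:
        if n_bytes >= size and n_bytes % size == 0:
            return f"{n_bytes // size}{suffix}"
    # Not a whole number of KiB: round down so the JVM never exceeds the pod's memory.
    return f"{n_bytes // 1024}k"


def jvm_memory_to_k8s(value: str) -> str:
    """Convert a JVM/Spark memory string to a Kubernetes binary quantity ('4g' -> '4Gi')."""
    match = re.fullmatch(r"(\d+)([kmgt])", value.strip().lower())
    if not match:
        raise ValueError(f"unsupported JVM memory string: {value!r}")
    number, suffix = match.groups()
    return f"{number}{suffix.upper()}i"
```

The decimal/binary distinction matters: `4G` is 4,000,000,000 bytes, which is not a whole number of GiB or MiB, so it becomes `3906250k` rather than `4g`.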

Additionally, in the official Airflow Helm chart (`chart/`):

- An **opt-in** values flag that, when enabled, renders a `Role` + 
`RoleBinding` granting the worker `ServiceAccount` the permissions required to 
create/list/watch/delete executor `Pod`s, `ConfigMap`s, and 
`PersistentVolumeClaim`s in the worker's namespace. This is the RBAC currently 
shipped by my third-party `pysparkonk8s-addon` chart; folding it into the 
official chart removes a manual install step for users.
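
For reviewers unfamiliar with the RBAC involved, the `Role` + `RoleBinding` pair described above can be sketched as plain manifests. This is a hypothetical Python rendering (the chart would template it in YAML behind the values flag); `executor_rbac` and the default role name are illustrative, and the resources and verbs are exactly the ones listed in this proposal.

```python
from typing import Any, Dict, Tuple


def executor_rbac(
    namespace: str, service_account: str, name: str = "spark-executor-manager"
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical Role + RoleBinding letting the worker manage executor resources."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # core API group
                "resources": ["pods", "configmaps", "persistentvolumeclaims"],
                "verbs": ["create", "list", "watch", "delete"],
            }
        ],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name, "namespace": namespace},
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": name},
        # Grants the Role to the worker's ServiceAccount, scoped to its namespace.
        "subjects": [{"kind": "ServiceAccount", "name": service_account, "namespace": namespace}],
    }
    return role, binding
```

Because both objects are namespaced, the grant stays confined to the worker's namespace rather than cluster-wide, which fits an opt-in, disabled-by-default flag.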

Cross-provider dependency: the code imports from `apache.cncf.kubernetes` (for 
`PodGenerator` and `create_pod_id`). I'd declare this in `provider.yaml` rather 
than vendoring helpers.

## Open questions for maintainers

1. **Placement.** Is `providers/apache/spark/` the right home? Alternatives I 
considered: `providers/cncf/kubernetes/` (since it's K8s-deployment-specific), 
or a new `providers/apache/spark-on-k8s/` sub-provider. I lean toward folding 
it into `apache.spark`, but defer to maintainers.
2. **Naming.** Current class name is `PySparkOnK8sOperator`. Open to 
suggestions if `apache.spark` has naming conventions I should follow.
3. **Sponsor.** Per `ACCEPTING_PROVIDERS.rst`, new community providers require 
committer sponsorship. Since I'm proposing to merge into an existing provider 
rather than create a new one, my reading is that standard PR review applies and 
no separate sponsorship is required. Please correct me if I have this wrong.
4. **Cross-provider import policy.** Importing from `apache.cncf.kubernetes` — 
is this fine if declared in `provider.yaml`, or is there a preferred pattern?
5. **Helm chart RBAC addition.** Would chart maintainers accept an opt-in 
`Role` + `RoleBinding` for executor-pod management, gated behind a values flag 
and disabled by default? I'd open this as a separate PR scoped to `chart/` 
after the provider PR lands, but want to flag it now since it's part of the 
overall donation.
6. **Reviewer.** Is anyone on the `apache.spark`, `cncf.kubernetes`, or Helm 
chart side interested in reviewing? I'm happy to do all the restructuring work; 
I just want to make sure a reviewer is willing to engage before I invest the 
time.

## About me

I'm the sole maintainer of the existing third-party provider. I've signed the 
ASF ICLA (or will before opening the PR). The project has been in production 
use for a few years now. I'm comfortable losing release control by moving to 
Airflow's release cadence, and I'm happy to maintain the contributed code 
long-term as a regular contributor.

Happy to demo a working DAG, share resource-mutation traces, or walk through 
the affinity logic if useful. Thanks for considering this.


GitHub link: https://github.com/apache/airflow/discussions/67165

----
This is an automatically sent email for [email protected].