GitHub user dmelchor-stripe edited a discussion: Multi-cluster `KubernetesExecutor`

Hey there!

Wondering if anyone has worked on multi-cluster support for the `KubernetesExecutor`. At Stripe we have multiple shards we want to schedule pods into, but the default executor is built with the expectation of a single cluster.

Our current idea is somewhat hacky: we'd modify our fork (unfortunately) to add a `cluster_context` class attribute to the `KubernetesExecutor`, dynamically create a set of executor classes with Python's dynamic class creation, and use 2.10's hybrid executors to select the right shard executor for each task.

I'd love to hear your ideas if anyone has tackled this before :pray:

### Approach

Dynamically create all executors
```python
# my.custom.executors.module
from airflow.providers... import KubernetesExecutor

for shard in SHARDS:
    name = f"KubernetesExecutor{shard}"
    # Subclass per shard, pinning the kubeconfig context it should use
    new_cls = type(name, (KubernetesExecutor,), {"cluster_context": shard})
    globals()[name] = new_cls
```

```python
# Configure the executor list dynamically
import os

executors = [
    f"my.custom.executors.module.KubernetesExecutor{shard}"
    for shard in SHARDS
]
os.environ["AIRFLOW__CORE__EXECUTOR"] = ",".join(executors)
```
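Once those names are registered, each task can opt into its shard via the task-level `executor` parameter that came with 2.10's hybrid executors. A minimal sketch of the wiring (the helper function is an assumption of ours, not Airflow API; the module path matches the one above):

```python
def executor_for_shard(shard: str) -> str:
    # Dotted path Airflow resolves to the dynamically generated executor class
    return f"my.custom.executors.module.KubernetesExecutor{shard}"

# In a DAG definition, a task would then pin its shard, e.g.:
# BashOperator(task_id="sync", bash_command="...",
#              executor=executor_for_shard("east"))
```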

On the fork we'd make small changes like these:
```diff
class KubernetesExecutor(BaseExecutor):
     RUNNING_POD_LOG_LINES = 100
     supports_ad_hoc_ti_run: bool = True
+    cluster_context: str | None = None

     def __init__(self):
         self.kube_config = KubeConfig()
@@ -306,12 +307,13 @@ class KubernetesExecutor(BaseExecutor):
         )
        from airflow.providers.cncf.kubernetes.kube_client import get_kube_client

-        self.kube_client = get_kube_client()
+        self.kube_client = get_kube_client(cluster_context=self.cluster_context)
         self.kube_scheduler = AirflowKubernetesScheduler(
             kube_config=self.kube_config,
             result_queue=self.result_queue,
             kube_client=self.kube_client,
             scheduler_job_id=self.scheduler_job_id,
+            cluster_context=self.cluster_context,
         )
         self.event_scheduler = EventScheduler()
```

GitHub link: https://github.com/apache/airflow/discussions/46247
