amoghrajesh commented on code in PR #62129:
URL: https://github.com/apache/airflow/pull/62129#discussion_r2851195025
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -553,12 +559,50 @@ def run_next(self, next_job: KubernetesJob) -> None:
pod_template_file = next_job.pod_template_file
dag_id, task_id, run_id, try_number, map_index = key
+
+ pod_id = create_unique_id(dag_id, task_id)
+ secret_name: str | None = None
+
if len(command) == 1:
from airflow.executors.workloads import ExecuteTask
if isinstance(command[0], ExecuteTask):
workload = command[0]
- command = workload_to_command_args(workload)
+ secret_name = f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_id}"
+ labels: dict[str, str] = {
+ "airflow-workload-secret": "true",
+ "dag_id": make_safe_label_value(workload.ti.dag_id),
+ "task_id": make_safe_label_value(workload.ti.task_id),
+ "run_id": make_safe_label_value(workload.ti.run_id),
+ "try_number": str(workload.ti.try_number),
+ "ti_id": str(workload.ti.id),
+ }
+ if workload.ti.map_index is not None and workload.ti.map_index >= 0:
+ labels["map_index"] = str(workload.ti.map_index)
+ try:
+ self.kube_client.create_namespaced_secret(
+ namespace=self.namespace,
+ body=client.V1Secret(
+ metadata=client.V1ObjectMeta(
+ name=secret_name,
+ namespace=self.namespace,
+ labels=labels,
+ ),
+ string_data={"workload.json": workload.model_dump_json()},
+ ),
+ )
+ except ApiException as e:
+ if e.status == 403:
+ raise KubernetesApiPermissionError(
Review Comment:
Seemed like the right exception to raise 🤷🏽
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]