jedcunningham commented on code in PR #62129:
URL: https://github.com/apache/airflow/pull/62129#discussion_r2822897740
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -589,6 +609,26 @@ def run_next(self, next_job: KubernetesJob) -> None:
             base_worker_pod=base_worker_pod,
             with_mutation_hook=True,
         )
+
+        if secret_name:
+            if pod.spec.volumes is None:
+                pod.spec.volumes = []
+            pod.spec.volumes.append(
Review Comment:
We should do this in `construct_pod` so that the final pod is sent to
`pod_mutation_hook`.
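Roughly what I have in mind, as a sketch only (the helper name is made up and the volume name is whatever the PR settles on), so the volume is already on the pod when `pod_mutation_hook` runs inside `construct_pod`:

```python
# Illustrative sketch: attach the workload secret volume while the pod is being
# built, so the mutation hook sees the final spec. Helper/volume names are assumed.
from kubernetes.client import models as k8s


def _add_workload_secret_volume(pod: k8s.V1Pod, secret_name: str) -> k8s.V1Pod:
    if pod.spec.volumes is None:
        pod.spec.volumes = []
    pod.spec.volumes.append(
        k8s.V1Volume(
            name="airflow-workload",  # assumed volume name
            secret=k8s.V1SecretVolumeSource(secret_name=secret_name),
        )
    )
    return pod
```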
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -618,6 +660,19 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
             # If the pod is already deleted
             if str(e.status) != "404":
                 raise
+        self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace)
Review Comment:
Here we are hard-coding the "volume name" part of the secret name.
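One way to avoid the duplicated literal would be a small shared helper, something like this (constant and function names are purely illustrative):

```python
# Hypothetical shared helper so run_next and delete_pod derive the same secret name.
WORKLOAD_SECRET_NAME_PREFIX = "airflow-workload"


def workload_secret_name(pod_name: str) -> str:
    return f"{WORKLOAD_SECRET_NAME_PREFIX}-{pod_name}"
```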
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -618,6 +660,19 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
             # If the pod is already deleted
             if str(e.status) != "404":
                 raise
+        self._delete_workload_secret(f"airflow-workload-{pod_name}", namespace)
+
+    def _delete_workload_secret(self, secret_name: str, namespace: str) -> None:
Review Comment:
We should instead patch the secret and set `ownerReferences` to the pod that
is using it. k8s will then automatically delete the secret when the pod is
deleted.
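Rough sketch of what I mean (not the PR's code; it assumes the secret is patched after the pod has been created, so the pod's uid is available):

```python
# Patch the secret so Kubernetes garbage collection removes it with the pod.
from kubernetes import client


def _adopt_secret_by_pod(
    kube_client: client.CoreV1Api, secret_name: str, pod: client.V1Pod, namespace: str
) -> None:
    owner_ref = client.V1OwnerReference(
        api_version="v1",
        kind="Pod",
        name=pod.metadata.name,
        uid=pod.metadata.uid,  # requires the pod to exist already
        block_owner_deletion=True,
    )
    kube_client.patch_namespaced_secret(
        name=secret_name,
        namespace=namespace,
        body=client.V1Secret(metadata=client.V1ObjectMeta(owner_references=[owner_ref])),
    )
```

With that in place there is no need for an explicit `_delete_workload_secret` call in `delete_pod` at all.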
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -553,12 +557,28 @@ def run_next(self, next_job: KubernetesJob) -> None:
         pod_template_file = next_job.pod_template_file
         dag_id, task_id, run_id, try_number, map_index = key
+
+        pod_id = create_unique_id(dag_id, task_id)
+        secret_name = ""
+
         if len(command) == 1:
             from airflow.executors.workloads import ExecuteTask
             if isinstance(command[0], ExecuteTask):
                 workload = command[0]
-                command = workload_to_command_args(workload)
+                secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}"
Review Comment:
Using the "volume name" constant here as part of the secret name is a bit odd...
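With a shared helper like the hypothetical `workload_secret_name` sketched above, this call site would just be:

```python
# illustrative only: derive the secret name without reusing the volume-name constant
secret_name = workload_secret_name(pod_id)
```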
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -553,12 +557,28 @@ def run_next(self, next_job: KubernetesJob) -> None:
         pod_template_file = next_job.pod_template_file
         dag_id, task_id, run_id, try_number, map_index = key
+
+        pod_id = create_unique_id(dag_id, task_id)
+        secret_name = ""
+
         if len(command) == 1:
             from airflow.executors.workloads import ExecuteTask
             if isinstance(command[0], ExecuteTask):
                 workload = command[0]
-                command = workload_to_command_args(workload)
+                secret_name = f"{WORKLOAD_SECRET_VOLUME_NAME}-{pod_id}"
+                self.kube_client.create_namespaced_secret(
+                    namespace=self.namespace,
+                    body=client.V1Secret(
+                        metadata=client.V1ObjectMeta(
+                            name=secret_name,
+                            namespace=self.namespace,
+                            labels={"airflow-workload-secret": "true"},
Review Comment:
We should add more labels here, like dag_id, run_id, task_id, and map_index. And/or the TI id.
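For illustration, something along these lines (this assumes `make_safe_label_value` from the provider's `pod_generator`, since raw run_id / map_index values are not necessarily valid label values):

```python
# Illustrative only: labels that tie the workload secret back to its task instance.
from airflow.providers.cncf.kubernetes.pod_generator import make_safe_label_value


def workload_secret_labels(dag_id: str, task_id: str, run_id: str, map_index: int) -> dict[str, str]:
    return {
        "airflow-workload-secret": "true",
        "dag_id": make_safe_label_value(dag_id),
        "task_id": make_safe_label_value(task_id),
        "run_id": make_safe_label_value(run_id),
        # map_index can be -1, which is not label-safe as-is, so sanitize it too
        "map_index": make_safe_label_value(str(map_index)),
        # a ti_id label could be added as well, if it is available in this scope
    }
```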