danccooper commented on a change in pull request #6377: [AIRFLOW-5589] monitor
pods by labels instead of names
URL: https://github.com/apache/airflow/pull/6377#discussion_r338131586
##########
File path: airflow/contrib/operators/kubernetes_pod_operator.py
##########
@@ -112,55 +113,61 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
+ @staticmethod
+ def create_labels_for_pod(context):
+ """
+ Generate labels for the pod s.t. we can track it in case of Operator
crash
+
+ :param context:
+ :return:
+ """
+ labels = {
+ 'dag_id': context['dag'].dag_id,
+ 'task_id': context['task'].task_id,
+ 'exec_date': context['ts'],
+ 'try_number': context['ti'].try_number,
+ }
+ # In the case of sub dags this is just useful
+ if context['dag'].parent_dag:
+ labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
+ # Ensure that label is valid for Kube,
+ # and if not truncate/remove invalid chars and replace with short hash.
+ for label_id, label in labels.items():
+ safe_label = pod_generator.make_safe_label_value(str(label))
+ labels[label_id] = safe_label
+ return labels
+
def execute(self, context):
try:
client = kube_client.get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)
- pod = pod_generator.PodGenerator(
- image=self.image,
- namespace=self.namespace,
- cmds=self.cmds,
- args=self.arguments,
- labels=self.labels,
- name=self.name,
- envs=self.env_vars,
- extract_xcom=self.do_xcom_push,
- image_pull_policy=self.image_pull_policy,
- node_selectors=self.node_selectors,
- annotations=self.annotations,
- affinity=self.affinity,
- image_pull_secrets=self.image_pull_secrets,
- service_account_name=self.service_account_name,
- hostnetwork=self.hostnetwork,
- tolerations=self.tolerations,
- configmaps=self.configmaps,
- security_context=self.security_context,
- dnspolicy=self.dnspolicy,
- resources=self.resources,
- pod=self.full_pod_spec,
- ).gen_pod()
-
- pod = append_to_pod(pod, self.ports)
- pod = append_to_pod(pod, self.pod_runtime_info_envs)
- pod = append_to_pod(pod, self.volumes)
- pod = append_to_pod(pod, self.volume_mounts)
- pod = append_to_pod(pod, self.secrets)
-
- self.pod = pod
-
- launcher = pod_launcher.PodLauncher(kube_client=client,
- extract_xcom=self.do_xcom_push)
-
- try:
- (final_state, result) = launcher.run_pod(
- pod,
- startup_timeout=self.startup_timeout_seconds,
- get_logs=self.get_logs)
- finally:
- if self.is_delete_operator_pod:
- launcher.delete_pod(pod)
+ # Add combination of labels to uniquely identify a running pod
+ labels = self.create_labels_for_pod(context)
+
+ label_selector = self._get_pod_identifying_label_string(labels)
+
+ pod_list = client.list_namespaced_pod(self.namespace,
label_selector=label_selector,
+ include_uninitialized=True)
+
+ if len(pod_list.items) > 1:
+ raise AirflowException(
+ 'More than one pod running with labels: '
+ '{label_selector}'.format(label_selector=label_selector))
+
+ launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
+
+ if len(pod_list.items) == 1 and
self._try_numbers_do_not_match(context, pod_list[0]):
+ self.log.info("found a running pod with labels %s."
Review comment:
@dimberman I think the logic looks sound; you just need to shift this log
message into the 'elif 1' branch, & I guess we want to log at info level here
that we've found a previous pod & will kill it
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services