danccooper commented on a change in pull request #6377: [AIRFLOW-5589] monitor 
pods by labels instead of names
URL: https://github.com/apache/airflow/pull/6377#discussion_r338131586
 
 

 ##########
 File path: airflow/contrib/operators/kubernetes_pod_operator.py
 ##########
 @@ -112,55 +113,61 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
+    @staticmethod
+    def create_labels_for_pod(context):
+        """
+        Generate labels for the pod s.t. we can track it in case of Operator 
crash
+
+        :param context:
+        :return:
+        """
+        labels = {
+            'dag_id': context['dag'].dag_id,
+            'task_id': context['task'].task_id,
+            'exec_date': context['ts'],
+            'try_number': context['ti'].try_number,
+        }
+        # In the case of sub dags this is just useful
+        if context['dag'].parent_dag:
+            labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
+        # Ensure that label is valid for Kube,
+        # and if not trucate/remove invalid chars and replace with short hash.
+        for label_id, label in labels.items():
+            safe_label = pod_generator.make_safe_label_value(str(label))
+            labels[label_id] = safe_label
+        return labels
+
     def execute(self, context):
         try:
             client = kube_client.get_kube_client(in_cluster=self.in_cluster,
                                                  
cluster_context=self.cluster_context,
                                                  config_file=self.config_file)
 
-            pod = pod_generator.PodGenerator(
-                image=self.image,
-                namespace=self.namespace,
-                cmds=self.cmds,
-                args=self.arguments,
-                labels=self.labels,
-                name=self.name,
-                envs=self.env_vars,
-                extract_xcom=self.do_xcom_push,
-                image_pull_policy=self.image_pull_policy,
-                node_selectors=self.node_selectors,
-                annotations=self.annotations,
-                affinity=self.affinity,
-                image_pull_secrets=self.image_pull_secrets,
-                service_account_name=self.service_account_name,
-                hostnetwork=self.hostnetwork,
-                tolerations=self.tolerations,
-                configmaps=self.configmaps,
-                security_context=self.security_context,
-                dnspolicy=self.dnspolicy,
-                resources=self.resources,
-                pod=self.full_pod_spec,
-            ).gen_pod()
-
-            pod = append_to_pod(pod, self.ports)
-            pod = append_to_pod(pod, self.pod_runtime_info_envs)
-            pod = append_to_pod(pod, self.volumes)
-            pod = append_to_pod(pod, self.volume_mounts)
-            pod = append_to_pod(pod, self.secrets)
-
-            self.pod = pod
-
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
-
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
+            # Add combination of labels to uniquely identify a running pod
+            labels = self.create_labels_for_pod(context)
+
+            label_selector = self._get_pod_identifying_label_string(labels)
+
+            pod_list = client.list_namespaced_pod(self.namespace, 
label_selector=label_selector,
+                                                  include_uninitialized=True)
+
+            if len(pod_list.items) > 1:
+                raise AirflowException(
+                    'More than one pod running with labels: '
+                    '{label_selector}'.format(label_selector=label_selector))
+
+            launcher = pod_launcher.PodLauncher(kube_client=client, 
extract_xcom=self.do_xcom_push)
+
+            if len(pod_list.items) == 1 and 
self._try_numbers_do_not_match(context, pod_list[0]):
+                self.log.info("found a running pod with labels %s."
 
 Review comment:
  @dimberman I think the logic looks sound; you just need to move this log
message into the 'elif 1' branch (the case where exactly one matching pod is
found). I guess we want to log at info level there that we've found a previous
pod and will kill it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to