dholleran-lendico commented on pull request #7428:
URL: https://github.com/apache/airflow/pull/7428#issuecomment-704994507


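   Hi @smaley07,

   You can use the code provided in this PR to make a custom pod launcher
   and operator. Add both files to your plugins directory so that you can
   use the operator in your DAGs. The operator imports the launcher as
   `utils.custom_pod_launcher`, so save the launcher as
   `utils/custom_pod_launcher.py` inside that directory.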
   
   For the pod operator:
   
   ```
   from airflow.contrib.operators.kubernetes_pod_operator import (
       KubernetesPodOperator,
   )
   from airflow.exceptions import AirflowException
   from airflow.contrib.kubernetes import kube_client, pod_generator
   from utils.custom_pod_launcher import CustomPodLauncher
   from airflow.utils.state import State
   from airflow.version import version as airflow_version


   class CustomPodOperator(KubernetesPodOperator):
       """KubernetesPodOperator that runs its pod via CustomPodLauncher.

       ``execute`` builds the pod from the operator's attributes and hands
       it to ``CustomPodLauncher`` to run it, read its logs and, when
       ``is_delete_operator_pod`` is set, delete it afterwards.
       """

       def execute(self, context):
           """Build the pod, run it to completion and return its XCom result.

           :param context: Airflow task context (not used by this method).
           :return: the launcher's XCom result when ``do_xcom_push`` is set,
               otherwise ``None``.
           :raises AirflowException: if launching fails or the pod ends in
               any state other than ``State.SUCCESS``.
           """
           try:
               # Only pass in_cluster when it was set explicitly, so that
               # get_kube_client keeps its own default otherwise.
               client_kwargs = {
                   'cluster_context': self.cluster_context,
                   'config_file': self.config_file,
               }
               if self.in_cluster is not None:
                   client_kwargs['in_cluster'] = self.in_cluster
               client = kube_client.get_kube_client(**client_kwargs)

               # Add the Airflow version to the labels, plus a label that
               # marks the pod as launched by KubernetesPodOperator. '+' is
               # swapped for '-' since it is not valid in a label value.
               self.labels.update(
                   {
                       'airflow_version': airflow_version.replace('+', '-'),
                       'kubernetes_pod_operator': 'True',
                   }
               )

               gen = pod_generator.PodGenerator()
               for port in self.ports:
                   gen.add_port(port)
               for mount in self.volume_mounts:
                   gen.add_mount(mount)
               for volume in self.volumes:
                   gen.add_volume(volume)

               pod = gen.make_pod(
                   namespace=self.namespace,
                   image=self.image,
                   pod_id=self.name,
                   cmds=self.cmds,
                   arguments=self.arguments,
                   labels=self.labels,
               )

               pod.service_account_name = self.service_account_name
               pod.secrets = self.secrets
               pod.envs = self.env_vars
               pod.image_pull_policy = self.image_pull_policy
               pod.image_pull_secrets = self.image_pull_secrets
               pod.annotations = self.annotations
               pod.resources = self.resources
               pod.affinity = self.affinity
               pod.node_selectors = self.node_selectors
               pod.hostnetwork = self.hostnetwork
               pod.tolerations = self.tolerations
               pod.configmaps = self.configmaps
               pod.security_context = self.security_context
               pod.pod_runtime_info_envs = self.pod_runtime_info_envs
               pod.dnspolicy = self.dnspolicy

               launcher = CustomPodLauncher(kube_client=client,
                                            extract_xcom=self.do_xcom_push)
               try:
                   final_state, result = launcher.run_pod(
                       pod,
                       startup_timeout=self.startup_timeout_seconds,
                       get_logs=self.get_logs)
               finally:
                   # Delete the pod even when run_pod raised.
                   if self.is_delete_operator_pod:
                       launcher.delete_pod(pod)

               if final_state != State.SUCCESS:
                   raise AirflowException(
                       'Pod returned a failure: {state}'.format(
                           state=final_state)
                   )
               if self.do_xcom_push:
                   return result
           except AirflowException as ex:
               # Chain the original error so its traceback is preserved.
               raise AirflowException(
                   'Pod Launching failed: {error}'.format(error=ex)
               ) from ex
   ```
   And for the launcher (`utils/custom_pod_launcher.py`):
   ```
   from airflow.contrib.kubernetes.pod_launcher import PodLauncher
   from requests.exceptions import BaseHTTPError
   from airflow import AirflowException
   from typing import Generator, Iterable, List
   from kubernetes.client.models.v1_pod import V1Pod
   import math
   import time
   from datetime import datetime as dt

   POD_LOGS_POLL_INTERVAL_SECONDS = 5


   class CustomPodLauncher(PodLauncher):
       """PodLauncher that reads pod logs by polling instead of following.

       ``read_pod_logs`` repeatedly fetches the ``base`` container's log with
       ``follow=False`` and de-duplicates lines across fetches using their
       Kubernetes timestamps, sidestepping the ``follow`` problems around
       log rotation (kubernetes/kubernetes#28369).
       """

       # NOTE(review): these methods annotate ``pod`` as V1Pod but read
       # pod.name / pod.namespace, whereas a kubernetes V1Pod keeps those
       # under pod.metadata; presumably callers pass the object built by
       # airflow.contrib.kubernetes.pod_generator - confirm.
       def _request_pod_log_chunk(self, pod: V1Pod,
                                  since_seconds: int) -> Iterable[bytes]:
           """Request the ``base`` container's log for the last N seconds.

           ``_preload_content=False`` makes the client return the raw HTTP
           response, which callers iterate line by line; ``timestamps=True``
           prefixes every line with its timestamp and a space.
           """
           return self._client.read_namespaced_pod_log(
               name=pod.name,
               namespace=pod.namespace,
               container='base',
               follow=False,
               since_seconds=since_seconds,
               timestamps=True,
               _preload_content=False
           )

       @staticmethod
       def _line_timestamp(line: bytes) -> dt:
           """Parse the timestamp prefix of a raw log line, to the second.

           Lines look like ``2020-10-07T12:34:56.123456789Z message``. RFC
           3339 makes the fractional part optional and ``strptime`` cannot
           parse nanoseconds, so only the leading ``YYYY-MM-DDTHH:MM:SS``
           (19 bytes) is parsed. The result is a naive datetime, which the
           caller compares against ``dt.utcnow()``.
           """
           timestamp = line.split(b" ", 1)[0]
           return dt.strptime(timestamp[:19].decode("utf-8"),
                              "%Y-%m-%dT%H:%M:%S")

       def _read_pod_log_chunk(
               self, pod: V1Pod, last_line: bytes
       ) -> Generator[bytes, None, None]:
           """Yield the raw log lines of ``pod`` that come after ``last_line``.

           The CoreV1Api client doesn't expose ``since_time`` even though the
           API does, so the look-back window is sent as ``since_seconds``,
           measured from the timestamp of ``last_line`` (or from the epoch on
           the first read) plus 15 seconds of buffer for clock (NTP) skew.

           :param pod: pod whose ``base`` container log is read.
           :param last_line: last raw (timestamped) line already yielded, or
               ``b""`` when nothing has been read yet.
           :return: generator of raw log lines, timestamp prefix included.
           """
           if last_line:
               since_time = self._line_timestamp(last_line)
           else:
               since_time = dt.utcfromtimestamp(0)
           # The API rejects since_seconds < 1, which could otherwise happen
           # if this host's clock lags the node's by more than the buffer.
           since_seconds = max(1, math.ceil(
               (dt.utcnow() - since_time).total_seconds() + 15))
           resp = self._request_pod_log_chunk(pod, since_seconds)

           # With a previous line, skip everything up to and including it:
           # those lines were already yielded from the previous chunk. On
           # the first read there is nothing to skip.
           skipping_lines = bool(last_line)
           # Lines skipped so far, kept in case last_line never shows up
           # (e.g. since_seconds didn't reach back far enough).
           buffered_lines = []  # type: List[bytes]
           for line in resp:
               if not skipping_lines:
                   yield line
               elif line == last_line:
                   self.log.debug(
                       "Found duplicate line. Stopping log skipping")
                   buffered_lines = []
                   skipping_lines = False
               else:
                   buffered_lines.append(line)

           if buffered_lines:
               self.log.warning(
                   "End of previous log chunk not found in next chunk. May "
                   "indicate log line loss"
               )
               for buffered_line in buffered_lines:
                   yield buffered_line

       def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
           """Read pod logs from the Kubernetes API until the pod stops.

           This explicitly does not use the ``follow`` parameter due to issues
           around log rotation
           (https://github.com/kubernetes/kubernetes/issues/28369). Once that
           is fixed, following instead of polling for pod status should be
           fine, but de-duplicating on timestamp will still be desired in case
           the underlying request fails.

           :param pod: pod whose ``base`` container log is read.
           :return: generator of log lines (bytes) with the timestamp prefix
               stripped.
           :raises AirflowException: if reading the API raises
               ``BaseHTTPError``.
           """
           # Timestamps returned by the Kubernetes API have nanosecond
           # precision and appear never to repeat across lines, so the full
           # timestamped line de-duplicates lines across successive reads.
           last_line = b""
           # Loop on a variable rather than on the pod status directly so
           # that one final read happens after the pod has stopped.
           pod_is_running = True

           try:
               while pod_is_running:
                   pod_is_running = self.base_container_is_running(pod)
                   if not pod_is_running:
                       self.log.info("pod stopped, pulling logs one more time")

                   for line in self._read_pod_log_chunk(pod, last_line):
                       _timestamp, log_line = line.split(b" ", 1)
                       yield log_line
                       last_line = line

                   # The final read is done; no reason to wait any longer.
                   if pod_is_running:
                       time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
           except BaseHTTPError as e:
               # NOTE(review): the kubernetes client raises ApiException /
               # urllib3 errors rather than requests' BaseHTTPError, so this
               # handler may never fire - confirm.
               raise AirflowException(
                   'There was an error reading the kubernetes API: {}'.format(e)
               ) from e
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to