dholleran-lendico commented on pull request #7428:
URL: https://github.com/apache/airflow/pull/7428#issuecomment-704994507
hi @smaley07,
You can use the code provided in this PR to make a custom pod launcher and
operator. Then add the files to your plugin directory so that you can use the
operator in your DAGs.
For the pod operator:
```
from airflow.contrib.kubernetes import kube_client, pod_generator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.exceptions import AirflowException
from airflow.utils.state import State
from airflow.version import version as airflow_version

from utils.custom_pod_launcher import CustomPodLauncher
class CustomPodOperator(KubernetesPodOperator):
    """KubernetesPodOperator variant that runs its pod through ``CustomPodLauncher``."""

    def execute(self, context):
        """Build a pod from the operator's settings, run it and report the outcome.

        :param context: Airflow task context (not used by this method).
        :return: the launcher's result when ``do_xcom_push`` is set, else ``None``.
        :raises AirflowException: when the pod finishes in any state other than
            ``State.SUCCESS``, or when an ``AirflowException`` is raised while
            building or launching the pod (re-raised with a
            'Pod Launching failed' prefix).
        """
        try:
            # Only forward in_cluster when it was set explicitly, so that
            # get_kube_client can fall back to its own default otherwise.
            if self.in_cluster is not None:
                client = kube_client.get_kube_client(
                    in_cluster=self.in_cluster,
                    cluster_context=self.cluster_context,
                    config_file=self.config_file,
                )
            else:
                client = kube_client.get_kube_client(
                    cluster_context=self.cluster_context,
                    config_file=self.config_file,
                )

            # Add Airflow Version to the label
            # And a label to identify that pod is launched by KubernetesPodOperator
            self.labels.update(
                {
                    'airflow_version': airflow_version.replace('+', '-'),
                    'kubernetes_pod_operator': 'True',
                }
            )

            gen = pod_generator.PodGenerator()

            for port in self.ports:
                gen.add_port(port)
            for mount in self.volume_mounts:
                gen.add_mount(mount)
            for volume in self.volumes:
                gen.add_volume(volume)

            pod = gen.make_pod(
                namespace=self.namespace,
                image=self.image,
                pod_id=self.name,
                cmds=self.cmds,
                arguments=self.arguments,
                labels=self.labels,
            )

            # Settings that make_pod does not take as arguments are copied
            # onto the pod object one by one.
            pod.service_account_name = self.service_account_name
            pod.secrets = self.secrets
            pod.envs = self.env_vars
            pod.image_pull_policy = self.image_pull_policy
            pod.image_pull_secrets = self.image_pull_secrets
            pod.annotations = self.annotations
            pod.resources = self.resources
            pod.affinity = self.affinity
            pod.node_selectors = self.node_selectors
            pod.hostnetwork = self.hostnetwork
            pod.tolerations = self.tolerations
            pod.configmaps = self.configmaps
            pod.security_context = self.security_context
            pod.pod_runtime_info_envs = self.pod_runtime_info_envs
            pod.dnspolicy = self.dnspolicy

            launcher = CustomPodLauncher(
                kube_client=client,
                extract_xcom=self.do_xcom_push,
            )
            try:
                (final_state, result) = launcher.run_pod(
                    pod,
                    startup_timeout=self.startup_timeout_seconds,
                    get_logs=self.get_logs,
                )
            finally:
                # Clean up the pod even when run_pod raised.
                if self.is_delete_operator_pod:
                    launcher.delete_pod(pod)

            if final_state != State.SUCCESS:
                raise AirflowException(
                    'Pod returned a failure: {state}'.format(state=final_state)
                )
            if self.do_xcom_push:
                return result
        except AirflowException as ex:
            # Chain explicitly so the original failure stays attached as __cause__.
            raise AirflowException(
                'Pod Launching failed: {error}'.format(error=ex)
            ) from ex
```
And for the launcher:
```
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from requests.exceptions import BaseHTTPError
from airflow import AirflowException
from typing import Generator, List
from kubernetes.client.models.v1_pod import V1Pod
import math
import time
from datetime import datetime as dt
POD_LOGS_POLL_INTERVAL_SECONDS = 5
class CustomPodLauncher(PodLauncher):
    """PodLauncher that reads pod logs by polling instead of following the stream."""

    def _request_pod_log_chunk(self, pod: V1Pod, since_seconds: int) -> str:
        """Request the timestamped logs of the pod's 'base' container.

        :param pod: pod whose ``name`` and ``namespace`` identify the log source.
        :param since_seconds: how far back, in seconds, the returned logs reach.
        :return: whatever ``read_namespaced_pod_log`` returns for a
            non-preloaded request.
        """
        # NOTE(review): annotated as ``str``, but the caller iterates the
        # result line by line and compares lines to bytes -- confirm the
        # actual return type when ``_preload_content=False``.
        return self._client.read_namespaced_pod_log(
            name=pod.name,
            namespace=pod.namespace,
            container='base',
            follow=False,
            since_seconds=since_seconds,
            timestamps=True,
            _preload_content=False
        )

    def _read_pod_log_chunk(self, pod: V1Pod, last_line: bytes) -> Generator[bytes, None, None]:
        """Yield the log lines that come after ``last_line``.

        :param pod: pod to read logs from.
        :param last_line: the last raw (timestamp-prefixed) line already
            emitted, or ``b""`` when nothing has been read yet.
        :return: generator of raw log lines, each still carrying its timestamp.
        """
        # The CoreV1Api doesn't support since_time even though the API does,
        # so we must use since_seconds. Add 15 seconds of buffer just in case
        # of NTP woes
        if last_line:
            # Strip fractional part because strptime doesn't support
            # nanosecond parsing
            timestamp = last_line.split(b" ", 1)[0]
            last_chunk_dt = dt.strptime(
                timestamp.split(b".", 1)[0].decode("utf-8"),
                "%Y-%m-%dT%H:%M:%S",
            )
            since_time = last_chunk_dt
        else:
            # Nothing read yet: reach back to the epoch, i.e. ask for everything.
            since_time = dt.utcfromtimestamp(0)

        since_seconds = math.ceil((dt.utcnow() - since_time).total_seconds() + 15)
        resp = self._request_pod_log_chunk(pod, since_seconds)

        # If we've already read a chunk, skip until we find a matching line
        # Just in case since_seconds doesn't get everything we want, keep the
        # previous lines in a buffer
        buffered_lines = []  # type: List[bytes]
        # On the very first read there is no previous line to look for, so
        # there is nothing to skip. Starting in skipping mode regardless would
        # buffer the whole chunk and then log a spurious "line loss" warning.
        skipping_lines = bool(last_line)
        for line in resp:
            if skipping_lines:
                if line == last_line:
                    self.log.debug("Found duplicate line. Stopping log skipping")
                    buffered_lines = []
                    skipping_lines = False
                else:
                    buffered_lines.append(line)
            else:
                yield line

        if buffered_lines:
            # The marker line never showed up: emit everything that was held
            # back rather than dropping it.
            self.log.warning(
                "End of previous log chunk not found in next chunk. May indicated log line loss"
            )
            for buffered_line in buffered_lines:
                yield buffered_line

    def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
        """
        Reads pod logs from the Kubernetes API until the pod stops.

        This explicitly does not use the `follow` parameter due to issues
        around log rotation
        (https://github.com/kubernetes/kubernetes/issues/28369). Once that is
        fixed, using follow instead of polling for pod status should be fine,
        but deduping on timestamp will still be desired in case the underlying
        request fails

        :param pod: pod to read logs from.
        :return: generator of log lines with the leading timestamp removed.
        :raises AirflowException: when a ``BaseHTTPError`` is raised while
            reading.
        """
        # The timestamps returned from the Kubernetes API are in nanoseconds,
        # and appear to never duplicate across lines so we can use the
        # timestamp plus the line content to deduplicate log lines across
        # multiple runs
        last_line = b""
        # We use a variable here instead of looping on self.pod_is_running so
        # that we can get one more read in the loop before breaking out
        pod_is_running = True
        try:
            while pod_is_running:
                pod_is_running = self.base_container_is_running(pod)
                if not pod_is_running:
                    self.log.info("pod stopped, pulling logs one more time")

                for line in self._read_pod_log_chunk(pod, last_line):
                    _timestamp, log_line = line.split(b" ", 1)
                    yield log_line
                    last_line = line

                # No point in waiting for the next poll once the container has
                # stopped: the loop is about to end.
                if pod_is_running:
                    time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
        except BaseHTTPError as e:
            raise AirflowException(
                'There was an error reading the kubernetes API: {}'.format(e)
            ) from e
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]