This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 794b153fcd7 fix: use instance base_container_name to fetch logs on trigger_reentry (#42960)
794b153fcd7 is described below
commit 794b153fcd72f1b2daf6b57ea14ee146d9c2a171
Author: Jean-Eudes Peloye <[email protected]>
AuthorDate: Sat Oct 12 16:28:30 2024 +0200
fix: use instance base_container_name to fetch logs on trigger_reentry (#42960)
---
.../airflow/providers/cncf/kubernetes/operators/pod.py | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 68081b5a670..62af050a50c 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -31,7 +31,7 @@ from collections.abc import Container
from contextlib import AbstractContextManager
from enum import Enum
from functools import cached_property
-from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, Sequence
import kubernetes
import tenacity
@@ -91,7 +91,6 @@ from airflow.version import version as airflow_version
if TYPE_CHECKING:
import jinja2
from pendulum import DateTime
- from typing_extensions import Literal
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.utils.context import Context
@@ -285,7 +284,8 @@ class KubernetesPodOperator(BaseOperator):
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
get_logs: bool = True,
- container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
+ base_container_name: str | None = None,
+ container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
container_resources: k8s.V1ResourceRequirements | None = None,
@@ -315,7 +315,6 @@ class KubernetesPodOperator(BaseOperator):
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
- base_container_name: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
@@ -357,9 +356,10 @@ class KubernetesPodOperator(BaseOperator):
self.cluster_context = cluster_context
self.reattach_on_restart = reattach_on_restart
self.get_logs = get_logs
- self.container_logs = container_logs
- if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
- self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
+ # Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
+ # in the init method signature, to be compatible with subclasses overloading the class variable value.
+ self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
+ self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
self.annotations = annotations or {}
@@ -398,7 +398,6 @@ class KubernetesPodOperator(BaseOperator):
if skip_on_exit_code is not None
else []
)
- self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
@@ -785,7 +784,7 @@ class KubernetesPodOperator(BaseOperator):
pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
- container_name=self.BASE_CONTAINER_NAME,
+ container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
)