This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 880efe6d8e6 Fix EKSPodOperator credential refresh errors and improve
error handling (#57585) (#58743)
880efe6d8e6 is described below
commit 880efe6d8e6db4dcf369067fe209c5cdff7f874e
Author: Aviral Garg <[email protected]>
AuthorDate: Tue Feb 24 23:10:28 2026 +0530
Fix EKSPodOperator credential refresh errors and improve error handling
(#57585) (#58743)
---
.../tests/kubernetes_tests/test_base.py | 8 +-
.../airflow/providers/amazon/aws/operators/eks.py | 116 ++++++++++++++-
.../providers/amazon/aws/utils/eks_get_token.py | 10 +-
.../tests/unit/amazon/aws/operators/test_eks.py | 157 +++++++++++++++++----
.../providers/cncf/kubernetes/utils/pod_manager.py | 20 +++
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 64 +++++++++
6 files changed, 342 insertions(+), 33 deletions(-)
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py
b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index 20b807954f0..326c1191d9c 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -257,10 +257,10 @@ class BaseK8STest:
rollout_status = check_output(
["kubectl", "rollout", "status",
f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
).decode()
- if resource_type == "deployment":
- assert "successfully rolled out" in rollout_status
- else:
- assert "roll out complete" in rollout_status
+ # kubectl output can vary between versions and resource types. Accept
either
+ # the common "successfully rolled out" wording or the alternative
+ # "roll out complete" phrasing to reduce flakiness across environments.
+        assert "successfully rolled out" in rollout_status or "roll out complete" in rollout_status
def ensure_dag_expected_state(self, host, logical_date, dag_id,
expected_final_state, timeout):
tries = 0
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index 2b6ae60dc18..f024ab056d2 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -19,6 +19,8 @@
from __future__ import annotations
import logging
+import os
+import stat
import warnings
from ast import literal_eval
from collections.abc import Sequence
@@ -97,6 +99,10 @@ def _create_compute(
# this is to satisfy mypy
subnets = subnets or []
create_nodegroup_kwargs = create_nodegroup_kwargs or {}
+ if nodegroup_role_arn is None:
+ raise ValueError(
+ MISSING_ARN_MSG.format(compute=NODEGROUP_FULL_NAME,
requirement="nodegroup_role_arn")
+ )
eks_hook.create_nodegroup(
clusterName=cluster_name,
@@ -152,6 +158,12 @@ def _create_compute(
# this is to satisfy mypy
create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
fargate_selectors = fargate_selectors or []
+ if fargate_pod_execution_role_arn is None:
+ raise ValueError(
+ MISSING_ARN_MSG.format(
+ compute=FARGATE_FULL_NAME,
requirement="fargate_pod_execution_role_arn"
+ )
+ )
eks_hook.create_fargate_profile(
clusterName=cluster_name,
@@ -1093,6 +1105,8 @@ class EksPodOperator(KubernetesPodOperator):
self.pod_name = pod_name
self.aws_conn_id = aws_conn_id
self.region = region
+ # Track credentials file path for credential refresh during
long-running tasks
+ self._credentials_file_path: str | None = None
super().__init__(
in_cluster=self.in_cluster,
namespace=self.namespace,
@@ -1111,10 +1125,18 @@ class EksPodOperator(KubernetesPodOperator):
region_name=self.region,
)
session = eks_hook.get_session()
- credentials = session.get_credentials().get_frozen_credentials()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise AirflowException(
+                "Unable to retrieve AWS credentials. Credentials may have expired or not been configured. "
+ "Please check your AWS connection configuration."
+ )
+ credentials = credentials_obj.get_frozen_credentials()
with eks_hook._secure_credential_context(
credentials.access_key, credentials.secret_key, credentials.token
) as credentials_file:
+ # Store credentials file path for potential refresh during
long-running tasks
+ self._credentials_file_path = credentials_file
with eks_hook.generate_config_file(
eks_cluster_name=self.cluster_name,
pod_namespace=self.namespace,
@@ -1130,13 +1152,103 @@ class EksPodOperator(KubernetesPodOperator):
eks_cluster_name = event["eks_cluster_name"]
pod_namespace = event["namespace"]
session = eks_hook.get_session()
- credentials = session.get_credentials().get_frozen_credentials()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise AirflowException(
+                "Unable to retrieve AWS credentials. Credentials may have expired or not been configured. "
+ "Please check your AWS connection configuration."
+ )
+ credentials = credentials_obj.get_frozen_credentials()
with eks_hook._secure_credential_context(
credentials.access_key, credentials.secret_key, credentials.token
) as credentials_file:
+ # Store credentials file path for potential refresh during
long-running tasks
+ self._credentials_file_path = credentials_file
with eks_hook.generate_config_file(
eks_cluster_name=eks_cluster_name,
pod_namespace=pod_namespace,
credentials_file=credentials_file,
) as self.config_file:
return super().trigger_reentry(context, event)
+
+ def _write_credentials_to_file(
+ self, credentials_file_path: str, access_key: str, secret_key: str,
session_token: str | None
+ ) -> None:
+ """
+ Write AWS credentials to an existing credentials file.
+
+ This overwrites the contents of the credentials file with fresh
credentials,
+ which allows the kubeconfig exec credential plugin to use new
credentials
+ without regenerating the entire kubeconfig.
+
+ The file was originally created by EksHook._secure_credential_context
with
+ restrictive permissions (0600 - owner read/write only). This method
preserves
+ those permissions by using os.open with the same mode flags.
+
+ :param credentials_file_path: Path to the credentials file to update
+ :param access_key: AWS access key ID
+ :param secret_key: AWS secret access key
+ :param session_token: AWS session token (optional)
+ """
+ # Open with same restrictive permissions as _secure_credential_context
(0600)
+ fd = os.open(credentials_file_path, os.O_WRONLY | os.O_TRUNC,
stat.S_IRUSR | stat.S_IWUSR)
+ try:
+ with os.fdopen(fd, "w") as f:
+ f.write(f"export AWS_ACCESS_KEY_ID='{access_key}'\n")
+ f.write(f"export AWS_SECRET_ACCESS_KEY='{secret_key}'\n")
+ if session_token:
+ f.write(f"export AWS_SESSION_TOKEN='{session_token}'\n")
+ except Exception:
+ # If fdopen fails, we need to close the file descriptor manually
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ raise
+
+ def _refresh_cached_properties(self) -> None:
+ """
+ Refresh cached properties including AWS credentials.
+
+ This method is called by KubernetesPodOperator._handle_api_exception
(in
+ providers/cncf/kubernetes/operators/pod.py) when a 401 Unauthorized
error
+ is received from the Kubernetes API. The 401 error indicates that the
+ credentials used to authenticate with EKS have expired.
+
+ The call chain is:
+ 1. KubernetesPodOperator._await_pod_completion catches ApiException
with status 401
+ 2. _handle_api_exception is called, which logs a warning and calls
_refresh_cached_properties
+ 3. This override refreshes the AWS credentials file that the
kubeconfig exec
+ credential plugin reads from (see
EksHook._secure_credential_context)
+ 4. The parent class deletes cached hook/client/pod_manager so they are
recreated
+ with fresh credentials on next access
+
+ Without this refresh, the kubeconfig would continue to reference stale
+ credentials in the temp file, causing repeated authentication failures.
+ """
+ if self._credentials_file_path:
+ self.log.info("Refreshing AWS credentials for EKS authentication")
+ try:
+ eks_hook = EksHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region,
+ )
+ session = eks_hook.get_session()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise AirflowException(
+                        "Unable to retrieve fresh AWS credentials during refresh. "
+                        "Credentials may have expired or the AWS connection may be misconfigured."
+ )
+ credentials = credentials_obj.get_frozen_credentials()
+ self._write_credentials_to_file(
+ self._credentials_file_path,
+ credentials.access_key,
+ credentials.secret_key,
+ credentials.token,
+ )
+ self.log.info("Successfully refreshed AWS credentials for EKS")
+ except Exception:
+ self.log.exception("Failed to refresh AWS credentials.")
+ raise
+ super()._refresh_cached_properties()
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
b/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
index 47dec133f6c..03e2f2513df 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
@@ -58,12 +58,20 @@ def fetch_access_token_for_cluster(eks_cluster_name: str,
sts_url: str, region_n
# the endpoint is regional.
os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+ credentials = session.get_credentials()
+ if credentials is None:
+ raise ValueError(
+            "No AWS credentials found. Credentials may have expired or not been properly configured. "
+            "Please ensure AWS credentials are available through environment variables, "
+ "AWS config files, or IAM roles."
+ )
+
signer = RequestSigner(
service_id=eks_client.meta.service_model.service_id,
region_name=session.region_name,
signing_name="sts",
signature_version="v4",
- credentials=session.get_credentials(),
+ credentials=credentials,
event_emitter=session.events,
)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
index 3cf453cb5e0..2e1c850ba94 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
@@ -506,7 +506,7 @@ class TestEksCreateNodegroupOperator:
op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
parameters = {**self.create_nodegroup_params,
**create_nodegroup_kwargs}
else:
- assert "create_nodegroup_params" not in op_kwargs
+ assert "create_nodegroup_kwargs" not in op_kwargs
parameters = self.create_nodegroup_params
operator = EksCreateNodegroupOperator(task_id=TASK_ID, **op_kwargs)
@@ -514,31 +514,36 @@ class TestEksCreateNodegroupOperator:
mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
mock_waiter.assert_not_called()
- @pytest.mark.parametrize(
- "create_nodegroup_kwargs",
- [
- pytest.param(None, id="without nodegroup kwargs"),
- pytest.param(CREATE_NODEGROUP_KWARGS, id="with nodegroup
kwargs"),
- ],
- )
- @mock.patch.object(Waiter, "wait")
- @mock.patch.object(EksHook, "create_nodegroup")
- def test_execute_with_wait_when_nodegroup_does_not_already_exist(
- self, mock_create_nodegroup, mock_waiter, create_nodegroup_kwargs
- ):
- op_kwargs = {**self.create_nodegroup_params}
- if create_nodegroup_kwargs:
- op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
- parameters = {**self.create_nodegroup_params,
**create_nodegroup_kwargs}
- else:
- assert "create_nodegroup_params" not in op_kwargs
- parameters = self.create_nodegroup_params
-
- operator = EksCreateNodegroupOperator(task_id=TASK_ID,
**op_kwargs, wait_for_completion=True)
- operator.execute({})
-
mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
- mock_waiter.assert_called_with(mock.ANY, clusterName=CLUSTER_NAME,
nodegroupName=NODEGROUP_NAME)
- assert_expected_waiter_type(mock_waiter, "NodegroupActive")
+ @pytest.mark.parametrize(
+ "create_nodegroup_kwargs",
+ [
+ pytest.param(None, id="without nodegroup kwargs"),
+ pytest.param(CREATE_NODEGROUP_KWARGS, id="with nodegroup kwargs"),
+ ],
+ )
+ @mock.patch.object(Waiter, "wait")
+ @mock.patch.object(EksHook, "create_nodegroup")
+ def test_execute_with_wait_when_nodegroup_does_not_already_exist(
+ self, mock_create_nodegroup, mock_waiter, create_nodegroup_kwargs
+ ):
+ op_kwargs = {**self.create_nodegroup_params}
+ if create_nodegroup_kwargs:
+ op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
+ parameters = {**self.create_nodegroup_params,
**create_nodegroup_kwargs}
+ else:
+ assert "create_nodegroup_kwargs" not in op_kwargs
+ parameters = self.create_nodegroup_params
+
+ operator = EksCreateNodegroupOperator(task_id=TASK_ID, **op_kwargs,
wait_for_completion=True)
+ operator.execute({})
+ mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
+ mock_waiter.assert_called_with(
+ mock.ANY,
+ clusterName=CLUSTER_NAME,
+ nodegroupName=NODEGROUP_NAME,
+ WaiterConfig={"MaxAttempts": mock.ANY},
+ )
+ assert_expected_waiter_type(mock_waiter, "NodegroupActive")
@mock.patch.object(EksHook, "create_nodegroup")
def test_create_nodegroup_deferrable(self, mock_create_nodegroup):
@@ -1011,3 +1016,103 @@ class TestEksPodOperator:
credentials_file=mock_credentials_file,
)
assert op.config_file == mock_config_file
+
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.get_session")
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__",
return_value=None)
+ @mock.patch(
+
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator._refresh_cached_properties"
+ )
+ def test_refresh_cached_properties_refreshes_credentials(
+ self,
+ mock_super_refresh,
+ mock_eks_hook,
+ mock_get_session,
+ tmp_path,
+ ):
+ """Test that _refresh_cached_properties refreshes AWS credentials
file."""
+ # Create a temporary credentials file
+ credentials_file = tmp_path / "test_creds.aws_creds"
+ credentials_file.write_text(
+ "export AWS_ACCESS_KEY_ID='old_key'\n"
+ "export AWS_SECRET_ACCESS_KEY='old_secret'\n"
+ "export AWS_SESSION_TOKEN='old_token'\n"
+ )
+
+ # Mock the credential chain for refresh
+ mock_session = mock.MagicMock()
+ mock_credentials = mock.MagicMock()
+ mock_frozen_credentials = mock.MagicMock()
+ mock_frozen_credentials.access_key = "new_access_key"
+ mock_frozen_credentials.secret_key = "new_secret_key"
+ mock_frozen_credentials.token = "new_token"
+
+ mock_get_session.return_value = mock_session
+ mock_session.get_credentials.return_value = mock_credentials
+ mock_credentials.get_frozen_credentials.return_value =
mock_frozen_credentials
+
+ op = EksPodOperator(
+ task_id="run_pod",
+ pod_name="run_pod",
+ cluster_name=CLUSTER_NAME,
+ image="amazon/aws-cli:latest",
+ cmds=["sh", "-c", "ls"],
+ labels={"demo": "hello_world"},
+ get_logs=True,
+ on_finish_action="delete_pod",
+ )
+ # Set the credentials file path as it would be during execute()
+ op._credentials_file_path = str(credentials_file)
+
+ # Call the refresh method
+ op._refresh_cached_properties()
+
+ # Verify the credentials file was updated with new credentials
+ updated_content = credentials_file.read_text()
+ assert "new_access_key" in updated_content
+ assert "new_secret_key" in updated_content
+ assert "new_token" in updated_content
+ assert "old_key" not in updated_content
+
+ # Verify super()._refresh_cached_properties() was called
+ mock_super_refresh.assert_called_once()
+
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.get_session")
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__",
return_value=None)
+ @mock.patch(
+
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator._refresh_cached_properties"
+ )
+ def test_refresh_cached_properties_raises_when_no_credentials(
+ self,
+ mock_super_refresh,
+ mock_eks_hook,
+ mock_get_session,
+ tmp_path,
+ ):
+ """Test that _refresh_cached_properties raises when credentials cannot
be retrieved."""
+ # Create a temporary credentials file
+ credentials_file = tmp_path / "test_creds.aws_creds"
+ credentials_file.write_text("export AWS_ACCESS_KEY_ID='old_key'\n")
+
+ # Mock the credential chain to return None (simulating expired/missing
credentials)
+ mock_session = mock.MagicMock()
+ mock_get_session.return_value = mock_session
+ mock_session.get_credentials.return_value = None
+
+ op = EksPodOperator(
+ task_id="run_pod",
+ pod_name="run_pod",
+ cluster_name=CLUSTER_NAME,
+ image="amazon/aws-cli:latest",
+ cmds=["sh", "-c", "ls"],
+ labels={"demo": "hello_world"},
+ get_logs=True,
+ on_finish_action="delete_pod",
+ )
+ op._credentials_file_path = str(credentials_file)
+
+ # Call the refresh method and expect it to raise
+        with pytest.raises(AirflowException, match="Unable to retrieve fresh AWS credentials"):
+ op._refresh_cached_properties()
+
+ # Verify super()._refresh_cached_properties() was NOT called since we
raised
+ mock_super_refresh.assert_not_called()
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 413a1fa4748..32ce54465bd 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -355,6 +355,7 @@ class PodManager(LoggingMixin):
self._watch = watch.Watch()
self._callbacks = callbacks or []
self.stop_watching_events = False
+ self.container_log_times: dict[tuple[str, str, str], DateTime] = {}
def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
"""Run POD asynchronously."""
@@ -534,6 +535,10 @@ class PodManager(LoggingMixin):
log_formatter,
)
last_captured_timestamp = message_timestamp
+ if last_captured_timestamp is not None:
+ self.container_log_times[
+ (pod.metadata.namespace,
pod.metadata.name, container_name)
+ ] = last_captured_timestamp
message_to_log = message
message_timestamp = line_timestamp
else: # continuation of the previous log line
@@ -554,6 +559,10 @@ class PodManager(LoggingMixin):
message_to_log, container_name,
container_name_log_prefix_enabled, log_formatter
)
last_captured_timestamp = message_timestamp
+ if last_captured_timestamp:
+ self.container_log_times[
+ (pod.metadata.namespace, pod.metadata.name,
container_name)
+ ] = last_captured_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to
avoid
# duplicate log entries
@@ -663,10 +672,12 @@ class PodManager(LoggingMixin):
containers_to_log = sorted(containers_to_log, key=lambda cn:
all_containers.index(cn))
for c in containers_to_log:
self._await_init_container_start(pod=pod, container_name=c)
+ since_time = self.container_log_times.get((pod.metadata.namespace,
pod.metadata.name, c))
status = self.fetch_container_logs(
pod=pod,
container_name=c,
follow=follow_logs,
+ since_time=since_time,
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
log_formatter=log_formatter,
)
@@ -696,10 +707,12 @@ class PodManager(LoggingMixin):
pod_name=pod.metadata.name,
)
for c in containers_to_log:
+ since_time = self.container_log_times.get((pod.metadata.namespace,
pod.metadata.name, c))
status = self.fetch_container_logs(
pod=pod,
container_name=c,
follow=follow_logs,
+ since_time=since_time,
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
log_formatter=log_formatter,
)
@@ -837,6 +850,12 @@ class PodManager(LoggingMixin):
resource_version=resource_version,
resource_version_match="NotOlderThan" if resource_version else
None,
)
+ except TypeError:
+ return self._client.list_namespaced_event(
+ namespace=pod.metadata.namespace,
+ field_selector=f"involvedObject.name={pod.metadata.name}",
+ resource_version=resource_version,
+ )
except HTTPError as e:
raise KubernetesApiException(f"There was an error reading the
kubernetes API: {e}")
@@ -899,6 +918,7 @@ class PodManager(LoggingMixin):
f"then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; "
f"else echo {EMPTY_XCOM_RESULT}; fi"
)
+ result = None
with closing(
kubernetes_stream(
self._client.connect_get_namespaced_pod_exec,
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 544109a599e..b5c57c49167 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -638,6 +638,70 @@ class TestPodManager:
assert "message3 line1" in caplog.text
assert "ERROR" not in caplog.text
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+ def test_container_log_times_tracks_last_timestamp(self,
mock_read_pod_logs, mock_container_is_running):
+ """Test that container_log_times dictionary tracks the last log
timestamp for each container."""
+ timestamp_string = "2020-10-08T14:16:17.793417674Z"
+ mock_read_pod_logs.return_value = [bytes(f"{timestamp_string}
message", "utf-8")]
+ mock_container_is_running.return_value = False
+
+ # Create a mock pod with namespace and name
+ mock_pod = mock.MagicMock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ container_name = "test-container"
+
+ # Ensure container_log_times is empty initially
+ assert not self.pod_manager.container_log_times
+
+ # Fetch logs which should populate container_log_times
+ self.pod_manager.fetch_container_logs(mock_pod, container_name,
follow=True)
+
+ # Verify the timestamp was stored in container_log_times
+ key = ("test-namespace", "test-pod", "test-container")
+ assert key in self.pod_manager.container_log_times
+ assert self.pod_manager.container_log_times[key] == cast("DateTime",
pendulum.parse(timestamp_string))
+
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+ def test_fetch_requested_container_logs_uses_since_time(
+ self, mock_read_pod_logs, mock_container_is_running
+ ):
+ """Test that fetch_requested_container_logs passes since_time from
container_log_times."""
+ timestamp_string = "2020-10-08T14:16:17.793417674Z"
+ mock_read_pod_logs.return_value = [bytes(f"{timestamp_string}
message", "utf-8")]
+ mock_container_is_running.return_value = False
+
+ # Create a mock pod
+ mock_pod = mock.MagicMock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ mock_container = mock.MagicMock()
+ mock_container.name = "test-container"
+ mock_pod.spec.containers = [mock_container]
+ container_name = "test-container"
+
+ # Pre-populate container_log_times with an earlier timestamp
+ earlier_timestamp = pendulum.parse("2020-10-08T14:15:00.000000000Z")
+ self.pod_manager.container_log_times[("test-namespace", "test-pod",
"test-container")] = (
+ earlier_timestamp
+ )
+
+ # Mock read_pod to return the mock_pod
+ self.pod_manager.read_pod = mock.Mock(return_value=mock_pod)
+
+ # Fetch logs - this should pass the earlier timestamp as since_time
+ with mock.patch.object(
+ self.pod_manager, "fetch_container_logs",
wraps=self.pod_manager.fetch_container_logs
+ ) as mock_fetch:
+ self.pod_manager.fetch_requested_container_logs(mock_pod,
containers=container_name)
+
+ # Verify fetch_container_logs was called with since_time set to
the earlier timestamp
+ mock_fetch.assert_called_once()
+ call_kwargs = mock_fetch.call_args[1]
+ assert call_kwargs["since_time"] == earlier_timestamp
+
@pytest.mark.parametrize("status", [409, 429])
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
def test_start_pod_retries_on_409_or_429_error(self, mock_run_pod_async,
status):