This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 880efe6d8e6 Fix EKSPodOperator credential refresh errors and improve error handling (#57585) (#58743)
880efe6d8e6 is described below

commit 880efe6d8e6db4dcf369067fe209c5cdff7f874e
Author: Aviral Garg <[email protected]>
AuthorDate: Tue Feb 24 23:10:28 2026 +0530

    Fix EKSPodOperator credential refresh errors and improve error handling (#57585) (#58743)
---
 .../tests/kubernetes_tests/test_base.py            |   8 +-
 .../airflow/providers/amazon/aws/operators/eks.py  | 116 ++++++++++++++-
 .../providers/amazon/aws/utils/eks_get_token.py    |  10 +-
 .../tests/unit/amazon/aws/operators/test_eks.py    | 157 +++++++++++++++++----
 .../providers/cncf/kubernetes/utils/pod_manager.py |  20 +++
 .../unit/cncf/kubernetes/utils/test_pod_manager.py |  64 +++++++++
 6 files changed, 342 insertions(+), 33 deletions(-)

diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index 20b807954f0..326c1191d9c 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -257,10 +257,10 @@ class BaseK8STest:
         rollout_status = check_output(
             ["kubectl", "rollout", "status", 
f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
         ).decode()
-        if resource_type == "deployment":
-            assert "successfully rolled out" in rollout_status
-        else:
-            assert "roll out complete" in rollout_status
+        # kubectl output can vary between versions and resource types. Accept 
either
+        # the common "successfully rolled out" wording or the alternative
+        # "roll out complete" phrasing to reduce flakiness across environments.
+        assert "successfully rolled out" in rollout_status or "roll out 
complete" in rollout_status
 
     def ensure_dag_expected_state(self, host, logical_date, dag_id, 
expected_final_state, timeout):
         tries = 0
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index 2b6ae60dc18..f024ab056d2 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -19,6 +19,8 @@
 from __future__ import annotations
 
 import logging
+import os
+import stat
 import warnings
 from ast import literal_eval
 from collections.abc import Sequence
@@ -97,6 +99,10 @@ def _create_compute(
         # this is to satisfy mypy
         subnets = subnets or []
         create_nodegroup_kwargs = create_nodegroup_kwargs or {}
+        if nodegroup_role_arn is None:
+            raise ValueError(
+                MISSING_ARN_MSG.format(compute=NODEGROUP_FULL_NAME, 
requirement="nodegroup_role_arn")
+            )
 
         eks_hook.create_nodegroup(
             clusterName=cluster_name,
@@ -152,6 +158,12 @@ def _create_compute(
         # this is to satisfy mypy
         create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
         fargate_selectors = fargate_selectors or []
+        if fargate_pod_execution_role_arn is None:
+            raise ValueError(
+                MISSING_ARN_MSG.format(
+                    compute=FARGATE_FULL_NAME, 
requirement="fargate_pod_execution_role_arn"
+                )
+            )
 
         eks_hook.create_fargate_profile(
             clusterName=cluster_name,
@@ -1093,6 +1105,8 @@ class EksPodOperator(KubernetesPodOperator):
         self.pod_name = pod_name
         self.aws_conn_id = aws_conn_id
         self.region = region
+        # Track credentials file path for credential refresh during 
long-running tasks
+        self._credentials_file_path: str | None = None
         super().__init__(
             in_cluster=self.in_cluster,
             namespace=self.namespace,
@@ -1111,10 +1125,18 @@ class EksPodOperator(KubernetesPodOperator):
             region_name=self.region,
         )
         session = eks_hook.get_session()
-        credentials = session.get_credentials().get_frozen_credentials()
+        credentials_obj = session.get_credentials()
+        if credentials_obj is None:
+            raise AirflowException(
+                "Unable to retrieve AWS credentials. Credentials may have 
expired or not been configured. "
+                "Please check your AWS connection configuration."
+            )
+        credentials = credentials_obj.get_frozen_credentials()
         with eks_hook._secure_credential_context(
             credentials.access_key, credentials.secret_key, credentials.token
         ) as credentials_file:
+            # Store credentials file path for potential refresh during 
long-running tasks
+            self._credentials_file_path = credentials_file
             with eks_hook.generate_config_file(
                 eks_cluster_name=self.cluster_name,
                 pod_namespace=self.namespace,
@@ -1130,13 +1152,103 @@ class EksPodOperator(KubernetesPodOperator):
         eks_cluster_name = event["eks_cluster_name"]
         pod_namespace = event["namespace"]
         session = eks_hook.get_session()
-        credentials = session.get_credentials().get_frozen_credentials()
+        credentials_obj = session.get_credentials()
+        if credentials_obj is None:
+            raise AirflowException(
+                "Unable to retrieve AWS credentials. Credentials may have 
expired or not been configured. "
+                "Please check your AWS connection configuration."
+            )
+        credentials = credentials_obj.get_frozen_credentials()
         with eks_hook._secure_credential_context(
             credentials.access_key, credentials.secret_key, credentials.token
         ) as credentials_file:
+            # Store credentials file path for potential refresh during 
long-running tasks
+            self._credentials_file_path = credentials_file
             with eks_hook.generate_config_file(
                 eks_cluster_name=eks_cluster_name,
                 pod_namespace=pod_namespace,
                 credentials_file=credentials_file,
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
+
+    def _write_credentials_to_file(
+        self, credentials_file_path: str, access_key: str, secret_key: str, 
session_token: str | None
+    ) -> None:
+        """
+        Write AWS credentials to an existing credentials file.
+
+        This overwrites the contents of the credentials file with fresh 
credentials,
+        which allows the kubeconfig exec credential plugin to use new 
credentials
+        without regenerating the entire kubeconfig.
+
+        The file was originally created by EksHook._secure_credential_context 
with
+        restrictive permissions (0600 - owner read/write only). This method 
preserves
+        those permissions by using os.open with the same mode flags.
+
+        :param credentials_file_path: Path to the credentials file to update
+        :param access_key: AWS access key ID
+        :param secret_key: AWS secret access key
+        :param session_token: AWS session token (optional)
+        """
+        # Open with same restrictive permissions as _secure_credential_context 
(0600)
+        fd = os.open(credentials_file_path, os.O_WRONLY | os.O_TRUNC, 
stat.S_IRUSR | stat.S_IWUSR)
+        try:
+            with os.fdopen(fd, "w") as f:
+                f.write(f"export AWS_ACCESS_KEY_ID='{access_key}'\n")
+                f.write(f"export AWS_SECRET_ACCESS_KEY='{secret_key}'\n")
+                if session_token:
+                    f.write(f"export AWS_SESSION_TOKEN='{session_token}'\n")
+        except Exception:
+            # If fdopen fails, we need to close the file descriptor manually
+            try:
+                os.close(fd)
+            except OSError:
+                pass
+            raise
+
+    def _refresh_cached_properties(self) -> None:
+        """
+        Refresh cached properties including AWS credentials.
+
+        This method is called by KubernetesPodOperator._handle_api_exception 
(in
+        providers/cncf/kubernetes/operators/pod.py) when a 401 Unauthorized 
error
+        is received from the Kubernetes API. The 401 error indicates that the
+        credentials used to authenticate with EKS have expired.
+
+        The call chain is:
+        1. KubernetesPodOperator._await_pod_completion catches ApiException 
with status 401
+        2. _handle_api_exception is called, which logs a warning and calls 
_refresh_cached_properties
+        3. This override refreshes the AWS credentials file that the 
kubeconfig exec
+           credential plugin reads from (see 
EksHook._secure_credential_context)
+        4. The parent class deletes cached hook/client/pod_manager so they are 
recreated
+           with fresh credentials on next access
+
+        Without this refresh, the kubeconfig would continue to reference stale
+        credentials in the temp file, causing repeated authentication failures.
+        """
+        if self._credentials_file_path:
+            self.log.info("Refreshing AWS credentials for EKS authentication")
+            try:
+                eks_hook = EksHook(
+                    aws_conn_id=self.aws_conn_id,
+                    region_name=self.region,
+                )
+                session = eks_hook.get_session()
+                credentials_obj = session.get_credentials()
+                if credentials_obj is None:
+                    raise AirflowException(
+                        "Unable to retrieve fresh AWS credentials during 
refresh. "
+                        "Credentials may have expired or the AWS connection 
may be misconfigured."
+                    )
+                credentials = credentials_obj.get_frozen_credentials()
+                self._write_credentials_to_file(
+                    self._credentials_file_path,
+                    credentials.access_key,
+                    credentials.secret_key,
+                    credentials.token,
+                )
+                self.log.info("Successfully refreshed AWS credentials for EKS")
+            except Exception:
+                self.log.exception("Failed to refresh AWS credentials.")
+                raise
+        super()._refresh_cached_properties()
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py b/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
index 47dec133f6c..03e2f2513df 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py
@@ -58,12 +58,20 @@ def fetch_access_token_for_cluster(eks_cluster_name: str, sts_url: str, region_n
     # the endpoint is regional.
     os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
 
+    credentials = session.get_credentials()
+    if credentials is None:
+        raise ValueError(
+            "No AWS credentials found. Credentials may have expired or not 
been properly configured. "
+            "Please ensure AWS credentials are available through environment 
variables, "
+            "AWS config files, or IAM roles."
+        )
+
     signer = RequestSigner(
         service_id=eks_client.meta.service_model.service_id,
         region_name=session.region_name,
         signing_name="sts",
         signature_version="v4",
-        credentials=session.get_credentials(),
+        credentials=credentials,
         event_emitter=session.events,
     )
 
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
index 3cf453cb5e0..2e1c850ba94 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
@@ -506,7 +506,7 @@ class TestEksCreateNodegroupOperator:
             op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
             parameters = {**self.create_nodegroup_params, 
**create_nodegroup_kwargs}
         else:
-            assert "create_nodegroup_params" not in op_kwargs
+            assert "create_nodegroup_kwargs" not in op_kwargs
             parameters = self.create_nodegroup_params
 
         operator = EksCreateNodegroupOperator(task_id=TASK_ID, **op_kwargs)
@@ -514,31 +514,36 @@ class TestEksCreateNodegroupOperator:
         mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
         mock_waiter.assert_not_called()
 
-        @pytest.mark.parametrize(
-            "create_nodegroup_kwargs",
-            [
-                pytest.param(None, id="without nodegroup kwargs"),
-                pytest.param(CREATE_NODEGROUP_KWARGS, id="with nodegroup 
kwargs"),
-            ],
-        )
-        @mock.patch.object(Waiter, "wait")
-        @mock.patch.object(EksHook, "create_nodegroup")
-        def test_execute_with_wait_when_nodegroup_does_not_already_exist(
-            self, mock_create_nodegroup, mock_waiter, create_nodegroup_kwargs
-        ):
-            op_kwargs = {**self.create_nodegroup_params}
-            if create_nodegroup_kwargs:
-                op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
-                parameters = {**self.create_nodegroup_params, 
**create_nodegroup_kwargs}
-            else:
-                assert "create_nodegroup_params" not in op_kwargs
-                parameters = self.create_nodegroup_params
-
-            operator = EksCreateNodegroupOperator(task_id=TASK_ID, 
**op_kwargs, wait_for_completion=True)
-            operator.execute({})
-            
mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
-            mock_waiter.assert_called_with(mock.ANY, clusterName=CLUSTER_NAME, 
nodegroupName=NODEGROUP_NAME)
-            assert_expected_waiter_type(mock_waiter, "NodegroupActive")
+    @pytest.mark.parametrize(
+        "create_nodegroup_kwargs",
+        [
+            pytest.param(None, id="without nodegroup kwargs"),
+            pytest.param(CREATE_NODEGROUP_KWARGS, id="with nodegroup kwargs"),
+        ],
+    )
+    @mock.patch.object(Waiter, "wait")
+    @mock.patch.object(EksHook, "create_nodegroup")
+    def test_execute_with_wait_when_nodegroup_does_not_already_exist(
+        self, mock_create_nodegroup, mock_waiter, create_nodegroup_kwargs
+    ):
+        op_kwargs = {**self.create_nodegroup_params}
+        if create_nodegroup_kwargs:
+            op_kwargs["create_nodegroup_kwargs"] = create_nodegroup_kwargs
+            parameters = {**self.create_nodegroup_params, 
**create_nodegroup_kwargs}
+        else:
+            assert "create_nodegroup_kwargs" not in op_kwargs
+            parameters = self.create_nodegroup_params
+
+        operator = EksCreateNodegroupOperator(task_id=TASK_ID, **op_kwargs, 
wait_for_completion=True)
+        operator.execute({})
+        mock_create_nodegroup.assert_called_with(**convert_keys(parameters))
+        mock_waiter.assert_called_with(
+            mock.ANY,
+            clusterName=CLUSTER_NAME,
+            nodegroupName=NODEGROUP_NAME,
+            WaiterConfig={"MaxAttempts": mock.ANY},
+        )
+        assert_expected_waiter_type(mock_waiter, "NodegroupActive")
 
     @mock.patch.object(EksHook, "create_nodegroup")
     def test_create_nodegroup_deferrable(self, mock_create_nodegroup):
@@ -1011,3 +1016,103 @@ class TestEksPodOperator:
             credentials_file=mock_credentials_file,
         )
         assert op.config_file == mock_config_file
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.get_session")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__", 
return_value=None)
+    @mock.patch(
+        
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator._refresh_cached_properties"
+    )
+    def test_refresh_cached_properties_refreshes_credentials(
+        self,
+        mock_super_refresh,
+        mock_eks_hook,
+        mock_get_session,
+        tmp_path,
+    ):
+        """Test that _refresh_cached_properties refreshes AWS credentials 
file."""
+        # Create a temporary credentials file
+        credentials_file = tmp_path / "test_creds.aws_creds"
+        credentials_file.write_text(
+            "export AWS_ACCESS_KEY_ID='old_key'\n"
+            "export AWS_SECRET_ACCESS_KEY='old_secret'\n"
+            "export AWS_SESSION_TOKEN='old_token'\n"
+        )
+
+        # Mock the credential chain for refresh
+        mock_session = mock.MagicMock()
+        mock_credentials = mock.MagicMock()
+        mock_frozen_credentials = mock.MagicMock()
+        mock_frozen_credentials.access_key = "new_access_key"
+        mock_frozen_credentials.secret_key = "new_secret_key"
+        mock_frozen_credentials.token = "new_token"
+
+        mock_get_session.return_value = mock_session
+        mock_session.get_credentials.return_value = mock_credentials
+        mock_credentials.get_frozen_credentials.return_value = 
mock_frozen_credentials
+
+        op = EksPodOperator(
+            task_id="run_pod",
+            pod_name="run_pod",
+            cluster_name=CLUSTER_NAME,
+            image="amazon/aws-cli:latest",
+            cmds=["sh", "-c", "ls"],
+            labels={"demo": "hello_world"},
+            get_logs=True,
+            on_finish_action="delete_pod",
+        )
+        # Set the credentials file path as it would be during execute()
+        op._credentials_file_path = str(credentials_file)
+
+        # Call the refresh method
+        op._refresh_cached_properties()
+
+        # Verify the credentials file was updated with new credentials
+        updated_content = credentials_file.read_text()
+        assert "new_access_key" in updated_content
+        assert "new_secret_key" in updated_content
+        assert "new_token" in updated_content
+        assert "old_key" not in updated_content
+
+        # Verify super()._refresh_cached_properties() was called
+        mock_super_refresh.assert_called_once()
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.get_session")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__", 
return_value=None)
+    @mock.patch(
+        
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator._refresh_cached_properties"
+    )
+    def test_refresh_cached_properties_raises_when_no_credentials(
+        self,
+        mock_super_refresh,
+        mock_eks_hook,
+        mock_get_session,
+        tmp_path,
+    ):
+        """Test that _refresh_cached_properties raises when credentials cannot 
be retrieved."""
+        # Create a temporary credentials file
+        credentials_file = tmp_path / "test_creds.aws_creds"
+        credentials_file.write_text("export AWS_ACCESS_KEY_ID='old_key'\n")
+
+        # Mock the credential chain to return None (simulating expired/missing 
credentials)
+        mock_session = mock.MagicMock()
+        mock_get_session.return_value = mock_session
+        mock_session.get_credentials.return_value = None
+
+        op = EksPodOperator(
+            task_id="run_pod",
+            pod_name="run_pod",
+            cluster_name=CLUSTER_NAME,
+            image="amazon/aws-cli:latest",
+            cmds=["sh", "-c", "ls"],
+            labels={"demo": "hello_world"},
+            get_logs=True,
+            on_finish_action="delete_pod",
+        )
+        op._credentials_file_path = str(credentials_file)
+
+        # Call the refresh method and expect it to raise
+        with pytest.raises(AirflowException, match="Unable to retrieve fresh 
AWS credentials"):
+            op._refresh_cached_properties()
+
+        # Verify super()._refresh_cached_properties() was NOT called since we 
raised
+        mock_super_refresh.assert_not_called()
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 413a1fa4748..32ce54465bd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -355,6 +355,7 @@ class PodManager(LoggingMixin):
         self._watch = watch.Watch()
         self._callbacks = callbacks or []
         self.stop_watching_events = False
+        self.container_log_times: dict[tuple[str, str, str], DateTime] = {}
 
     def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
         """Run POD asynchronously."""
@@ -534,6 +535,10 @@ class PodManager(LoggingMixin):
                                     log_formatter,
                                 )
                                 last_captured_timestamp = message_timestamp
+                                if last_captured_timestamp is not None:
+                                    self.container_log_times[
+                                        (pod.metadata.namespace, 
pod.metadata.name, container_name)
+                                    ] = last_captured_timestamp
                                 message_to_log = message
                                 message_timestamp = line_timestamp
                         else:  # continuation of the previous log line
@@ -554,6 +559,10 @@ class PodManager(LoggingMixin):
                             message_to_log, container_name, 
container_name_log_prefix_enabled, log_formatter
                         )
                     last_captured_timestamp = message_timestamp
+                    if last_captured_timestamp:
+                        self.container_log_times[
+                            (pod.metadata.namespace, pod.metadata.name, 
container_name)
+                        ] = last_captured_timestamp
             except TimeoutError as e:
                 # in case of timeout, increment return time by 2 seconds to 
avoid
                 # duplicate log entries
@@ -663,10 +672,12 @@ class PodManager(LoggingMixin):
         containers_to_log = sorted(containers_to_log, key=lambda cn: 
all_containers.index(cn))
         for c in containers_to_log:
             self._await_init_container_start(pod=pod, container_name=c)
+            since_time = self.container_log_times.get((pod.metadata.namespace, 
pod.metadata.name, c))
             status = self.fetch_container_logs(
                 pod=pod,
                 container_name=c,
                 follow=follow_logs,
+                since_time=since_time,
                 
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
                 log_formatter=log_formatter,
             )
@@ -696,10 +707,12 @@ class PodManager(LoggingMixin):
             pod_name=pod.metadata.name,
         )
         for c in containers_to_log:
+            since_time = self.container_log_times.get((pod.metadata.namespace, 
pod.metadata.name, c))
             status = self.fetch_container_logs(
                 pod=pod,
                 container_name=c,
                 follow=follow_logs,
+                since_time=since_time,
                 
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
                 log_formatter=log_formatter,
             )
@@ -837,6 +850,12 @@ class PodManager(LoggingMixin):
                 resource_version=resource_version,
                 resource_version_match="NotOlderThan" if resource_version else 
None,
             )
+        except TypeError:
+            return self._client.list_namespaced_event(
+                namespace=pod.metadata.namespace,
+                field_selector=f"involvedObject.name={pod.metadata.name}",
+                resource_version=resource_version,
+            )
         except HTTPError as e:
             raise KubernetesApiException(f"There was an error reading the 
kubernetes API: {e}")
 
@@ -899,6 +918,7 @@ class PodManager(LoggingMixin):
             f"then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; "
             f"else echo {EMPTY_XCOM_RESULT}; fi"
         )
+        result = None
         with closing(
             kubernetes_stream(
                 self._client.connect_get_namespaced_pod_exec,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 544109a599e..b5c57c49167 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -638,6 +638,70 @@ class TestPodManager:
         assert "message3 line1" in caplog.text
         assert "ERROR" not in caplog.text
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_container_log_times_tracks_last_timestamp(self, 
mock_read_pod_logs, mock_container_is_running):
+        """Test that container_log_times dictionary tracks the last log 
timestamp for each container."""
+        timestamp_string = "2020-10-08T14:16:17.793417674Z"
+        mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} 
message", "utf-8")]
+        mock_container_is_running.return_value = False
+
+        # Create a mock pod with namespace and name
+        mock_pod = mock.MagicMock()
+        mock_pod.metadata.namespace = "test-namespace"
+        mock_pod.metadata.name = "test-pod"
+        container_name = "test-container"
+
+        # Ensure container_log_times is empty initially
+        assert not self.pod_manager.container_log_times
+
+        # Fetch logs which should populate container_log_times
+        self.pod_manager.fetch_container_logs(mock_pod, container_name, 
follow=True)
+
+        # Verify the timestamp was stored in container_log_times
+        key = ("test-namespace", "test-pod", "test-container")
+        assert key in self.pod_manager.container_log_times
+        assert self.pod_manager.container_log_times[key] == cast("DateTime", 
pendulum.parse(timestamp_string))
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_fetch_requested_container_logs_uses_since_time(
+        self, mock_read_pod_logs, mock_container_is_running
+    ):
+        """Test that fetch_requested_container_logs passes since_time from 
container_log_times."""
+        timestamp_string = "2020-10-08T14:16:17.793417674Z"
+        mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} 
message", "utf-8")]
+        mock_container_is_running.return_value = False
+
+        # Create a mock pod
+        mock_pod = mock.MagicMock()
+        mock_pod.metadata.namespace = "test-namespace"
+        mock_pod.metadata.name = "test-pod"
+        mock_container = mock.MagicMock()
+        mock_container.name = "test-container"
+        mock_pod.spec.containers = [mock_container]
+        container_name = "test-container"
+
+        # Pre-populate container_log_times with an earlier timestamp
+        earlier_timestamp = pendulum.parse("2020-10-08T14:15:00.000000000Z")
+        self.pod_manager.container_log_times[("test-namespace", "test-pod", 
"test-container")] = (
+            earlier_timestamp
+        )
+
+        # Mock read_pod to return the mock_pod
+        self.pod_manager.read_pod = mock.Mock(return_value=mock_pod)
+
+        # Fetch logs - this should pass the earlier timestamp as since_time
+        with mock.patch.object(
+            self.pod_manager, "fetch_container_logs", 
wraps=self.pod_manager.fetch_container_logs
+        ) as mock_fetch:
+            self.pod_manager.fetch_requested_container_logs(mock_pod, 
containers=container_name)
+
+            # Verify fetch_container_logs was called with since_time set to 
the earlier timestamp
+            mock_fetch.assert_called_once()
+            call_kwargs = mock_fetch.call_args[1]
+            assert call_kwargs["since_time"] == earlier_timestamp
+
     @pytest.mark.parametrize("status", [409, 429])
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_on_409_or_429_error(self, mock_run_pod_async, 
status):

Reply via email to