uranusjr commented on code in PR #58743:
URL: https://github.com/apache/airflow/pull/58743#discussion_r2674394836


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1089,13 +1101,103 @@ def trigger_reentry(self, context: Context, event: 
dict[str, Any]) -> Any:
         eks_cluster_name = event["eks_cluster_name"]
         pod_namespace = event["namespace"]
         session = eks_hook.get_session()
-        credentials = session.get_credentials().get_frozen_credentials()
+        credentials_obj = session.get_credentials()
+        if credentials_obj is None:
+            raise AirflowException(
+                "Unable to retrieve AWS credentials. Credentials may have 
expired or not been configured. "
+                "Please check your AWS connection configuration."
+            )
+        credentials = credentials_obj.get_frozen_credentials()
         with eks_hook._secure_credential_context(
             credentials.access_key, credentials.secret_key, credentials.token
         ) as credentials_file:
+            # Store credentials file path for potential refresh during 
long-running tasks
+            self._credentials_file_path = credentials_file
             with eks_hook.generate_config_file(
                 eks_cluster_name=eks_cluster_name,
                 pod_namespace=pod_namespace,
                 credentials_file=credentials_file,
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
+
+    def _write_credentials_to_file(
+        self, credentials_file_path: str, access_key: str, secret_key: str, 
session_token: str | None
+    ) -> None:
+        """
+        Write AWS credentials to an existing credentials file.
+
+        This overwrites the contents of the credentials file with fresh 
credentials,
+        which allows the kubeconfig exec credential plugin to use new 
credentials
+        without regenerating the entire kubeconfig.
+
+        The file was originally created by EksHook._secure_credential_context 
with
+        restrictive permissions (0600 - owner read/write only). This method 
preserves
+        those permissions by using os.open with the same mode flags.
+
+        :param credentials_file_path: Path to the credentials file to update
+        :param access_key: AWS access key ID
+        :param secret_key: AWS secret access key
+        :param session_token: AWS session token (optional)
+        """
+        # Open with same restrictive permissions as _secure_credential_context 
(0600)
+        fd = os.open(credentials_file_path, os.O_WRONLY | os.O_TRUNC, 
stat.S_IRUSR | stat.S_IWUSR)
+        try:
+            with os.fdopen(fd, "w") as f:
+                f.write(f"export AWS_ACCESS_KEY_ID='{access_key}'\n")
+                f.write(f"export AWS_SECRET_ACCESS_KEY='{secret_key}'\n")
+                if session_token:
+                    f.write(f"export AWS_SESSION_TOKEN='{session_token}'\n")
+        except Exception:
+            # If fdopen fails, we need to close the file descriptor manually
+            try:
+                os.close(fd)
+            except OSError:
+                pass
+            raise
+
+    def _refresh_cached_properties(self) -> None:
+        """
+        Refresh cached properties including AWS credentials.
+
+        This method is called by KubernetesPodOperator._handle_api_exception 
(in
+        providers/cncf/kubernetes/operators/pod.py) when a 401 Unauthorized 
error
+        is received from the Kubernetes API. The 401 error indicates that the
+        credentials used to authenticate with EKS have expired.
+
+        The call chain is:
+        1. KubernetesPodOperator._await_pod_completion catches ApiException 
with status 401
+        2. _handle_api_exception is called, which logs a warning and calls 
_refresh_cached_properties
+        3. This override refreshes the AWS credentials file that the 
kubeconfig exec
+           credential plugin reads from (see 
EksHook._secure_credential_context)
+        4. The parent class deletes cached hook/client/pod_manager so they are 
recreated
+           with fresh credentials on next access
+
+        Without this refresh, the kubeconfig would continue to reference stale
+        credentials in the temp file, causing repeated authentication failures.
+        """
+        if self._credentials_file_path:
+            self.log.info("Refreshing AWS credentials for EKS authentication")
+            try:
+                eks_hook = EksHook(
+                    aws_conn_id=self.aws_conn_id,
+                    region_name=self.region,
+                )
+                session = eks_hook.get_session()
+                credentials_obj = session.get_credentials()
+                if credentials_obj is None:
+                    raise AirflowException(
+                        "Unable to retrieve fresh AWS credentials during 
refresh. "
+                        "Credentials may have expired or the AWS connection 
may be misconfigured."
+                    )
+                credentials = credentials_obj.get_frozen_credentials()
+                self._write_credentials_to_file(
+                    self._credentials_file_path,
+                    credentials.access_key,
+                    credentials.secret_key,
+                    credentials.token,
+                )
+                self.log.info("Successfully refreshed AWS credentials for EKS")
+            except Exception as e:
+                self.log.error("Failed to refresh AWS credentials: %s", e)

Review Comment:
   ```suggestion
                   self.log.exception("Failed to refresh AWS credentials.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to