aviralgarg05 commented on code in PR #58743:
URL: https://github.com/apache/airflow/pull/58743#discussion_r2593238101
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1089,13 +1099,81 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
eks_cluster_name = event["eks_cluster_name"]
pod_namespace = event["namespace"]
session = eks_hook.get_session()
- credentials = session.get_credentials().get_frozen_credentials()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise AirflowException(
+ "Unable to retrieve AWS credentials. Credentials may have
expired or not been configured. "
+ "Please check your AWS connection configuration."
+ )
+ credentials = credentials_obj.get_frozen_credentials()
with eks_hook._secure_credential_context(
credentials.access_key, credentials.secret_key, credentials.token
) as credentials_file:
+ # Store credentials file path for potential refresh during
long-running tasks
+ self._credentials_file_path = credentials_file
with eks_hook.generate_config_file(
eks_cluster_name=eks_cluster_name,
pod_namespace=pod_namespace,
credentials_file=credentials_file,
) as self.config_file:
return super().trigger_reentry(context, event)
+
+ def _write_credentials_to_file(
+ self, credentials_file_path: str, access_key: str, secret_key: str,
session_token: str | None
+ ) -> None:
+ """
+ Write AWS credentials to an existing credentials file.
+
+ This overwrites the contents of the credentials file with fresh
credentials,
+ which allows the kubeconfig exec credential plugin to use new
credentials
+ without regenerating the entire kubeconfig.
+
+ :param credentials_file_path: Path to the credentials file to update
+ :param access_key: AWS access key ID
+ :param secret_key: AWS secret access key
+ :param session_token: AWS session token (optional)
+ """
+ with open(credentials_file_path, "w") as f:
+ f.write(f"export AWS_ACCESS_KEY_ID='{access_key}'\n")
+ f.write(f"export AWS_SECRET_ACCESS_KEY='{secret_key}'\n")
+ if session_token:
+ f.write(f"export AWS_SESSION_TOKEN='{session_token}'\n")
+
+ def _refresh_cached_properties(self) -> None:
+ """
+ Refresh cached properties including AWS credentials.
+
+ This override ensures that when Kubernetes credentials expire (401
error),
+ we refresh the AWS credentials file before recreating the Kubernetes
clients.
+ This is necessary because EKS uses an exec credential plugin that reads
+ credentials from the temporary file created during execute().
Review Comment:
The 401 error handling occurs in
`KubernetesPodOperator._handle_api_exception()` at line 794 of `pod.py`. When
an `ApiException` with status 401 is caught, that method logs a warning and
calls `_refresh_cached_properties()`. The `EksPodOperator` override of
`_refresh_cached_properties()` refreshes the AWS credentials file before the
parent class recreates the Kubernetes clients.
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1089,13 +1099,81 @@
eks_cluster_name = event["eks_cluster_name"]
pod_namespace = event["namespace"]
session = eks_hook.get_session()
- credentials = session.get_credentials().get_frozen_credentials()
+ credentials_obj = session.get_credentials()
+ if credentials_obj is None:
+ raise AirflowException(
+ "Unable to retrieve AWS credentials. Credentials may have
expired or not been configured. "
+ "Please check your AWS connection configuration."
+ )
+ credentials = credentials_obj.get_frozen_credentials()
with eks_hook._secure_credential_context(
credentials.access_key, credentials.secret_key, credentials.token
) as credentials_file:
+ # Store credentials file path for potential refresh during
long-running tasks
+ self._credentials_file_path = credentials_file
with eks_hook.generate_config_file(
eks_cluster_name=eks_cluster_name,
pod_namespace=pod_namespace,
credentials_file=credentials_file,
) as self.config_file:
return super().trigger_reentry(context, event)
+
+ def _write_credentials_to_file(
+ self, credentials_file_path: str, access_key: str, secret_key: str,
session_token: str | None
+ ) -> None:
+ """
+ Write AWS credentials to an existing credentials file.
+
+ This overwrites the contents of the credentials file with fresh
credentials,
+ which allows the kubeconfig exec credential plugin to use new
credentials
+ without regenerating the entire kubeconfig.
+
+ :param credentials_file_path: Path to the credentials file to update
+ :param access_key: AWS access key ID
+ :param secret_key: AWS secret access key
+ :param session_token: AWS session token (optional)
+ """
+ with open(credentials_file_path, "w") as f:
+ f.write(f"export AWS_ACCESS_KEY_ID='{access_key}'\n")
+ f.write(f"export AWS_SECRET_ACCESS_KEY='{secret_key}'\n")
Review Comment:
I have fixed this issue
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]