akhilesharora commented on code in PR #63020:
URL: https://github.com/apache/airflow/pull/63020#discussion_r3078729456


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
 
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = None
+    ) -> None:
+        """
+        Override to generate a token-based kubeconfig for the triggerer.
+
+        The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+        which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+        block that references a temp file with AWS credentials. This temp file only exists on
+        the worker and is deleted when the context managers exit.
+
+        When the trigger is serialized and sent to the triggerer (which runs on a different host),
+        the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+        This override generates a kubeconfig with an embedded bearer token instead of an exec
+        block, allowing the config to work on the triggerer without requiring local temp files.
+        """

Review Comment:
   Done



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             ) as self.config_file:
                 return super().trigger_reentry(context, event)
 
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = None
+    ) -> None:
+        """
+        Override to generate a token-based kubeconfig for the triggerer.
+
+        The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+        which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+        block that references a temp file with AWS credentials. This temp file only exists on
+        the worker and is deleted when the context managers exit.
+
+        When the trigger is serialized and sent to the triggerer (which runs on a different host),
+        the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+        This override generates a kubeconfig with an embedded bearer token instead of an exec
+        block, allowing the config to work on the triggerer without requiring local temp files.
+        """
+        eks_hook = EksHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region,
+        )
+
+        # Generate a kubeconfig dict with an embedded token (no exec block)
+        self._config_dict = eks_hook.generate_config_dict_for_deferral(
+            eks_cluster_name=self.cluster_name,
+            pod_namespace=self.namespace,
+        )
+
+        # Now call the parent's invoke_defer_method, but skip convert_config_file_to_dict
+        # since we've already set self._config_dict
+        # We need to replicate the parent logic but use our config_dict
+        import datetime
+
+        from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
+        from airflow.providers.common.compat.sdk import AirflowNotFoundException, BaseHook

Review Comment:
   Moved to module level, matches the parent class.



##########
providers/amazon/tests/unit/amazon/aws/hooks/test_eks.py:
##########
@@ -1273,6 +1273,126 @@ def test_generate_config_file(self, mock_conn, aws_conn_id, region_name, expecte
             if expected_region_args:
                 assert expected_region_args in command_arg
 
+    @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
+    @mock.patch("airflow.providers.amazon.aws.utils.eks_get_token.fetch_access_token_for_cluster")
+    def test_generate_config_dict_for_deferral(self, mock_fetch_token, mock_conn):
+        """Test that generate_config_dict_for_deferral creates a config with embedded token.
+
+        This test verifies that the method generates a kubeconfig dict with a bearer token
+        embedded directly (instead of an exec block that references temp files), allowing
+        the config to be serialized and used on the triggerer.

Review Comment:
   Already single line, no change needed.



##########
providers/amazon/tests/unit/amazon/aws/operators/test_eks.py:
##########
@@ -1116,3 +1116,92 @@ def test_refresh_cached_properties_raises_when_no_credentials(
 
         # Verify super()._refresh_cached_properties() was NOT called since we raised
         mock_super_refresh.assert_not_called()
+
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.EksPodOperator.defer")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.generate_config_dict_for_deferral")
+    @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__", return_value=None)
+    def test_invoke_defer_method_generates_token_based_config(
+        self,
+        mock_eks_hook,
+        mock_generate_config_dict,
+        mock_defer,
+    ):
+        """Test that invoke_defer_method generates a token-based config dict for the triggerer.
+
+        This test verifies that EksPodOperator.invoke_defer_method() generates a kubeconfig
+        with an embedded bearer token (instead of an exec block with temp file references)
+        so that the triggerer can authenticate without requiring files that only exist on the worker.
+        """

Review Comment:
   Already single line, no change needed.



##########
providers/amazon/newsfragments/63020.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix EksPodOperator deferrable mode failing on remote triggerers with 401 Unauthorized by embedding bearer token in kubeconfig instead of using exec block with temp file references

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to