alexott commented on code in PR #61458:
URL: https://github.com/apache/airflow/pull/61458#discussion_r2803037213


##########
providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py:
##########
@@ -525,6 +544,360 @@ def _is_oauth_token_valid(token: dict, 
time_key="expires_on") -> bool:
 
         return int(token[time_key]) > (int(time.time()) + 
TOKEN_REFRESH_LEAD_TIME)
 
+    def _get_k8s_jwt_token(self) -> str:
+        """
+        Get JWT token from Kubernetes.
+
+        Supports two methods:
+        1. Projected volume: reads token directly from configured path
+        2. TokenRequest API: dynamically requests token from K8s API
+
+        :return: JWT Service Account token string
+        """
+        if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return self._get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return self._get_k8s_token_request_api()
+
+    async def _a_get_k8s_jwt_token(self) -> str:
+        """Async version of _get_k8s_jwt_token()."""
+        if "k8s_projected_volume_token_path" in 
self.databricks_conn.extra_dejson:
+            self.log.info("Using Kubernetes projected volume token")
+            return await self._a_get_k8s_projected_volume_token()
+
+        self.log.info("Using Kubernetes TokenRequest API")
+        return await self._a_get_k8s_token_request_api()
+
+    def _get_k8s_projected_volume_token(self) -> str:
+        """
+        Get JWT token from Kubernetes projected volume.
+
+        Reads a pre-configured service account token from a projected volume.
+        The token should be configured in the Pod spec with the desired 
audience
+        and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        projected_token_path: str = 
self.databricks_conn.extra_dejson["k8s_projected_volume_token_path"]
+
+        try:
+            with open(projected_token_path) as f:
+                token = f.read().strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+            self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+    async def _a_get_k8s_projected_volume_token(self) -> str:
+        """Async version of _get_k8s_projected_volume_token()."""
+        projected_token_path: str = 
self.databricks_conn.extra_dejson["k8s_projected_volume_token_path"]
+
+        try:
+            async with aiofiles.open(projected_token_path) as f:
+                token = (await f.read()).strip()
+
+            if not token:
+                raise AirflowException(f"Token file at {projected_token_path} 
is empty")
+
+            self.log.debug("Successfully read token from projected volume at 
%s", projected_token_path)
+            return token
+        except FileNotFoundError as e:
+            raise AirflowException(
+                f"Kubernetes projected volume token not found at 
{projected_token_path}. "
+                "Ensure your Pod has a projected volume configured with 
serviceAccountToken."
+            ) from e
+        except PermissionError as e:
+            raise AirflowException(f"Permission denied reading token from 
{projected_token_path}") from e
+
+    @staticmethod
+    def _build_k8s_token_request_payload(audience: str, expiration_seconds: 
int) -> dict[str, Any]:
+        """
+        Build the JSON payload for Kubernetes TokenRequest API.
+
+        :param audience: The audience value for the JWT token
+        :param expiration_seconds: Token expiration in seconds
+        :return: TokenRequest API payload dictionary
+        """
+        return {
+            "apiVersion": "authentication.k8s.io/v1",
+            "kind": "TokenRequest",
+            "spec": {
+                "audiences": [audience],
+                "expirationSeconds": expiration_seconds,
+            },
+        }
+
+    def _get_k8s_token_request_api(self) -> str:
+        """
+        Get JWT token using Kubernetes TokenRequest API.
+
+        Dynamically requests a service account token from the Kubernetes API 
server
+        with custom audience and expiration settings.
+
+        :return: JWT Service Account token string
+        """
+        audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_path", DEFAULT_K8S_NAMESPACE_PATH
+        )
+
+        try:
+            with open(token_path) as f:
+                in_cluster_token = f.read().strip()
+
+            with open(namespace_path) as f:
+                namespace = f.read().strip()
+
+            # Call Kubernetes TokenRequest API with the in-cluster token
+            token_request_url = (
+                
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
+            )
+
+            for attempt in self._get_retry_object():
+                with attempt:
+                    resp = requests.post(
+                        token_request_url,
+                        headers={
+                            "Authorization": f"Bearer {in_cluster_token}",
+                            "Content-Type": "application/json",
+                        },
+                        json=self._build_k8s_token_request_payload(audience, 
expiration_seconds),
+                        verify=False,  # K8s in-cluster uses self-signed certs
+                        timeout=self.token_timeout_seconds,
+                    )
+                    resp.raise_for_status()
+                    try:
+                        response_json = resp.json()
+                        return response_json["status"]["token"]
+                    except (JSONDecodeError, ValueError) as e:
+                        raise AirflowException(
+                            f"Invalid JSON response from Kubernetes API. 
Response: {resp.text[:500]}"
+                        ) from e
+                    except KeyError as e:
+                        raise AirflowException(
+                            f"Malformed Kubernetes token response: missing key 
'{e}'. "
+                            f"Response: {resp.text[:500]}"
+                        ) from e
+        except FileNotFoundError as e:
+            raise AirflowException(
+                "Kubernetes service account token not found. "
+                "This authentication method only works when running inside a 
Kubernetes cluster."
+            ) from e
+        except RetryError:
+            raise AirflowException(
+                f"Failed to get Kubernetes JWT token after {self.retry_limit} 
retries. Giving up."
+            )
+        except requests_exceptions.HTTPError as e:
+            msg = f"Failed to get Kubernetes JWT token. Response: 
{e.response.content.decode()}, Status Code: {e.response.status_code}"
+            raise AirflowException(msg)
+
+        raise RuntimeError("Failed to get JWT token")
+
+    async def _a_get_k8s_token_request_api(self) -> str:
+        """Async version of _get_k8s_token_request_api()."""
+        audience = self.databricks_conn.extra_dejson.get("audience", 
DEFAULT_K8S_AUDIENCE)
+        expiration_seconds = 
self.databricks_conn.extra_dejson.get("expiration_seconds", 3600)
+        token_path = self.databricks_conn.extra_dejson.get(
+            "k8s_token_path", DEFAULT_K8S_SERVICE_ACCOUNT_TOKEN_PATH
+        )
+        namespace_path = self.databricks_conn.extra_dejson.get(
+            "k8s_namespace_path", DEFAULT_K8S_NAMESPACE_PATH
+        )
+
+        try:
+            async with aiofiles.open(token_path) as f:
+                in_cluster_token = (await f.read()).strip()
+
+            async with aiofiles.open(namespace_path) as f:
+                namespace = (await f.read()).strip()
+
+            # Call Kubernetes TokenRequest API with the in-cluster token
+            token_request_url = (
+                
f"{K8S_TOKEN_SERVICE_URL}/api/v1/namespaces/{namespace}/serviceaccounts/default/token"
+            )
+
+            async for attempt in self._a_get_retry_object():
+                with attempt:
+                    async with self._session.post(
+                        token_request_url,
+                        headers={
+                            "Authorization": f"Bearer {in_cluster_token}",
+                            "Content-Type": "application/json",
+                        },
+                        json=self._build_k8s_token_request_payload(audience, 
expiration_seconds),
+                        ssl=False,  # K8s in-cluster uses self-signed certs
+                        timeout=self.token_timeout_seconds,
+                    ) as resp:
+                        resp.raise_for_status()
+                        try:
+                            jsn = await resp.json()
+                            return jsn["status"]["token"]
+                        except (aiohttp.ContentTypeError, ValueError) as e:
+                            # Try to read response text if JSON parsing failed
+                            try:
+                                response_text = await resp.text()
+                            except Exception:
+                                response_text = "unable to read response"
+                            raise AirflowException(
+                                f"Invalid JSON response from Kubernetes API. 
Response: {response_text[:500]}"
+                            ) from e
+                        except KeyError as e:
+                            # Response body already consumed, use the parsed 
JSON for error message
+                            raise AirflowException(
+                                f"Malformed Kubernetes token response: missing 
key '{e}'. "
+                                f"Response structure: {jsn}"
+                            ) from e
+        except FileNotFoundError as e:
+            raise AirflowException(
+                "Kubernetes service account token not found. "
+                "This authentication method only works when running inside a 
Kubernetes cluster."
+            ) from e
+        except RetryError:
+            raise AirflowException(
+                f"Failed to get Kubernetes JWT token after {self.retry_limit} 
retries. Giving up."
+            )
+        except aiohttp.ClientResponseError as err:
+            raise AirflowException(
+                f"Failed to get Kubernetes JWT token. Response: {err.message}, 
Status Code: {err.status}"
+            )
+        raise RuntimeError("Failed to get JWT token")
+
+    def _get_required_client_id(self) -> str:
+        """
+        Get and validate client_id for Kubernetes OIDC token federation.
+
+        :return: Service principal client ID
+        :raises AirflowException: If client_id is not provided
+        """
+        client_id = self.databricks_conn.extra_dejson.get("client_id", None)

Review Comment:
   Do we really need to pass `None` as an explicit default value here? Python's 
`dict.get` already returns `None` when the key is not present.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to