This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 3767-add-jwt-authentication-to-python-client
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3767-add-jwt-authentication-to-python-client by this push:
     new e215670ed4 Add jwt credentials
e215670ed4 is described below

commit e215670ed4eb976becb1a0cb0bde80f2882179f4
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Sep 9 10:57:46 2025 +0200

    Add jwt credentials
---
 .../streampipes/client/credential_provider.py      | 36 +++++++++++++++
 .../tests/client/test_credential_provider.py       | 52 +++++++++++++++++-----
 2 files changed, 78 insertions(+), 10 deletions(-)

diff --git 
a/streampipes-client-python/streampipes/client/credential_provider.py 
b/streampipes-client-python/streampipes/client/credential_provider.py
index f2fd843b4d..edac71dde8 100644
--- a/streampipes-client-python/streampipes/client/credential_provider.py
+++ b/streampipes-client-python/streampipes/client/credential_provider.py
@@ -30,6 +30,7 @@ from typing import Dict, Optional
 __all__ = [
     "CredentialProvider",
     "StreamPipesApiKeyCredentials",
+    "StreamPipesTokenCredentials",
 ]
 
 from typing_extensions import deprecated
@@ -211,3 +212,38 @@ class StreamPipesApiKeyCredentials(CredentialProvider):
             "X-API-User": user,
             "X-API-Key": token,
         }
+
+
+class StreamPipesTokenCredentials(CredentialProvider):
+    """A credential provider that allows authentication via a JSON Web Token 
(JWT).
+
+    Parameters
+    ----------
+    jwt: str
+        The JSON Web Token to be used for authenticating API requests.
+        This token must include the required claims as issued by StreamPipes.
+    """
+
+    def __init__(self, jwt: str):
+        self.jwt = jwt
+
+    def update_token(self, jwt: str):
+        """Update the stored JWT token.
+
+        Parameters
+        ----------
+        jwt: str
+            The new JSON Web Token to replace the existing one.
+        """
+        self.jwt = jwt
+
+    @property
+    def _authentication_headers(self) -> Dict[str, str]:
+        """Provides the HTTP headers used for authentication with the JWT 
token.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the `Authorization` header with the JWT.
+        """
+        return {"Authorization": self.jwt}
diff --git a/streampipes-client-python/tests/client/test_credential_provider.py 
b/streampipes-client-python/tests/client/test_credential_provider.py
index 57a7b975c4..ef03f259f8 100644
--- a/streampipes-client-python/tests/client/test_credential_provider.py
+++ b/streampipes-client-python/tests/client/test_credential_provider.py
@@ -17,14 +17,15 @@
 import os
 from unittest import TestCase
 
-from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
+from streampipes.client.credential_provider import (
+    StreamPipesApiKeyCredentials,
+    StreamPipesTokenCredentials,
+)
 
 
 class TestStreamPipesApiKeyCredentials(TestCase):
-
     @staticmethod
     def _clear_envs():
-
         if StreamPipesApiKeyCredentials._ENV_KEY_API in os.environ.keys():
             del os.environ[StreamPipesApiKeyCredentials._ENV_KEY_API]
 
@@ -51,7 +52,6 @@ class TestStreamPipesApiKeyCredentials(TestCase):
         self.assertEqual("api-key", credentials.api_key)
 
     def test_pass_credentials_envs_set(self):
-
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_API] = 
"another-api-key"
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_USERNAME] = 
"another-user-name"
 
@@ -61,7 +61,6 @@ class TestStreamPipesApiKeyCredentials(TestCase):
         self.assertEqual("api-key", credentials.api_key)
 
     def test_pass_username(self):
-
         self._clear_envs()
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_API] = 
"another-api-key"
 
@@ -71,14 +70,12 @@ class TestStreamPipesApiKeyCredentials(TestCase):
         self.assertEqual("another-api-key", credentials.api_key)
 
     def test_pass_username_api_key_not_set(self):
-
         self._clear_envs()
 
         with self.assertRaises(AttributeError):
             StreamPipesApiKeyCredentials(username="username")
 
     def test_pass_api_key(self):
-
         self._clear_envs()
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_USERNAME] = 
"another-username"
 
@@ -88,21 +85,18 @@ class TestStreamPipesApiKeyCredentials(TestCase):
         self.assertEqual("api-key", credentials.api_key)
 
     def test_pass_api_key_username_not_set(self):
-
         self._clear_envs()
 
         with self.assertRaises(AttributeError):
             StreamPipesApiKeyCredentials(api_key="api-key")
 
     def test_nothing_set(self):
-
         self._clear_envs()
 
         with self.assertRaises(AttributeError):
             StreamPipesApiKeyCredentials()
 
     def test_all_from_envs(self):
-
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_API] = 
"another-api-key"
         os.environ[StreamPipesApiKeyCredentials._ENV_KEY_USERNAME] = 
"another-username"
 
@@ -110,3 +104,41 @@ class TestStreamPipesApiKeyCredentials(TestCase):
 
         self.assertEqual("another-username", credentials.username)
         self.assertEqual("another-api-key", credentials.api_key)
+
+    def test_initialization_sets_jwt(self):
+        token = "Bearer test-token-123"
+        credentials = StreamPipesTokenCredentials(jwt=token)
+
+        self.assertEqual(credentials.jwt, token)
+
+    def test_update_token_changes_jwt(self):
+        token1 = "Bearer old-token"
+        token2 = "Bearer new-token"
+
+        credentials = StreamPipesTokenCredentials(jwt=token1)
+        credentials.update_token(token2)
+
+        self.assertEqual(credentials.jwt, token2)
+        self.assertNotEqual(credentials.jwt, token1)
+
+    def test_authentication_headers_returns_correct_dict(self):
+        token = "Bearer test-token-456"
+        credentials = StreamPipesTokenCredentials(jwt=token)
+
+        headers = credentials._authentication_headers
+
+        self.assertIsInstance(headers, dict)
+        self.assertIn("Authorization", headers)
+        self.assertEqual(headers["Authorization"], token)
+
+    def test_multiple_instances_do_not_share_state(self):
+        token1 = "Bearer token-1"
+        token2 = "Bearer token-2"
+
+        credentials1 = StreamPipesTokenCredentials(jwt=token1)
+        credentials2 = StreamPipesTokenCredentials(jwt=token2)
+
+        self.assertEqual(credentials1.jwt, token1)
+        self.assertEqual(credentials2.jwt, token2)
+        
self.assertEqual(credentials1._authentication_headers["Authorization"], token1)
+        
self.assertEqual(credentials2._authentication_headers["Authorization"], token2)

Reply via email to