ashb commented on a change in pull request #14219:
URL: https://github.com/apache/airflow/pull/14219#discussion_r584772584



##########
File path: airflow/api_connexion/webserver_auth.py
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+from functools import wraps
+from typing import Callable, TypeVar, cast
+
+import jwt
+from flask import Response, current_app, request
+from flask_appbuilder.const import AUTH_LDAP
+from flask_login import login_user
+
+from airflow.configuration import conf
+
+SECRET = conf.get("webserver", "secret_key")
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def encode_auth_token(user_id):
+    """Generate authentication token"""
+    expire_time = conf.getint("webserver", "session_lifetime_minutes")
+    payload = {
+        'exp': datetime.utcnow() + timedelta(minutes=expire_time),
+        'iat': datetime.utcnow(),
+        'sub': user_id,
+    }
+    return jwt.encode(payload, SECRET, algorithm='HS256')
+
+
+def decode_auth_token(auth_token):
+    """Decode authentication token"""
+    try:
+        payload = jwt.decode(auth_token, SECRET, algorithms=['HS256'])
+        return payload['sub']
+    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+        # Do not raise exception here due to auth_backend auth
+        return None
+
+
+def auth_current_user():
+    """Checks the authentication and return the current user"""
+    auth_header = request.headers.get("Authorization", None)
+    ab_security_manager = current_app.appbuilder.sm
+    token = None
+    user = None
+    if auth_header:
+        try:
+            token = auth_header.split(" ")[1]
+        except IndexError:
+            # Do not raise exception here due to auth_backend auth
+            return None
+    if auth_header and auth_header.startswith("Basic"):

Review comment:
       Something about the way this is integrated still doesn't sit right with 
me.
   
   This implicitly allows basic auth logins, with no way to disable it, which 
isn't good.
   
   Also the way this interacts with the _other_ auth backend configured in the 
app feels a big clunky :( 

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1381,11 +1381,178 @@ paths:
               schema:
                 $ref: '#/components/schemas/VersionInfo'
 
+  /login:
+    post:
+      summary: User login
+      description: |
+        Verify user and return a user object and JWT token as well
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.user_endpoint
+      operationId: login
+      tags: [User]
+
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/UserLogin'

Review comment:
       UserLogin is only ever going to be used here isn't it? If so just inline 
to object here.

##########
File path: tests/api_connexion/test_webserver_auth.py
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from base64 import b64encode
+from datetime import datetime, timedelta
+
+import jwt
+from flask_login import current_user
+from parameterized import parameterized
+
+from airflow.www.app import create_app
+from tests.test_utils.api_connexion_utils import assert_401
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class TestWebserverAuth(unittest.TestCase):
+    def setUp(self) -> None:
+        with conf_vars({("api", "auth_backend"): 
"tests.test_utils.remote_user_api_auth_backend"}):
+            self.app = create_app(testing=True)
+
+        self.appbuilder = self.app.appbuilder  # pylint: disable=no-member
+        role_admin = self.appbuilder.sm.find_role("Admin")
+        self.tester = self.appbuilder.sm.find_user(username="test")
+        if not self.tester:
+            self.appbuilder.sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="[email protected]",
+                role=role_admin,
+                password="test",
+            )

Review comment:
       This should be in setupClass

##########
File path: airflow/api_connexion/webserver_auth.py
##########
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+from functools import wraps
+from typing import Callable, TypeVar, cast
+
+import jwt
+from flask import Response, current_app, request
+from flask_appbuilder.const import AUTH_LDAP
+from flask_login import login_user
+
+from airflow.configuration import conf
+
+SECRET = conf.get("webserver", "secret_key")
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def encode_auth_token(user_id):
+    """Generate authentication token"""
+    expire_time = conf.getint("webserver", "session_lifetime_minutes")

Review comment:
       Yeah, having the password change def should invalidate the token -- that 
could only work for "local" (in DB) users I think, but it's still worth doing.

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1381,11 +1381,178 @@ paths:
               schema:
                 $ref: '#/components/schemas/VersionInfo'
 
+  /login:
+    post:
+      summary: User login
+      description: |
+        Verify user and return a user object and JWT token as well
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.user_endpoint
+      operationId: login
+      tags: [User]
+
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/UserLogin'
+        '400':
+          $ref: '#/components/responses/BadRequest'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
 components:
   # Reusable schemas (data models)
   schemas:
     # Database entities
+    User:
+      description: >
+        A user object
+      type: object
+      properties:
+        id:
+          type: string
+          description: The user id
+          readOnly: true
+        first_name:
+          type: string
+          description: The user firstname
+        last_name:
+          type: string
+          description: The user lastname
+        username:
+          type: string
+          description: The username
+        email:
+          type: string
+          description: The user's email
+        active:
+          type: boolean
+          description: Whether the user is active
+        last_login:
+          type: string
+          format: datetime
+          description: The last user login
+          readOnly: true
+        login_count:
+          type: integer
+          description: The login count
+          readOnly: true
+        failed_login_count:
+          type: integer
+          description: The number of times the login failed
+          readOnly: true
+        roles:
+          type: array
+          description: User roles
+          items:
+            $ref: '#/components/schemas/RoleCollectionItem'
+          readOnly: true
+          nullable: true

Review comment:
       Should this just be a list fo Role _names_?
   
   I also don't think this should be nullable.
   
   ✅ `roles: []`
   ❎ `roles: null`

##########
File path: tests/api_connexion/test_webserver_auth.py
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from base64 import b64encode
+from datetime import datetime, timedelta
+
+import jwt
+from flask_login import current_user
+from parameterized import parameterized
+
+from airflow.www.app import create_app
+from tests.test_utils.api_connexion_utils import assert_401
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class TestWebserverAuth(unittest.TestCase):
+    def setUp(self) -> None:
+        with conf_vars({("api", "auth_backend"): 
"tests.test_utils.remote_user_api_auth_backend"}):
+            self.app = create_app(testing=True)
+
+        self.appbuilder = self.app.appbuilder  # pylint: disable=no-member
+        role_admin = self.appbuilder.sm.find_role("Admin")
+        self.tester = self.appbuilder.sm.find_user(username="test")
+        if not self.tester:
+            self.appbuilder.sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="[email protected]",
+                role=role_admin,
+                password="test",
+            )
+        clear_db_pools()
+
+    def tearDown(self) -> None:
+        clear_db_pools()
+
+    def test_successful_login(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+        assert isinstance(response.json["token"], str)
+        assert response.json["user"]['email'] == "[email protected]"
+
+    def test_can_view_other_endpoints(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+            assert current_user.email == "[email protected]"
+            token = response.json["token"]

Review comment:
       Ideally you should get the token by calling a python method directly -- 
by calling this endpoint you are "retesting" the login endpoint, which it would 
be better to avoid.

##########
File path: airflow/api_connexion/security.py
##########
@@ -14,24 +14,27 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 from functools import wraps
 from typing import Callable, Optional, Sequence, Tuple, TypeVar, cast
 
 from flask import Response, current_app
 
 from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
+from airflow.api_connexion.webserver_auth import requires_authentication
 
 T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
 
 
 def check_authentication() -> None:
     """Checks that the request has valid authorization information."""
     response = current_app.api_auth.requires_authentication(Response)()
-    if response.status_code != 200:
+    jwt_response = requires_authentication(Response)()
+    if response.status_code != 200 and jwt_response.status_code != 200:
         # since this handler only checks authentication, not authorization,
         # we should always return 401
-        raise Unauthenticated(headers=response.headers)
+        if response.status_code != 200:
+            raise Unauthenticated(headers=response.headers)
+        raise Unauthenticated(headers=jwt_response.headers)

Review comment:
       This is a bit complex, and we check multiple auths. We should instead do 
something like:
   
   ```python
       response = current_app.api_auth.requires_authentication(Response)()
       if response.status_code == 200:
           return
      jwt_response = requires_authentication(Response)()
       if response.status_code == 200:
           return
       raise Unauthenticated(headers=response.headers)
   ```
   
   Less nesting, less checking of auth headers that we don't need to do.

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1381,11 +1381,178 @@ paths:
               schema:
                 $ref: '#/components/schemas/VersionInfo'
 
+  /login:
+    post:
+      summary: User login
+      description: |
+        Verify user and return a user object and JWT token as well
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.user_endpoint
+      operationId: login
+      tags: [User]
+
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/UserLogin'
+        '400':
+          $ref: '#/components/responses/BadRequest'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
 
 components:
   # Reusable schemas (data models)
   schemas:
     # Database entities
+    User:
+      description: >
+        A user object
+      type: object
+      properties:
+        id:
+          type: string
+          description: The user id
+          readOnly: true
+        first_name:
+          type: string
+          description: The user firstname
+        last_name:
+          type: string
+          description: The user lastname
+        username:
+          type: string
+          description: The username
+        email:
+          type: string
+          description: The user's email
+        active:
+          type: boolean
+          description: Whether the user is active
+        last_login:
+          type: string
+          format: datetime
+          description: The last user login
+          readOnly: true
+        login_count:
+          type: integer
+          description: The login count
+          readOnly: true
+        failed_login_count:
+          type: integer
+          description: The number of times the login failed
+          readOnly: true
+        roles:
+          type: array
+          description: User roles
+          items:
+            $ref: '#/components/schemas/RoleCollectionItem'
+          readOnly: true
+          nullable: true
+        created_on:
+          type: string
+          format: datetime
+          description: The date user was created
+          readOnly: true
+        changed_on:
+          type: string
+          format: datetime
+          description: The date user was changed
+          readOnly: true
+
+    UserLogin:
+      description: Login item
+      allOf:
+        - $ref: '#/components/schemas/User'
+        - type: object
+          properties:
+            token:
+              type: string
+              nullable: false
+              description: JWT token
+
+    RoleCollectionItem:
+      description: Role collection item
+      type: object
+      properties:
+        id:
+          type: string
+          description: The role ID
+        name:
+          type: string
+          description: The name of the role
+        permissions:
+          type: array
+          items:
+            $ref: '#/components/schemas/PermissionView'
+
+    RoleCollection:
+      description: Role Collections
+      type: object
+      properties:
+        roles:
+          type: array
+          items:
+            $ref: '#/components/schemas/RoleCollectionItem'
+
+    PermissionCollectionItem:
+      description: Permission Collection Item
+      type: object
+      properties:
+        id:
+          type: string
+          description: The permission ID
+        name:
+          type: string
+          description: The name of the permission
+          nullable: false
+
+    PermissionCollection:
+      description: Permission Collection
+      type: object
+      properties:
+        actions:
+          type: array
+          items:
+            $ref: '#/components/schemas/PermissionCollectionItem'
+
+    PermissionView:
+      description: Permission view item
+      type: object
+      properties:
+        id:
+          type: string
+          description: The PermissionView ID
+        action:
+          type: string
+          description: The name of the permission
+        resource:
+          type: string
+          description: The resource name
+
+    ResourceCollectionItem:
+      description: Resource Collection Item
+      type: object
+      properties:
+        id:
+          type: string
+          description: The Resource ID
+        name:
+          type: string
+          description: The name of the Resource
+          nullable: false
+
+    ResourceCollection:

Review comment:
       Where is this used?

##########
File path: tests/api_connexion/test_webserver_auth.py
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from base64 import b64encode
+from datetime import datetime, timedelta
+
+import jwt
+from flask_login import current_user
+from parameterized import parameterized
+
+from airflow.www.app import create_app
+from tests.test_utils.api_connexion_utils import assert_401
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class TestWebserverAuth(unittest.TestCase):
+    def setUp(self) -> None:
+        with conf_vars({("api", "auth_backend"): 
"tests.test_utils.remote_user_api_auth_backend"}):
+            self.app = create_app(testing=True)
+
+        self.appbuilder = self.app.appbuilder  # pylint: disable=no-member
+        role_admin = self.appbuilder.sm.find_role("Admin")
+        self.tester = self.appbuilder.sm.find_user(username="test")
+        if not self.tester:
+            self.appbuilder.sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="[email protected]",
+                role=role_admin,
+                password="test",
+            )
+        clear_db_pools()
+
+    def tearDown(self) -> None:
+        clear_db_pools()
+
+    def test_successful_login(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+        assert isinstance(response.json["token"], str)
+        assert response.json["user"]['email'] == "[email protected]"
+
+    def test_can_view_other_endpoints(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+            assert current_user.email == "[email protected]"
+            token = response.json["token"]
+            response2 = test_client.get("/api/v1/pools", 
headers={"Authorization": "Bearer " + token})
+        assert response2.status_code == 200
+        assert response2.json == {
+            "pools": [
+                {
+                    "name": "default_pool",
+                    "slots": 128,
+                    "occupied_slots": 0,
+                    "running_slots": 0,
+                    "queued_slots": 0,
+                    "open_slots": 128,
+                },
+            ],
+            "total_entries": 1,
+        }
+
+    def test_raises_for_the_none_algorithm(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        payload = {
+            'exp': datetime.utcnow() + timedelta(minutes=10),
+            'iat': datetime.utcnow(),
+            'sub': self.tester.id,
+        }
+        forgedtoken = jwt.encode(payload, key=None, algorithm=None).decode()
+        with self.app.test_client() as test_client:
+            test_client.post("api/v1/login", headers={"Authorization": token})
+            assert current_user.email == "[email protected]"

Review comment:
       ```suggestion
   ```
   Not needed I don't think.

##########
File path: tests/api_connexion/test_webserver_auth.py
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from base64 import b64encode
+from datetime import datetime, timedelta
+
+import jwt
+from flask_login import current_user
+from parameterized import parameterized
+
+from airflow.www.app import create_app
+from tests.test_utils.api_connexion_utils import assert_401
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class TestWebserverAuth(unittest.TestCase):
+    def setUp(self) -> None:
+        with conf_vars({("api", "auth_backend"): 
"tests.test_utils.remote_user_api_auth_backend"}):
+            self.app = create_app(testing=True)
+
+        self.appbuilder = self.app.appbuilder  # pylint: disable=no-member
+        role_admin = self.appbuilder.sm.find_role("Admin")
+        self.tester = self.appbuilder.sm.find_user(username="test")
+        if not self.tester:
+            self.appbuilder.sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="[email protected]",
+                role=role_admin,
+                password="test",
+            )
+        clear_db_pools()
+
+    def tearDown(self) -> None:
+        clear_db_pools()

Review comment:
       I don't think we need to clear pools.

##########
File path: tests/api_connexion/test_webserver_auth.py
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from base64 import b64encode
+from datetime import datetime, timedelta
+
+import jwt
+from flask_login import current_user
+from parameterized import parameterized
+
+from airflow.www.app import create_app
+from tests.test_utils.api_connexion_utils import assert_401
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class TestWebserverAuth(unittest.TestCase):
+    def setUp(self) -> None:
+        with conf_vars({("api", "auth_backend"): 
"tests.test_utils.remote_user_api_auth_backend"}):
+            self.app = create_app(testing=True)
+
+        self.appbuilder = self.app.appbuilder  # pylint: disable=no-member
+        role_admin = self.appbuilder.sm.find_role("Admin")
+        self.tester = self.appbuilder.sm.find_user(username="test")
+        if not self.tester:
+            self.appbuilder.sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="[email protected]",
+                role=role_admin,
+                password="test",
+            )
+        clear_db_pools()
+
+    def tearDown(self) -> None:
+        clear_db_pools()
+
+    def test_successful_login(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+        assert isinstance(response.json["token"], str)
+        assert response.json["user"]['email'] == "[email protected]"
+
+    def test_can_view_other_endpoints(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        with self.app.test_client() as test_client:
+            response = test_client.post("api/v1/login", 
headers={"Authorization": token})
+            assert current_user.email == "[email protected]"
+            token = response.json["token"]
+            response2 = test_client.get("/api/v1/pools", 
headers={"Authorization": "Bearer " + token})
+        assert response2.status_code == 200
+        assert response2.json == {
+            "pools": [
+                {
+                    "name": "default_pool",
+                    "slots": 128,
+                    "occupied_slots": 0,
+                    "running_slots": 0,
+                    "queued_slots": 0,
+                    "open_slots": 128,
+                },
+            ],
+            "total_entries": 1,
+        }
+
+    def test_raises_for_the_none_algorithm(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        payload = {
+            'exp': datetime.utcnow() + timedelta(minutes=10),
+            'iat': datetime.utcnow(),
+            'sub': self.tester.id,
+        }
+        forgedtoken = jwt.encode(payload, key=None, algorithm=None).decode()
+        with self.app.test_client() as test_client:
+            test_client.post("api/v1/login", headers={"Authorization": token})
+            assert current_user.email == "[email protected]"
+            response = test_client.get("/api/v1/pools", 
headers={"Authorization": "Bearer " + forgedtoken})
+        assert response.status_code == 401
+
+    @parameterized.expand(
+        [
+            ("basic",),
+            ("basic ",),
+            ("bearer",),
+            ("test:test",),
+            (b64encode(b"test:test").decode(),),
+            ("bearer ",),
+            ("basic: ",),
+            ("basic 123",),
+        ]
+    )
+    def test_malformed_headers(self, token):
+        with self.app.test_client() as test_client:
+            response = test_client.get("/api/v1/pools", 
headers={"Authorization": token})
+            assert response.status_code == 401

Review comment:
       Is 400 (Client Error) more appropriate here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to