ashb commented on code in PR #67878:
URL: https://github.com/apache/airflow/pull/67878#discussion_r3354728349


##########
airflow-core/src/airflow/api_fastapi/auth/dag_processor_token.py:
##########
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Mint and provision the bearer token the DAG processor presents to the API server (AIP-92).
+
+The DAG processor parses (and forks) user code, so it must never hold the deployment signing key
+or mint its own token. A *trusted* component runs the helpers here -- the deployment's provisioning
+step (a Helm init container, a docker-compose init service) or ``airflow standalone`` -- mints the
+token and writes it to ``[dag_processor] api_token_path``. The processor only ever reads that file
+(re-reading it as it is rotated), so it carries a token without being able to forge one.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+from pathlib import Path
+
+from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_args
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+# The Execution API is task-instance scoped: its ``sub`` is validated as a UUID. The DAG processor
+# is not a task instance, so its token carries an all-zero sentinel UUID rather than a real id.
+DAG_PROCESSOR_TOKEN_SUBJECT = "00000000-0000-0000-0000-000000000000"

Review Comment:
   `sub=dag-processor` -- this leads to an interesting conundrum. I'd really
   like to be able to tie requests for a Connection or Variable to a specific
   file, which means some kind of token exchange to get a per-file scoped token.
   
   I think the point about `CurrentTIToken` leads to a more salient design
   question though: is it right to use the Execution API for this? The
   "Execution" part of the API is not really true for dag parsing. I also don't
   want us to have to duplicate things into the /dag-processing/ API (I'm
   already not happy with how much we have to duplicate from the public API to
   the Execution API; doing that a third time makes me sad).
   
   I'm in favour of 2 generally, and the `TIClaims` and `TIToken` classes are
   probably a mistake (overly specific naming). Nothing seems to look at
   `TIClaims`, as far as I can see.
   
   ```
   $ rg 'token\.' airflow-core/src/airflow/api_fastapi/execution_api
   airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
   130:    parent_ti = session.get(TaskInstance, token.id)
   
   airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
   55:        token.id,
   
   airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py
   38:        token.id,
   
   airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py
   47:        token.id,
   
   airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
   318:    if token.claims.scope == "workload":
   
   airflow-core/src/airflow/api_fastapi/execution_api/datamodels/token.py
   32:    Validated JWT claims for a task identity token.
   
   airflow-core/src/airflow/api_fastapi/execution_api/security.py
   97:    dedup or Cadwyn replays) return the cached token.
   160:    token_scope = token.claims.scope
   187:        if str(token.id) != ti_self_id:
   239:        return await session.scalar(_team_name_for_ti_stmt(token.id))
   ```
   
   Also, the "per-team connection" in the Execution API currently wouldn't work
   here anyway: the team is looked up via task_instance -> dag_model ->
   dag_bundle -> dag_bundle.teams, and that chain cannot resolve from a fake UUID.
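
To illustrate the team-lookup point, here is a minimal sketch using hypothetical in-memory stand-ins. These are not Airflow's actual models, and this is not the real `_team_name_for_ti_stmt` query; the class names, the `team_name_for_subject` helper, and the "first team wins" rule are all assumptions made for the example. It only shows that a chain starting from a task instance row has nothing to resolve when the token subject is the all-zero sentinel.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field

# The all-zero sentinel subject used for the DAG processor token in the diff.
SENTINEL_SUBJECT = uuid.UUID("00000000-0000-0000-0000-000000000000")


# Hypothetical stand-ins for the rows in the lookup chain
# task_instance -> dag_model -> dag_bundle -> dag_bundle.teams.
@dataclass
class DagBundle:
    name: str
    teams: list[str] = field(default_factory=list)


@dataclass
class DagModel:
    dag_id: str
    bundle: DagBundle


@dataclass
class TaskInstance:
    id: uuid.UUID
    dag_model: DagModel


def team_name_for_subject(
    subject: uuid.UUID,
    task_instances: dict[uuid.UUID, TaskInstance],
) -> str | None:
    """Walk the chain from a token subject to a team name (assumed helper)."""
    ti = task_instances.get(subject)
    if ti is None:
        # No task instance row matches the subject, so the chain stops here.
        return None
    teams = ti.dag_model.bundle.teams
    # Assumption for the sketch: take the first team, if there is one.
    return teams[0] if teams else None
```

With a real task instance id the helper returns that bundle's team; with `SENTINEL_SUBJECT` it returns `None`, because no task instance row carries that id.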



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
