jason810496 commented on code in PR #61833:
URL: https://github.com/apache/airflow/pull/61833#discussion_r2813089003


##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -590,6 +610,29 @@ def _validate_tags(self, _, tags: Collection[str]):
         if tags and any(len(tag) > TAG_MAX_LEN for tag in tags):
             raise ValueError(f"tag cannot be longer than {TAG_MAX_LEN} characters")
 
+    @allowed_run_types.validator
+    def _validate_allowed_run_types(self, _, allowed_run_types):
+        if not allowed_run_types:
+            return
+        from airflow.utils.types import DagRunType

Review Comment:
   Maybe import from `airflow.sdk.api.datamodels._generated` instead of 
airflow-core?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1810,6 +1810,18 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st
                 self.log.error("Dag '%s' not found in serialized_dag table", apdr.target_dag_id)
                 continue
 
+            dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == apdr.target_dag_id).limit(1))
+            if (
+                dag_model
+                and dag_model.allowed_run_types is not None
+                and DagRunType.ASSET_TRIGGERED.value not in dag_model.allowed_run_types
+            ):
+                self.log.warning(
+                    "Dag does not allow asset-triggered runs; skipping",
+                    dag_id=apdr.target_dag_id,
+                )
+                continue

Review Comment:
   It seems we could modularize the check and the warning.
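
One way the check and the warning could be pulled into a single helper is sketched below. This is only an illustration of the suggestion, not code from the PR: `is_run_type_allowed` is a hypothetical name, the `DagRunType` enum is a reduced stand-in for Airflow's own, and the standard-library logger replaces `self.log`.

```python
from __future__ import annotations

import logging
from collections.abc import Collection
from enum import Enum

log = logging.getLogger(__name__)


class DagRunType(str, Enum):
    """Stand-in for Airflow's DagRunType, reduced to two members for this sketch."""

    MANUAL = "manual"
    ASSET_TRIGGERED = "asset_triggered"


def is_run_type_allowed(
    allowed_run_types: Collection[str] | None,
    run_type: DagRunType,
    dag_id: str,
) -> bool:
    """Hypothetical helper: return True if run_type may be created for the Dag.

    Logs a warning and returns False when the run type is denied.
    """
    # None means the Dag places no restriction on run types,
    # mirroring the `is not None` guard used in the diff.
    if allowed_run_types is None or run_type.value in allowed_run_types:
        return True
    log.warning("Dag %s does not allow %s runs; skipping", dag_id, run_type.value)
    return False
```

With such a helper, each call site would reduce to a single `if not is_run_type_allowed(...): continue` (or a raised `HTTPException` in the API routes), assuming the caller already has the Dag's `allowed_run_types` at hand.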



##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -100,6 +100,14 @@ class InvalidBackfillDate(AirflowException):
     """
 
 
+class DeniedDagRunType(AirflowException):

Review Comment:
   Maybe `models.dagrun` or `models.dag` module is a better place for 
`DeniedDagRunType`?



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -402,6 +403,13 @@ def materialize_asset(
             f"More than one DAG materializes asset with ID: {asset_id}",
         )
 
+    dm = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
+    if dm and dm.allowed_run_types is not None and DagRunType.MANUAL.value not in dm.allowed_run_types:

Review Comment:
   Not sure whether we should treat `materialize_asset` as 
`DagRunType.ASSET_TRIGGERED` or `DagRunType.MANUAL` 👀



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py:
##########
@@ -112,6 +112,15 @@ def trigger_dag_run(
             },
         )
 
+    if dm.allowed_run_types is not None and DagRunType.MANUAL.value not in dm.allowed_run_types:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail={
+                "reason": "denied_run_type",
+                "message": f"Dag with dag_id '{dag_id}' does not allow manual runs",
+            },
+        )
+

Review Comment:
   There's an edge case where the `TriggerDagRunOperator` task will also call 
the `trigger_dag_run` route, but we can also choose not to set `manual` in 
`allowed_run_types` in the same Dag.


