pierrejeambrun commented on code in PR #48174:
URL: https://github.com/apache/airflow/pull/48174#discussion_r2009982034


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],

Review Comment:
   We are missing the permission checks. Please check how other endpoints declare them.
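   For illustration, a minimal sketch of what the decorator could look like, assuming the `requires_access_dag` security helper that other core API endpoints use (the import path and method argument here are assumptions, not taken from this PR; `dag_parsing_router`, `Depends`, `action_logging`, etc. come from the existing module):
   ```python
   # Sketch only: the security helper and its arguments are assumptions,
   # mirroring how other endpoints declare their permission checks.
   from airflow.api_fastapi.core_api.security import requires_access_dag

   @dag_parsing_router.post(
       "/manage/reserialize",
       dependencies=[
           Depends(action_logging()),
           Depends(requires_access_dag(method="PUT")),  # placeholder method
       ],
   )
   def reserialize_dags(request: DagReserializePostBody, session: SessionDep): ...
   ```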



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:

Review Comment:
   There are utilities to fetch query parameters. Please check other endpoints; they will let you do validation and provide default values too.
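   For example, a plain-FastAPI sketch of taking the bundle names as an optional, defaulted query parameter (the repo's own parameter utilities may be the preferred route; the parameter name and description here are illustrative, and the other names come from the existing module):
   ```python
   # Sketch: bundle_names as a validated query parameter with a default,
   # instead of a request body.
   from typing import Annotated

   from fastapi import Query

   @dag_parsing_router.post("/manage/reserialize")
   def reserialize_dags(
       session: SessionDep,
       bundle_names: Annotated[
           list[str] | None,
           Query(description="Bundles to reserialize; all bundles if omitted"),
       ] = None,
   ): ...
   ```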



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
+        )
+        # Process each bundle
+        parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
+
+        session.add_all(parsing_requests)
+        return ReserializeResponse(
+            message="DAG bundles reserialized successfully", processed_bundles=list(bundles_to_process)
+        )
+    except HTTPException as e:
+        raise e
+
+    except Exception as e:
+        session.rollback()
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Failed to reserialize DAG bundles: {str(e)}",
+        )

Review Comment:
   This is automatically done by the session dependency (`SessionDep`), so it can be removed.
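   A sketch of how the tail of the handler could read once the try/except and manual rollback are dropped (names as in the existing module; the session dependency handles rollback as noted above):
   ```python
   # Sketch: no try/except or explicit rollback; the session dependency
   # takes care of rolling back on error.
   file_locations = session.scalars(
       select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
   )
   parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
   session.add_all(parsing_requests)
   ```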



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None:
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
+
+
+class ReserializeResponse(BaseModel):
+    """DAG Reserialize serializer for responses."""
+
+    message: str
+    processed_bundles: list[str]

Review Comment:
   You can look for `CollectionResponse` to find them: `AssetAliasCollectionResponse`, `AssetResponse`, `DAGResponse`, etc.
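   For example, a collection-shaped serializer could look roughly like this (the class name and fields are hypothetical, just mirroring the existing `*CollectionResponse` pattern):
   ```python
   from pydantic import BaseModel

   class DagReserializeCollectionResponse(BaseModel):
       """Hypothetical collection-style response for reserialize requests."""

       bundle_names: list[str]
       total_entries: int
   ```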



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
+        )
+        # Process each bundle
+        parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
+
+        session.add_all(parsing_requests)
+        return ReserializeResponse(
+            message="DAG bundles reserialized successfully", processed_bundles=list(bundles_to_process)
+        )
+    except HTTPException as e:
+        raise e
+

Review Comment:
   This achieves nothing (it only re-raises the same exception). To be deleted.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+
+        # Getting all bundle names which was retrieved in validation function
+        manager.sync_bundles_to_db()
+        all_bundle_names = set(manager.get_all_bundle_names())
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+            if len(bundles_to_process - all_bundle_names) > 0:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Invalid bundle name: {bundles_to_process - all_bundle_names}",
+                )
+        else:
+            bundles_to_process = all_bundle_names
+
+        file_locations = session.scalars(
+            select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
+        )
+        # Process each bundle
+        parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]

Review Comment:
   I think we need to handle the case where no bundle is configured in the installation (`bundle_name` is None and `all_bundle_names` is empty). In that case we need to reparse everything, I guess.
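   One way to sketch that fallback (assuming we want to queue every DAG file when no bundle is configured or selected; names as in the existing module):
   ```python
   # Sketch: when no bundles are configured/selected, fall back to all DAG files.
   stmt = select(DagModel.fileloc)
   if bundles_to_process:
       stmt = stmt.where(DagModel.bundle_name.in_(list(bundles_to_process)))
   file_locations = session.scalars(stmt)
   ```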



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",

Review Comment:
   Maybe we should change the route to `/parseDagFiles`. The route can then just be a POST on `/parseDagFiles` (meaning we want to reparse multiple files).



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_500_INTERNAL_SERVER_ERROR,

Review Comment:
   ```suggestion
   ```
   
   This shouldn't be documented. A 500 is unexpected, and any endpoint can throw an unexpected 500.
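   With the 500 dropped, the decorator would read roughly (other names as in the existing file):
   ```python
   @dag_parsing_router.post(
       "/manage/reserialize",
       responses=create_openapi_http_exception_doc(
           [
               status.HTTP_400_BAD_REQUEST,
               status.HTTP_404_NOT_FOUND,
               status.HTTP_409_CONFLICT,
           ]
       ),
       dependencies=[Depends(action_logging())],
   )
   def reserialize_dags(request: DagReserializePostBody, session: SessionDep): ...
   ```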



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py:
##########
@@ -66,3 +71,63 @@ def reparse_dag_file(
 
     parsing_request = DagPriorityParsingRequest(fileloc=path)
     session.add(parsing_request)
+
+
+@dag_parsing_router.post(
+    "/manage/reserialize",

Review Comment:
   Also, it should probably be a `PUT` too. I don't see why we'd use `PUT` for a single file but `POST` for multiple.
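   Putting the two routing comments together, a sketch of the proposed shape (the path and handler name are the suggestion here, not existing code):
   ```python
   # Sketch: a bulk PUT, mirroring the existing single-file PUT endpoint.
   @dag_parsing_router.put("/parseDagFiles")
   def reparse_dag_files(request: DagReserializePostBody, session: SessionDep): ...
   ```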



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None:
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
+
+
+class ReserializeResponse(BaseModel):
+    """DAG Reserialize serializer for responses."""
+
+    message: str
+    processed_bundles: list[str]

Review Comment:
   I think we should unify this with the other 'list' response types and "Collection" responses. You can compare it with the other collection serializers.



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None:
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
+
+
+class ReserializeResponse(BaseModel):
+    """DAG Reserialize serializer for responses."""
+
+    message: str
+    processed_bundles: list[str]
+
+
+class DagReserializePostBody(BaseModel):
+    """Dag Serializer for reserialzed bodies."""

Review Comment:
   Typo: `reserialzed` → `reserialized`.


