pierrejeambrun commented on code in PR #48174: URL: https://github.com/apache/airflow/pull/48174#discussion_r2009982034
########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, + ] + ), + dependencies=[Depends(action_logging())], Review Comment: We are missing the permissions. Please check other endpoints. ########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, + ] + ), + dependencies=[Depends(action_logging())], +) +def reserialize_dags( + request: DagReserializePostBody, + session: SessionDep, # Add your session dependency +): + """ + Reserialize DAG bundles in Airflow. + + - **bundle_names**: List of specific bundles to reserialize (all if empty) + """ + try: + manager = DagBundlesManager() + + # Getting all bundle names which was retrieved in validation function + manager.sync_bundles_to_db() + all_bundle_names = set(manager.get_all_bundle_names()) + + # Validate bundle names if specified + if request.bundle_names: Review Comment: There are utilities to fetch query parameters. Please check other endpoints. You will be able to do validation and provide default values too. 
########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, + ] + ), + dependencies=[Depends(action_logging())], +) +def reserialize_dags( + request: DagReserializePostBody, + session: SessionDep, # Add your session dependency +): + """ + Reserialize DAG bundles in Airflow. + + - **bundle_names**: List of specific bundles to reserialize (all if empty) + """ + try: + manager = DagBundlesManager() + + # Getting all bundle names which was retrieved in validation function + manager.sync_bundles_to_db() + all_bundle_names = set(manager.get_all_bundle_names()) + + # Validate bundle names if specified + if request.bundle_names: + bundles_to_process = set(request.bundle_names) + if len(bundles_to_process - all_bundle_names) > 0: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Invalid bundle name: {bundles_to_process - all_bundle_names}", + ) + else: + bundles_to_process = all_bundle_names + + file_locations = session.scalars( + select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process))) + ) + # Process each bundle + parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations] + + session.add_all(parsing_requests) + return ReserializeResponse( + message="DAG bundles reserialized successfully", processed_bundles=list(bundles_to_process) + ) + except HTTPException as e: + raise e + + except Exception as e: + session.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to reserialize DAG bundles: {str(e)}", + ) Review Comment: This is automatically done by the 
SessionDependency, so it can be removed. ########## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py: ########## @@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None: if latest_dag_version is None: return latest_dag_version return DagVersionResponse.model_validate(latest_dag_version) + + +class ReserializeResponse(BaseModel): + """DAG Reserialize serializer for responses.""" + + message: str + processed_bundles: list[str] Review Comment: You can look for `CollectionResponse` to find them. `AssetAliasCollectionResponse`, `AssetResponse`, `DAGResponse`, etc. ########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, + ] + ), + dependencies=[Depends(action_logging())], +) +def reserialize_dags( + request: DagReserializePostBody, + session: SessionDep, # Add your session dependency +): + """ + Reserialize DAG bundles in Airflow. 
+ + - **bundle_names**: List of specific bundles to reserialize (all if empty) + """ + try: + manager = DagBundlesManager() + + # Getting all bundle names which was retrieved in validation function + manager.sync_bundles_to_db() + all_bundle_names = set(manager.get_all_bundle_names()) + + # Validate bundle names if specified + if request.bundle_names: + bundles_to_process = set(request.bundle_names) + if len(bundles_to_process - all_bundle_names) > 0: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Invalid bundle name: {bundles_to_process - all_bundle_names}", + ) + else: + bundles_to_process = all_bundle_names + + file_locations = session.scalars( + select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process))) + ) + # Process each bundle + parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations] + + session.add_all(parsing_requests) + return ReserializeResponse( + message="DAG bundles reserialized successfully", processed_bundles=list(bundles_to_process) + ) + except HTTPException as e: + raise e + Review Comment: This achieves nothing and should be deleted. ########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, + ] + ), + dependencies=[Depends(action_logging())], +) +def reserialize_dags( + request: DagReserializePostBody, + session: SessionDep, # Add your session dependency +): + """ + Reserialize DAG bundles in Airflow. 
+ + - **bundle_names**: List of specific bundles to reserialize (all if empty) + """ + try: + manager = DagBundlesManager() + + # Getting all bundle names which was retrieved in validation function + manager.sync_bundles_to_db() + all_bundle_names = set(manager.get_all_bundle_names()) + + # Validate bundle names if specified + if request.bundle_names: + bundles_to_process = set(request.bundle_names) + if len(bundles_to_process - all_bundle_names) > 0: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Invalid bundle name: {bundles_to_process - all_bundle_names}", + ) + else: + bundles_to_process = all_bundle_names + + file_locations = session.scalars( + select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process))) + ) + # Process each bundle + parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations] Review Comment: I think we need to handle the case where no bundle is configured in the installation (bundle_name is None), so all_bundle_names is empty. In that case I guess we need to reparse everything. ########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", Review Comment: Maybe we should change the router to `/parseDagFiles`. The route can then just be a POST on `/parseDagFiles` (meaning we want to reparse multiple files). 
########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + status.HTTP_500_INTERNAL_SERVER_ERROR, Review Comment: ```suggestion ``` This shouldn't be documented. A 500 is unexpected, and any endpoint can throw an unexpected 500. ########## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_parsing.py: ########## @@ -66,3 +71,63 @@ def reparse_dag_file( parsing_request = DagPriorityParsingRequest(fileloc=path) session.add(parsing_request) + + +@dag_parsing_router.post( + "/manage/reserialize", Review Comment: Also it should probably be a `PUT` too. I don't see why we would use `PUT` for a single file and `POST` for multiple files. ########## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py: ########## @@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None: if latest_dag_version is None: return latest_dag_version return DagVersionResponse.model_validate(latest_dag_version) + + +class ReserializeResponse(BaseModel): + """DAG Reserialize serializer for responses.""" + + message: str + processed_bundles: list[str] Review Comment: I think we should unify this with the other 'list' response types ("Collections"). You can compare with other collection serializers. 
########## airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py: ########## @@ -180,3 +180,16 @@ def latest_dag_version(self) -> DagVersionResponse | None: if latest_dag_version is None: return latest_dag_version return DagVersionResponse.model_validate(latest_dag_version) + + +class ReserializeResponse(BaseModel): + """DAG Reserialize serializer for responses.""" + + message: str + processed_bundles: list[str] + + +class DagReserializePostBody(BaseModel): + """Dag Serializer for reserialzed bodies.""" Review Comment: Typo: "reserialzed" should be "reserialized". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org