bugraoz93 commented on code in PR #47943:
URL: https://github.com/apache/airflow/pull/47943#discussion_r2002655654


##########
airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -113,6 +114,32 @@ class DAGPatchBody(StrictBaseModel):
     is_paused: bool
 
 
+class DagReserializePostBody(BaseModel):
+    """Dag Serializer for reserialzed bodies."""
+
+    bundle_names: list[str] | None
+
+    @field_validator("bundle_names", mode="before")
+    def validate_bundle_names(cls, value):
+        """Validate bundle names format and check for duplicates."""
+        manager = DagBundlesManager()
+        all_bundles = list(manager.get_all_dag_bundles())
+        all_bundle_names = {b.name for b in all_bundles}
+
+        if value is not None:

Review Comment:
   ```suggestion
           if value:
   ```
   



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -339,3 +342,68 @@ def delete_dag(
             status.HTTP_409_CONFLICT, f"Task instances of dag with id: 
'{dag_id}' are still running"
         )
     return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.post(
+    "/manage/reserialize",

Review Comment:
   ```suggestion
       "/reserialize",
   ```
   I think we don't need the additional `manage` path segment. It would be simpler without it.



##########
airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -113,6 +114,32 @@ class DAGPatchBody(StrictBaseModel):
     is_paused: bool
 
 
+class DagReserializePostBody(BaseModel):
+    """Dag Serializer for reserialzed bodies."""
+
+    bundle_names: list[str] | None
+
+    @field_validator("bundle_names", mode="before")
+    def validate_bundle_names(cls, value):
+        """Validate bundle names format and check for duplicates."""
+        manager = DagBundlesManager()
+        all_bundles = list(manager.get_all_dag_bundles())

Review Comment:
   I am not sure we need to initialize and load all the DAG bundles for each request. Maybe we could add a method to `DagBundlesManager` for this, e.g. `get_all_dag_bundles_by_name()`.



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -339,3 +342,68 @@ def delete_dag(
             status.HTTP_409_CONFLICT, f"Task instances of dag with id: 
'{dag_id}' are still running"
         )
     return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_422_UNPROCESSABLE_ENTITY,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+async def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+        manager.sync_bundles_to_db(session=session)
+        session.commit()
+
+        # Get all available bundles
+        all_bundles = list(manager.get_all_dag_bundles())
+        all_bundle_names = {b.name for b in all_bundles}
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+        else:
+            bundles_to_process = all_bundle_names
+
+        # Process each bundle
+        processed = []
+        for bundle in all_bundles:
+            if bundle.name not in bundles_to_process:
+                continue
+
+            try:
+                bundle.initialize()
+                dag_bag = DagBag(bundle.path, bundle_path=bundle.path, 
include_examples=False)
+                dag_bag.sync_to_db(bundle.name, 
bundle_version=bundle.get_current_version(), session=session)
+                processed.append(bundle.name)
+            except Exception as e:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=f"Error processing bundle {bundle.name}: {str(e)}",
+                )
+
+        session.commit()
+        return ReserializeResponse(
+            message="DAG bundles reserialized successfully", 
processed_bundles=processed
+        )
+
+    except Exception as e:
+        session.rollback()

Review Comment:
   Maybe this could go in a `finally` block?



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -339,3 +342,68 @@ def delete_dag(
             status.HTTP_409_CONFLICT, f"Task instances of dag with id: 
'{dag_id}' are still running"
         )
     return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.post(
+    "/manage/reserialize",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_422_UNPROCESSABLE_ENTITY,
+        ]
+    ),
+    dependencies=[Depends(action_logging())],
+)
+async def reserialize_dags(
+    request: DagReserializePostBody,
+    session: SessionDep,  # Add your session dependency
+):
+    """
+    Reserialize DAG bundles in Airflow.
+
+    - **bundle_names**: List of specific bundles to reserialize (all if empty)
+    """
+    try:
+        manager = DagBundlesManager()
+        manager.sync_bundles_to_db(session=session)
+        session.commit()
+
+        # Get all available bundles
+        all_bundles = list(manager.get_all_dag_bundles())
+        all_bundle_names = {b.name for b in all_bundles}
+
+        # Validate bundle names if specified
+        if request.bundle_names:
+            bundles_to_process = set(request.bundle_names)
+        else:
+            bundles_to_process = all_bundle_names
+
+        # Process each bundle
+        processed = []
+        for bundle in all_bundles:
+            if bundle.name not in bundles_to_process:
+                continue
+
+            try:
+                bundle.initialize()
+                dag_bag = DagBag(bundle.path, bundle_path=bundle.path, 
include_examples=False)
+                dag_bag.sync_to_db(bundle.name, 
bundle_version=bundle.get_current_version(), session=session)
+                processed.append(bundle.name)
+            except Exception as e:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=f"Error processing bundle {bundle.name}: {str(e)}",
+                )
+
+        session.commit()
+        return ReserializeResponse(
+            message="DAG bundles reserialized successfully", 
processed_bundles=processed
+        )
+
+    except Exception as e:
+        session.rollback()
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,

Review Comment:
   I'm not sure about this broad status code. It should also be documented in the OpenAPI spec.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to