amoghrajesh commented on code in PR #50773:
URL: https://github.com/apache/airflow/pull/50773#discussion_r2122747650


##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -897,7 +897,8 @@ def test_run_with_asset_outlets(
     instant = timezone.datetime(2024, 12, 3, 10, 0)
     time_machine.move_to(instant, tick=False)
 
-    run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+    with mock.patch("airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"):
+        run(ti, context=ti.get_template_context(), log=mock.MagicMock())

Review Comment:
   Can we please also assert that the mock was called?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -791,6 +796,22 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv
     return None
 
 
+def _validate_task_inlets_and_outlets(*, ti: RuntimeTaskInstance, log: Logger) -> None:
+    if not ti.task.inlets and not ti.task.outlets:
+        return
+
+    SUPERVISOR_COMMS.send_request(msg=ValidateInletsAndOutlets(ti_id=ti.id), log=log)
+    inactive_assets_resp = SUPERVISOR_COMMS.get_message()
+    if TYPE_CHECKING:
+        assert isinstance(inactive_assets_resp, InactiveAssetsResult)
+    if inactive_assets := inactive_assets_resp.inactive_assets:
+        raise AirflowInactiveAssetInInletOrOutletException(
+            inactive_asset_keys=[
+                AssetUniqueKey.from_profile(asset_profile) for asset_profile in inactive_assets
+            ]
+        )

Review Comment:
   Interesting, I wonder why or how this was removed. As I understand it, this
part checks "before" running whether the task can run with the current asset
settings, right?
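
The "check before running" reading can be sketched roughly as follows. This is an illustration of the fail-fast pattern only, not the PR's implementation: `InactiveAssetError`, `FakeTask`, `validate_before_run`, and `run_task` are hypothetical names, and the set of inactive assets is passed in directly instead of being fetched from the supervisor.

```python
from dataclasses import dataclass, field


class InactiveAssetError(Exception):
    """Hypothetical stand-in for AirflowInactiveAssetInInletOrOutletException."""


@dataclass
class FakeTask:
    inlets: list = field(default_factory=list)
    outlets: list = field(default_factory=list)


def validate_before_run(task, inactive_names):
    # Nothing to validate when the task declares no inlets or outlets.
    if not task.inlets and not task.outlets:
        return
    inactive = [name for name in [*task.inlets, *task.outlets] if name in inactive_names]
    if inactive:
        raise InactiveAssetError(inactive)


def run_task(task, inactive_names, execute):
    # Validation happens first, so execute() never runs for a task
    # that references an inactive asset.
    validate_before_run(task, inactive_names)
    return execute()
```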



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1215,6 +1217,10 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger):
             )
         elif isinstance(msg, DeleteVariable):
             resp = self.client.variables.delete(msg.key)
+        elif isinstance(msg, ValidateInletsAndOutlets):

Review Comment:
   Could you add a test under `test_handle_requests` in `test_supervisor.py`?



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -400,12 +407,16 @@ def ti_update_state(
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
         updated_state = ti_patch_payload.state
         task_instance = session.get(TI, ti_id_str)
-        TI.register_asset_changes_in_db(
-            task_instance,
-            ti_patch_payload.task_outlets,  # type: ignore
-            ti_patch_payload.outlet_events,
-            session,
-        )
+        try:
+            TI.register_asset_changes_in_db(
+                task_instance,
+                ti_patch_payload.task_outlets,  # type: ignore
+                ti_patch_payload.outlet_events,
+                session,
+            )
+        except AirflowInactiveAssetInInletOrOutletException:
+            log.exception()

Review Comment:
   ```suggestion
   except AirflowInactiveAssetInInletOrOutletException as e:
       log.error("Asset registration failed due to conflicting asset: %s", e)
   ```
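
A small sketch of the suggested handler in isolation, assuming a standard-library `logging` logger (the route's actual logger may be a different type). `InactiveAssetError`, `register_asset_changes`, and `update_state` are hypothetical stand-ins for the names in the diff.

```python
import io
import logging


class InactiveAssetError(Exception):
    """Hypothetical stand-in for AirflowInactiveAssetInInletOrOutletException."""


def register_asset_changes():
    # Stand-in for TI.register_asset_changes_in_db(...) hitting an inactive asset.
    raise InactiveAssetError("asset 'example' is inactive")


def update_state(log):
    try:
        register_asset_changes()
    except InactiveAssetError as e:
        # An explicit message plus %s formatting, as in the suggestion.
        log.error("Asset registration failed due to conflicting asset: %s", e)
        return "logged"
    return "registered"


# Capture log output in memory so the message can be inspected.
stream = io.StringIO()
log = logging.getLogger("asset_demo")
log.addHandler(logging.StreamHandler(stream))
log.setLevel(logging.ERROR)
log.propagate = False

result = update_state(log)
```

With the standard library, `Logger.exception` requires a message argument, so an explicit message is needed either way; `log.exception("...", ...)` would additionally record the traceback if that is wanted.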


