ferruzzi commented on code in PR #68106:
URL: https://github.com/apache/airflow/pull/68106#discussion_r3383907538


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -495,37 +478,38 @@ def start(  # type: ignore[override]
         **kwargs,
     ):
         proc_id = job.id if job is not None else uuid4()
-        proc = super().start(id=proc_id, job=job, target=cls.run_in_process, logger=logger, **kwargs)
 
-        msg = messages.StartTriggerer()
-        proc.send_msg(msg, request_id=0)
+        from airflow.sdk.api.client import Client
+        api = in_process_api_server()
+        client = Client(base_url=None, token="", dry_run=True, transport=api.transport)
+        client.base_url = "http://in-process.invalid./"
+
+        proc = super().start(id=proc_id, job=job, client=client,target=cls.run_in_process, logger=logger, **kwargs)
+        proc.send_msg(messages.StartTriggerer(), request_id=0)
         return proc
 
-    @functools.cached_property
-    def client(self) -> Client:
-        return self.make_client()
+    # @functools.cached_property
+    # def client(self) -> Client:
+    #     return self.make_client()
 
-    def make_client(self) -> Client:
-        """
-        Build the API client used to talk to the API server.
+    # def make_client(self) -> Client:
+    #     """
+    #     Build the API client used to talk to the API server.
 
-        Subclasses may override this to substitute a different transport — e.g. a
-        real HTTP client pointing at a remote API server — instead of the default
-        in-process one. The returned client must have ``base_url`` set; downstream
-        request handling (``self.client.variables``, ``.xcoms``, etc.) reads it
-        when issuing requests.
-        """
-        from airflow.sdk.api.client import Client
+    #     Subclasses may override this to substitute a different transport — e.g. a
+    #     real HTTP client pointing at a remote API server — instead of the default
+    #     in-process one. The returned client must have ``base_url`` set; downstream
+    #     request handling (``self.client.variables``, ``.xcoms``, etc.) reads it
+    #     when issuing requests.
+    #     """
+    #     from airflow.sdk.api.client import Client
 
-        client = Client(base_url=None, token="", dry_run=True, transport=in_process_api_server().transport)
-        # Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
-        client.base_url = "http://in-process.invalid./"
-        return client
+    #     client = Client(base_url=None, token="", dry_run=True, transport=in_process_api_server().transport)
+    #     # Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
+    #     client.base_url = "http://in-process.invalid./"
+    #     return client

Review Comment:
   Presumably this commented-out code should have been removed after you tested
   it, yeah?



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -542,66 +526,33 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
                     except Exception:
                         log.exception("Failed to upload trigger logs to remote", trigger_id=id)
                     finally:
-                        # Close the FD explicitly even if upload raised, otherwise the file
-                        # handle leaks for every failed upload.
                         factory.close()
-
-            response = messages.TriggerStateSync(
-                to_create=[],
-                to_cancel=self.cancelling_triggers,
-            )
-
-            # Pull out of these dequeues in a thread-safe manner
+            sync = messages.TriggerStateSync(to_create=[], to_cancel=set())
+            sync.to_cancel = self.cancelling_triggers.copy()

Review Comment:
   Good call using `copy()` here. It's a slight change in behavior, but for the
   better, I think.
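
A minimal sketch of the behavior change, using plain Python sets rather than the Airflow message classes. `Sync` is a hypothetical stand-in for `TriggerStateSync`, and it assumes the message keeps whatever set object it is handed; the real class may validate or copy its fields differently.

```python
from dataclasses import dataclass, field


@dataclass
class Sync:
    """Hypothetical stand-in for a message carrying a set of trigger IDs."""

    to_cancel: set[int] = field(default_factory=set)


cancelling_triggers = {1, 2}

# Without copy(): the message holds a reference to the live set.
aliased = Sync(to_cancel=cancelling_triggers)
# With copy(): the message holds an independent snapshot.
snapshot = Sync(to_cancel=cancelling_triggers.copy())

# The owner keeps mutating its set after the message has been built.
cancelling_triggers.add(3)
```

Under that assumption, `aliased.to_cancel` changes along with the live set, while `snapshot.to_cancel` stays as it was when the message was built.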



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -232,6 +233,7 @@ def builder(job=None):
             stdin=mock_stdin,
             process=process,
             capacity=10,
+            client = mocker.Mock(spec=Client),

Review Comment:
   Pretty sure the linter would correct this... have you run the static checks?
   ```suggestion
               client=mocker.Mock(spec=Client),
   ```



##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -2086,9 +2086,10 @@ def test_handle_request_get_connection_masks_password_and_extra(self, proc):
             password="super-secret-password",
             extra='{"api_key":"super-secret-extra"}',
         )
+        mock_masker = MagicMock()
 
         with (
-            patch("airflow.dag_processing.processor.mask_secret") as mock_mask_secret,
+            patch("airflow.sdk._shared.secrets_masker._secrets_masker", return_value=mock_masker),

Review Comment:
   Here and below: this should stay how it was, I think. We aren't testing how
   the secret is masked, only that it was. You should be able to just mock
   `airflow.sdk.execution_time.request_handlers.mask_secret` and leave the rest of
   the test unchanged.
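
A rough sketch of that testing style, kept self-contained so it does not import Airflow. `mask_secret` and `handle_get_connection` here are hypothetical stand-ins defined in the snippet itself. In the real test the replacement would presumably be done with `patch("airflow.sdk.execution_time.request_handlers.mask_secret")`, as suggested above; `patch.dict` on the function's globals is only used here to get the same effect without a real module path.

```python
from unittest.mock import MagicMock, patch


def mask_secret(value):
    """Stand-in for the real masker; the test below never runs this body."""
    raise AssertionError("the real masker should be replaced in the test")


def handle_get_connection(password, extra):
    """Hypothetical handler that asks for both sensitive fields to be masked."""
    mask_secret(password)
    mask_secret(extra)
    return {"password": password, "extra": extra}


def masked_values():
    """Run the handler with the masker mocked out; return what it was asked to mask."""
    mock_mask_secret = MagicMock()
    # Swap the name the handler looks up at call time, i.e. patch where it is used.
    with patch.dict(handle_get_connection.__globals__, {"mask_secret": mock_mask_secret}):
        handle_get_connection("super-secret-password", '{"api_key":"super-secret-extra"}')
    return [call.args[0] for call in mock_mask_secret.call_args_list]
```

The assertion then only needs to check which values were passed to the mock, not how the masker stores or applies them.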



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -542,66 +526,33 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
                     except Exception:
                         log.exception("Failed to upload trigger logs to remote", trigger_id=id)
                     finally:
-                        # Close the FD explicitly even if upload raised, otherwise the file
-                        # handle leaks for every failed upload.

Review Comment:
   Maybe leave this comment in? It still appears to be useful, unless I'm
   missing something.
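
For context, a small sketch of the pattern the removed comment describes: closing a resource in `finally` so it is released even when the upload fails. `FakeFactory`, `upload_then_close`, and `failing_upload` are hypothetical names for illustration, not the triggerer's actual objects.

```python
class FakeFactory:
    """Hypothetical stand-in for an object that owns an open file handle."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def upload_then_close(factory, upload):
    """Attempt the upload and always close the factory afterwards."""
    try:
        upload()
    except Exception:
        # The real code logs the failure here; this sketch just records it.
        uploaded = False
    else:
        uploaded = True
    finally:
        # Runs whether or not upload() raised, so the handle is not leaked.
        factory.close()
    return uploaded


def failing_upload():
    raise OSError("remote unavailable")


factory = FakeFactory()
ok = upload_then_close(factory, failing_upload)
```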



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1678,221 +1606,63 @@ def final_state(self):
         return TaskInstanceState.FAILED
 
     def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: int):
-        if isinstance(msg, MaskSecret):
-            log.debug("Received message from task runner (body omitted)", msg=type(msg))
-        else:
-            log.debug("Received message from task runner", msg=msg)

Review Comment:
   I think these log messages can/should stay, unless you see a reason to drop
   them?
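
What the removed branch does can be sketched as follows: for a secret-carrying message only the type is logged, and for everything else the whole message is. The dataclasses and `debug_fields` are hypothetical stand-ins; the real code calls `log.debug(...)` directly with these arguments.

```python
from dataclasses import dataclass


@dataclass
class MaskSecret:
    """Stand-in for the message that carries a value to be masked."""

    value: str


@dataclass
class GetVariable:
    """Stand-in for an ordinary, non-sensitive request message."""

    key: str


def debug_fields(msg):
    """Return the (event, fields) pair that would be passed to log.debug()."""
    if isinstance(msg, MaskSecret):
        # Log only the type so the secret itself never reaches the log.
        return "Received message from task runner (body omitted)", {"msg": type(msg)}
    return "Received message from task runner", {"msg": msg}


secret_event, secret_fields = debug_fields(MaskSecret(value="hunter2"))
plain_event, plain_fields = debug_fields(GetVariable(key="my_var"))
```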



##########
task-sdk/src/airflow/sdk/execution_time/request_handlers.py:
##########
@@ -272,3 +346,242 @@ def handle_get_xcom(client: Client, msg: GetXCom) -> tuple[BaseModel | None, dic
         xcom_result = XComResult.from_xcom_response(xcom)
         return xcom_result, {"exclude_unset": True}
     return xcom, {}
+
+
+@handles(GetAssetByName)
+def handle_get_asset_by_name(client: Client, msg: GetAssetByName) -> tuple[BaseModel | None, dict[str, bool]]:
+    asset_resp = client.assets.get(name=msg.name)
+    if isinstance(asset_resp, AssetResponse):
+        asset_result = AssetResult.from_asset_response(asset_resp)
+        return asset_result, {"exclude_unset": True}
+    return asset_resp, {}
+
+
+@handles(GetAssetByUri)
+def handle_get_asset_by_uri(client: Client, msg: GetAssetByUri) -> tuple[BaseModel | None, dict[str, bool]]:
+    asset_resp = client.assets.get(uri=msg.uri)
+    if isinstance(asset_resp, AssetResponse):
+        asset_result = AssetResult.from_asset_response(asset_resp)
+        return asset_result, {"exclude_unset": True}
+    return asset_resp, {}
+
+
+@handles(GetAssetsByAlias)
+def handle_get_assets_by_alias(
+    client: Client, msg: GetAssetsByAlias
+) -> tuple[BaseModel | None, dict[str, bool]]:
+    asset_resp = client.assets.get_by_alias(alias_name=msg.alias_name)
+    if isinstance(asset_resp, AssetResponse):

Review Comment:
   Verify this. It looks to me like `get_by_alias` returns an
   `AssetsByAliasResult`, not an `AssetResponse`. I think you can just drop this
   `isinstance` check entirely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
