uranusjr commented on code in PR #43873:
URL: https://github.com/apache/airflow/pull/43873#discussion_r1837804087


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -2113,44 +2114,58 @@ def _activate_referenced_assets(assets: Collection[AssetModel], *, session: Sess
         active_name_to_uri: dict[str, str] = {name: uri for name, uri in active_assets}
         active_uri_to_name: dict[str, str] = {uri: name for name, uri in active_assets}
 
-        def _generate_dag_warnings(offending: AssetModel, attr: str, value: str) -> Iterator[DagWarning]:
+        def _generate_warning_message(
+            offending: AssetModel, attr: str, value: str
+        ) -> Iterator[tuple[str, str]]:
             for ref in itertools.chain(offending.consuming_dags, offending.producing_tasks):
-                yield DagWarning(
-                    dag_id=ref.dag_id,
-                    error_type=DagWarningType.ASSET_CONFLICT,
-                    message=f"Cannot activate asset {offending}; {attr} is already associated to {value!r}",
+                yield (
+                    ref.dag_id,
+                    f"Cannot activate asset {offending}; {attr} is already associated to {value!r}",
                 )
 
-        def _activate_assets_generate_warnings() -> Iterator[DagWarning]:
+        def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
             incoming_name_to_uri: dict[str, str] = {}
             incoming_uri_to_name: dict[str, str] = {}
             for asset in assets:
                 if (asset.name, asset.uri) in active_assets:
                     continue
                 existing_uri = active_name_to_uri.get(asset.name) or incoming_name_to_uri.get(asset.name)
                 if existing_uri is not None and existing_uri != asset.uri:
-                    yield from _generate_dag_warnings(asset, "name", existing_uri)
+                    yield from _generate_warning_message(asset, "name", existing_uri)
                     continue
                 existing_name = active_uri_to_name.get(asset.uri) or incoming_uri_to_name.get(asset.uri)
                 if existing_name is not None and existing_name != asset.name:
-                    yield from _generate_dag_warnings(asset, "uri", existing_name)
+                    yield from _generate_warning_message(asset, "uri", existing_name)
                     continue
                 incoming_name_to_uri[asset.name] = asset.uri
                 incoming_uri_to_name[asset.uri] = asset.name
                 session.add(AssetActive.for_asset(asset))
 
-        warnings_to_have = {w.dag_id: w for w in _activate_assets_generate_warnings()}
+        def _get_first_item(x: Sequence[Any]) -> Any:
+            return x[0]
+
+        warnings_to_have = {

Review Comment:
   Do you mean we should reduce the warnings from 3 to 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to