uranusjr commented on code in PR #43873:
URL: https://github.com/apache/airflow/pull/43873#discussion_r1837804087

##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -2113,44 +2114,58 @@ def _activate_referenced_assets(assets: Collection[AssetModel], *, session: Sess
         active_name_to_uri: dict[str, str] = {name: uri for name, uri in active_assets}
         active_uri_to_name: dict[str, str] = {uri: name for name, uri in active_assets}

-        def _generate_dag_warnings(offending: AssetModel, attr: str, value: str) -> Iterator[DagWarning]:
+        def _generate_warning_message(
+            offending: AssetModel, attr: str, value: str
+        ) -> Iterator[tuple[str, str]]:
             for ref in itertools.chain(offending.consuming_dags, offending.producing_tasks):
-                yield DagWarning(
-                    dag_id=ref.dag_id,
-                    error_type=DagWarningType.ASSET_CONFLICT,
-                    message=f"Cannot activate asset {offending}; {attr} is already associated to {value!r}",
+                yield (
+                    ref.dag_id,
+                    f"Cannot activate asset {offending}; {attr} is already associated to {value!r}",
                 )

-        def _activate_assets_generate_warnings() -> Iterator[DagWarning]:
+        def _activate_assets_generate_warnings() -> Iterator[tuple[str, str]]:
             incoming_name_to_uri: dict[str, str] = {}
             incoming_uri_to_name: dict[str, str] = {}
             for asset in assets:
                 if (asset.name, asset.uri) in active_assets:
                     continue
                 existing_uri = active_name_to_uri.get(asset.name) or incoming_name_to_uri.get(asset.name)
                 if existing_uri is not None and existing_uri != asset.uri:
-                    yield from _generate_dag_warnings(asset, "name", existing_uri)
+                    yield from _generate_warning_message(asset, "name", existing_uri)
                     continue
                 existing_name = active_uri_to_name.get(asset.uri) or incoming_uri_to_name.get(asset.uri)
                 if existing_name is not None and existing_name != asset.name:
-                    yield from _generate_dag_warnings(asset, "uri", existing_name)
+                    yield from _generate_warning_message(asset, "uri", existing_name)
                     continue
                 incoming_name_to_uri[asset.name] = asset.uri
                 incoming_uri_to_name[asset.uri] = asset.name
                 session.add(AssetActive.for_asset(asset))

-        warnings_to_have = {w.dag_id: w for w in _activate_assets_generate_warnings()}
+        def _get_first_item(x: Sequence[Any]) -> Any:
+            return x[0]
+
+        warnings_to_have = {

Review Comment:
   Do you mean we should reduce the warnings from 3 to 1?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org