amoghrajesh commented on code in PR #46613:
URL: https://github.com/apache/airflow/pull/46613#discussion_r1950384710
##########
airflow/models/baseoperatorlink.py:
##########
@@ -20,14 +20,47 @@
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, ClassVar
-import attr
+import attrs
Review Comment:
Switched to `attrs`, the modern API namespace of the attr library, replacing the legacy `attr` import.
##########
airflow/models/baseoperatorlink.py:
##########
@@ -20,14 +20,47 @@
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, ClassVar
-import attr
+import attrs
+
+from airflow.models.xcom import BaseXCom
+from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
[email protected](auto_attribs=True)
[email protected]()
+class GenericOperatorLink(LoggingMixin):
+ """A generic operator link class that can retrieve link only using XCOMs.
Used while deserializing operators."""
+
+ name: str
+ xcom_key: str
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
+ """
+ Retrieve the link from the XComs.
+
+ :param operator: The Airflow operator object this link is associated
to.
+ :param ti_key: TaskInstance ID to return link for.
+ :return: link to external system, but by pulling it from XComs
+ """
+ self.log.info(
+ "Attempting to retrieve link from XComs with key: %s for task id:
%s", self.xcom_key, ti_key
+ )
+ value = BaseXCom.get_one(key=self.xcom_key, run_id=ti_key.run_id)
+ if not value:
+ self.log.debug(
+ "No link with name: %s present in XCom as key: %s, returning
empty link",
+ self.name,
+ self.xcom_key,
+ )
+ return ""
Review Comment:
Should we be this lenient here (logging and returning an empty link when the XCom value is missing), or should we just fail? cc: @ashb
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1166,6 +1166,25 @@ def __init__(self, *args, **kwargs):
self.template_fields = BaseOperator.template_fields
self.operator_extra_links = BaseOperator.operator_extra_links
+ def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
+ """
+ For an operator, gets the URLs that the ``extra_links`` entry points
to.
+
+ :meta private:
+
+ :raise ValueError: The error message of a ValueError will be passed on
through to
+ the fronted to show up as a tooltip on the disabled link.
+ :param ti: The TaskInstance for the URL being searched for.
+ :param link_name: The name of the link we're looking for the URL for.
Should be
+ one of the options specified in ``extra_links``.
+ """
+ link = self.operator_extra_link_dict.get(link_name)
+ if not link:
+ link = self.global_operator_extra_link_dict.get(link_name)
+ if not link:
+ return None
+ return link.get_link(self.unmap(None), ti_key=ti.key)
Review Comment:
Can this stay as is, or does it need to match the previous definition, which also supported the old `get_link` signature:
```
link: BaseOperatorLink | None =
self.operator_extra_link_dict.get(link_name)
if not link:
link = self.global_operator_extra_link_dict.get(link_name)
if not link:
return None
parameters = inspect.signature(link.get_link).parameters
old_signature = all(name != "ti_key" for name, p in
parameters.items() if p.kind != p.VAR_KEYWORD)
if old_signature:
return link.get_link(self.unmap(None), ti.dag_run.logical_date)
# type: ignore[misc]
return link.get_link(self.unmap(None), ti_key=ti.key)
```
##########
newsfragments/46613.feature.rst:
##########
@@ -0,0 +1 @@
+Operator Links interface changed to not run user code in Airflow Webserver The
Operator Extra links, which can be defined either via plugins or custom
operators now do not execute any user code in the Airflow Webserver, but
instead push the "full" links to XCom backend and the value is again fetched
from the XCom backend when viewing task details in grid view.
Review Comment:
Yes, I can do that. Right now, a pre-commit check limits `feature.rst`
newsfragments to a single line. Let me push a fix for that.
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls,
encoded_op_links: list) -> dict[str,
raise AirflowException("Can't load plugins")
op_predefined_extra_links = {}
- for _operator_links_source in encoded_op_links:
- # Get the key, value pair as Tuple where key is OperatorLink
ClassName
- # and value is the dictionary containing the arguments passed to
the OperatorLink
+ for item in encoded_op_links.items():
+ # Get the name and xcom_key of the encoded operator and use it to
create a GenericOperatorLink object
+ # during deserialization.
#
- # Example of a single iteration:
- #
- # _operator_links_source =
- # {
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
{
- # 'index': 0
- # }
- # },
- #
- # list(_operator_links_source.items()) =
- # [
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {'index': 0}
- # )
- # ]
- #
- # list(_operator_links_source.items())[0] =
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {
- # 'index': 0
- # }
- # )
-
- _operator_link_class_path, data =
next(iter(_operator_links_source.items()))
- if _operator_link_class_path in get_operator_extra_links():
- single_op_link_class = import_string(_operator_link_class_path)
- elif _operator_link_class_path in
plugins_manager.registered_operator_link_classes:
- single_op_link_class =
plugins_manager.registered_operator_link_classes[
- _operator_link_class_path
- ]
- else:
- log.error("Operator Link class %r not registered",
_operator_link_class_path)
- return {}
-
- op_link_parameters = {param: cls.deserialize(value) for param,
value in data.items()}
- op_predefined_extra_link: BaseOperatorLink =
single_op_link_class(**op_link_parameters)
-
+ # Example:
+ # enc_operator['_operator_extra_links'] =
+ # {
+ # 'airflow': 'airflow_link_key',
+ # 'foo-bar': 'link-key',
+ # 'no_response': 'key',
+ # 'raise_error': 'key'
+ # }
+
+ name, xcom_key = item
+ op_predefined_extra_link = GenericOperatorLink(name=name,
xcom_key=xcom_key)
op_predefined_extra_links.update({op_predefined_extra_link.name:
op_predefined_extra_link})
Review Comment:
I have followed the former approach here; please take a look.
##########
airflow/models/abstractoperator.py:
##########
@@ -157,64 +153,6 @@ def priority_weight_total(self) -> int:
)
)
- @cached_property
- def operator_extra_link_dict(self) -> dict[str, Any]:
- """Returns dictionary of all extra links for the operator."""
- op_extra_links_from_plugin: dict[str, Any] = {}
- from airflow import plugins_manager
-
- plugins_manager.initialize_extra_operators_links_plugins()
- if plugins_manager.operator_extra_links is None:
- raise AirflowException("Can't load operators")
- for ope in plugins_manager.operator_extra_links:
- if ope.operators and self.operator_class in ope.operators:
- op_extra_links_from_plugin.update({ope.name: ope})
-
- operator_extra_links_all = {link.name: link for link in
self.operator_extra_links}
- # Extra links defined in Plugins overrides operator links defined in
operator
- operator_extra_links_all.update(op_extra_links_from_plugin)
-
- return operator_extra_links_all
-
- @cached_property
- def global_operator_extra_link_dict(self) -> dict[str, Any]:
- """Returns dictionary of all global extra links."""
- from airflow import plugins_manager
-
- plugins_manager.initialize_extra_operators_links_plugins()
- if plugins_manager.global_operator_extra_links is None:
- raise AirflowException("Can't load operators")
- return {link.name: link for link in
plugins_manager.global_operator_extra_links}
-
- @cached_property
- def extra_links(self) -> list[str]:
- return
sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
-
- def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
- """
- For an operator, gets the URLs that the ``extra_links`` entry points
to.
-
- :meta private:
-
- :raise ValueError: The error message of a ValueError will be passed on
through to
- the fronted to show up as a tooltip on the disabled link.
- :param ti: The TaskInstance for the URL being searched for.
- :param link_name: The name of the link we're looking for the URL for.
Should be
- one of the options specified in ``extra_links``.
- """
- link: BaseOperatorLink | None =
self.operator_extra_link_dict.get(link_name)
- if not link:
- link = self.global_operator_extra_link_dict.get(link_name)
- if not link:
- return None
-
- parameters = inspect.signature(link.get_link).parameters
- old_signature = all(name != "ti_key" for name, p in parameters.items()
if p.kind != p.VAR_KEYWORD)
-
- if old_signature:
- return link.get_link(self.unmap(None), ti.dag_run.logical_date) #
type: ignore[misc]
- return link.get_link(self.unmap(None), ti_key=ti.key)
-
Review Comment:
Moved these out into the relevant places:
1. `get_extra_links` goes to `SerializedBaseOperator`, as it's only called by
the webserver, and the webserver shouldn't have to interact with `AbstractOperator`.
2. The rest goes into the Task SDK's `AbstractOperator`, so it can be used
within the SDK too; this makes sense because this class inherits from that one.
##########
newsfragments/46613.feature.rst:
##########
@@ -0,0 +1 @@
+Operator Links interface changed to not run user code in Airflow Webserver The
Operator Extra links, which can be defined either via plugins or custom
operators now do not execute any user code in the Airflow Webserver, but
instead push the "full" links to XCom backend and the value is again fetched
from the XCom backend when viewing task details in grid view.
Review Comment:
Yes, I can do that. Right now, a pre-commit check limits `feature.rst`
newsfragments to a single line. Let me push a fix for that.
##########
task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py:
##########
@@ -136,6 +138,39 @@ class AbstractOperator(Templater, DAGNode):
)
)
+ @cached_property
+ def operator_extra_link_dict(self) -> dict[str, Any]:
+ """Returns dictionary of all extra links for the operator."""
+ op_extra_links_from_plugin: dict[str, Any] = {}
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_extra_operators_links_plugins()
+ if plugins_manager.operator_extra_links is None:
+ raise AirflowException("Can't load operators")
+ for ope in plugins_manager.operator_extra_links:
+ if ope.operators and self.operator_class in ope.operators:
+ op_extra_links_from_plugin.update({ope.name: ope})
+
+ operator_extra_links_all = {link.name: link for link in
self.operator_extra_links}
+ # Extra links defined in Plugins overrides operator links defined in
operator
+ operator_extra_links_all.update(op_extra_links_from_plugin)
+
+ return operator_extra_links_all
+
+ @cached_property
+ def global_operator_extra_link_dict(self) -> dict[str, Any]:
+ """Returns dictionary of all global extra links."""
+ from airflow import plugins_manager
+
+ plugins_manager.initialize_extra_operators_links_plugins()
+ if plugins_manager.global_operator_extra_links is None:
+ raise AirflowException("Can't load operators")
+ return {link.name: link for link in
plugins_manager.global_operator_extra_links}
+
+ @cached_property
+ def extra_links(self) -> list[str]:
+ return
sorted(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+
Review Comment:
Moved from models/abstractoperator to here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]