ashb commented on code in PR #47008:
URL: https://github.com/apache/airflow/pull/47008#discussion_r1979071671


##########
airflow/serialization/serialized_objects.py:
##########
@@ -64,6 +63,7 @@
     BaseAsset,
 )
 from airflow.sdk.definitions.baseoperator import BaseOperator as 
TaskSDKBaseOperator
+from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
 from airflow.sdk.definitions.mappedoperator import MappedOperator
 from airflow.sdk.definitions.param import Param, ParamsDict
 from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup

Review Comment:
   We should decide (not in this PR) how we want to import these inside core -- 
do we import from individual files, or should it all be from `airflow.sdk` only.
   
   Things we expect users to need vs "internal" things needs some consideration.



##########
.pre-commit-config.yaml:
##########
@@ -694,10 +694,11 @@ repos:
       - id: check-base-operator-usage
         language: pygrep
         name: Check BaseOperatorLink core imports
-        description: Make sure BaseOperatorLink is imported from 
airflow.models.baseoperatorlink in core
-        entry: "from airflow\\.models import.* BaseOperatorLink"
+        description: Make sure BaseOperatorLink is imported from airflow.sdk 
in core
+        entry: ".*from airflow\\.sdk\\..* import.* BaseOperatorLink\\b"

Review Comment:
   Having the `\..*` part on the end somewhat defies the point of the check.
   
   This check existed to ensure that it was done as `from airflow.models import 
BaseOperatorLink` and not `from airflow.models.baseoperator import 
BaseOperatorLink` -- so I think this should be
   
   ```suggestion
           description: Make sure BaseOperatorLink is imported from airflow.sdk 
in core
           entry: "[ ]*from airflow\\.sdk import.* BaseOperatorLink\\b"
   ```



##########
.pre-commit-config.yaml:
##########
@@ -709,8 +710,11 @@ repos:
       - id: check-base-operator-usage
         language: pygrep
         name: Check BaseOperator[Link] other imports
-        description: Make sure BaseOperator[Link] is imported from 
airflow.models outside of core
-        entry: "from airflow\\.models\\.baseoperator(link)? import.* 
BaseOperator"
+        description: Make sure BaseOperator is imported from airflow.models 
and BaseOperatorLink is imported from airflow.models or airflow.sdk.* outside 
of core
+        entry: >
+          (from airflow\\.models\\.baseoperator import.* BaseOperator\\b|
+           from airflow\\.models import.* BaseOperatorLink\\b|
+           from airflow\\.sdk\\..* import.* BaseOperatorLink\\b)
         pass_filenames: true
         files: >
           (?x)

Review Comment:
   And since there is no now difference betweeen how core and providers import 
BaseOperatorLink, I think we can remove the Link part of this check.



##########
newsfragments/47008.significant.rst:
##########


Review Comment:
   Lets not add a new newsfragment for this, but instead add it to the existing 
AIP-72 mega fragment please.



##########
task_sdk/src/airflow/sdk/definitions/mappedoperator.py:
##########
@@ -63,13 +63,13 @@
     from airflow.models.abstractoperator import (
         TaskStateChangeCallback,
     )
-    from airflow.models.baseoperatorlink import BaseOperatorLink
     from airflow.models.expandinput import (
         ExpandInput,
         OperatorExpandArgument,
         OperatorExpandKwargsArgument,
     )
     from airflow.models.xcom_arg import XComArg
+    from airflow.sdk import BaseOperatorLink

Review Comment:
   From inside the SDK we probably want to import from the definitions directly 
for consistency.



##########
task_sdk/src/airflow/sdk/__init__.py:
##########
@@ -67,6 +67,7 @@
     "AssetAny": ".definitions.asset",
     "AssetWatcher": ".definitions.asset",
     "BaseOperator": ".definitions.baseoperator",
+    "BaseOperatorLink": ".definitions.baseoperatorlink",

Review Comment:
   I think we should also have a matching import in the TYPE_CHECKING block.



##########
newsfragments/47008.significant.rst:
##########
@@ -0,0 +1,22 @@
+``BaseOperatorLink`` has now been moved into the task SDK to be consumed by 
DAG authors to write custom operator links.
+
+Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need 
to be updated to ``airflow.sdk.definitions.baseoperatorlink``
+
+* Types of change
+
+  * [x] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [ ] Behaviour changes
+  * [x] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
+
+* Migration rules needed
+
+  * ruff
+
+    * AIR302
+
+      * [ ] ``airflow.models.baseoperatorlink`` → 
``airflow.sdk.definitions.baseoperatorlink``

Review Comment:
   ```suggestion
         * [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk``
   ```



##########
providers/google/src/airflow/providers/google/cloud/links/datafusion.py:
##########
@@ -21,13 +21,19 @@
 
 from typing import TYPE_CHECKING, ClassVar
 
-from airflow.models import BaseOperatorLink, XCom
+from airflow.models import XCom
 
 if TYPE_CHECKING:
     from airflow.models import BaseOperator
     from airflow.models.taskinstancekey import TaskInstanceKey
     from airflow.utils.context import Context
 
+from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink
+else:
+    from airflow.models.baseoperatorlink import BaseOperatorLink  # type: 
ignore[no-redef]

Review Comment:
   Nit: put this before the `if TYPE_CHECKING` please



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to