ashb commented on code in PR #43076:
URL: https://github.com/apache/airflow/pull/43076#discussion_r1814869498


##########
task_sdk/src/airflow/sdk/definitions/baseoperator.py:
##########
@@ -0,0 +1,1141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import abc
+import collections.abc
+import contextlib
+import copy
+import inspect
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from functools import total_ordering, wraps
+from types import FunctionType
+from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, cast
+
+import attrs
+
+from airflow.exceptions import FailStopDagInvalidTriggerRule
+from airflow.sdk.definitions.abstractoperator import (
+    DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
+    DEFAULT_OWNER,
+    DEFAULT_POOL_SLOTS,
+    DEFAULT_PRIORITY_WEIGHT,
+    DEFAULT_QUEUE,
+    DEFAULT_RETRIES,
+    DEFAULT_RETRY_DELAY,
+    DEFAULT_TASK_EXECUTION_TIMEOUT,
+    DEFAULT_TRIGGER_RULE,
+    DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
+    DEFAULT_WEIGHT_RULE,
+    AbstractOperator,
+)
+from airflow.sdk.definitions.decorators import fixup_decorator_warning_stack
+from airflow.sdk.definitions.node import validate_key
+from airflow.sdk.types import NOTSET, validate_instance_args
+from airflow.task.priority_strategy import PriorityWeightStrategy, 
validate_and_load_priority_weight_strategy
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.types import AttributeRemoved
+
+T = TypeVar("T", bound=FunctionType)
+
+if TYPE_CHECKING:
+    # from ..execution_time.context import Context
+    class Context: ...
+
+    class ParamsDict: ...
+
+    from airflow.sdk.definitions.dag import DAG
+    from airflow.utils.operator_resources import Resources
+
+from airflow.sdk.definitions.taskgroup import TaskGroup
+
+# TODO: Task-SDK
+AirflowException = RuntimeError
+ParamsDict = dict
+
+
+def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) -> 
tuple[dict, ParamsDict]:
+    if not dag:
+        return {}, ParamsDict()
+    dag_args = copy.copy(dag.default_args)
+    dag_params = copy.deepcopy(dag.params)
+    if task_group:
+        if task_group.default_args and not isinstance(task_group.default_args, 
collections.abc.Mapping):
+            raise TypeError("default_args must be a mapping")
+        dag_args.update(task_group.default_args)
+    return dag_args, dag_params
+
+
+def get_merged_defaults(
+    dag: DAG | None,
+    task_group: TaskGroup | None,
+    task_params: collections.abc.MutableMapping | None,
+    task_default_args: dict | None,
+) -> tuple[dict, ParamsDict]:
+    args, params = _get_parent_defaults(dag, task_group)
+    if task_params:
+        if not isinstance(task_params, collections.abc.Mapping):
+            raise TypeError(f"params must be a mapping, got 
{type(task_params)}")
+        params.update(task_params)
+    if task_default_args:
+        if not isinstance(task_default_args, collections.abc.Mapping):
+            raise TypeError(f"default_args must be a mapping, got 
{type(task_params)}")
+        args.update(task_default_args)
+        with contextlib.suppress(KeyError):
+            params.update(task_default_args["params"] or {})
+    return args, params
+
+
+class BaseOperatorMeta(abc.ABCMeta):
+    """Metaclass of BaseOperator."""
+
+    @classmethod
+    def _apply_defaults(cls, func: T) -> T:
+        """
+        Look for an argument named "default_args", and fill the unspecified 
arguments from it.
+
+        Since python2.* isn't clear about which arguments are missing when
+        calling a function, and that this can be quite confusing with 
multi-level
+        inheritance and argument defaults, this decorator also alerts with
+        specific information about the missing arguments.
+        """
+        # Cache inspect.signature for the wrapper closure to avoid calling it
+        # at every decorated invocation. This is separate sig_cache created
+        # per decoration, i.e. each function decorated using apply_defaults 
will
+        # have a different sig_cache.
+        sig_cache = inspect.signature(func)
+        non_variadic_params = {
+            name: param
+            for (name, param) in sig_cache.parameters.items()
+            if param.name != "self" and param.kind not in 
(param.VAR_POSITIONAL, param.VAR_KEYWORD)
+        }
+        non_optional_args = {
+            name
+            for name, param in non_variadic_params.items()
+            if param.default == param.empty and name != "task_id"
+        }
+
+        fixup_decorator_warning_stack(func)
+
+        @wraps(func)
+        def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> 
Any:
+            from airflow.sdk.definitions.contextmanager import DagContext, 
TaskGroupContext
+
+            if args:
+                raise TypeError("Use keyword arguments when initializing 
operators")
+
+            instantiated_from_mapped = kwargs.pop(
+                "_airflow_from_mapped",
+                getattr(self, "_BaseOperator__from_mapped", False),
+            )
+
+            dag: DAG | None = kwargs.get("dag") or DagContext.get_current()
+            task_group: TaskGroup | None = kwargs.get("task_group")
+            if dag and not task_group:
+                task_group = TaskGroupContext.get_current(dag)
+
+            default_args, merged_params = get_merged_defaults(
+                dag=dag,
+                task_group=task_group,
+                task_params=kwargs.pop("params", None),
+                task_default_args=kwargs.pop("default_args", None),
+            )
+
+            for arg in sig_cache.parameters:
+                if arg not in kwargs and arg in default_args:
+                    kwargs[arg] = default_args[arg]
+
+            missing_args = non_optional_args.difference(kwargs)
+            if len(missing_args) == 1:
+                raise TypeError(f"missing keyword argument 
{missing_args.pop()!r}")
+            elif missing_args:
+                display = ", ".join(repr(a) for a in sorted(missing_args))
+                raise TypeError(f"missing keyword arguments {display}")
+
+            if merged_params:
+                kwargs["params"] = merged_params
+
+            hook = getattr(self, "_hook_apply_defaults", None)
+            if hook:
+                args, kwargs = hook(**kwargs, default_args=default_args)
+                default_args = kwargs.pop("default_args", {})
+
+            if not hasattr(self, "_BaseOperator__init_kwargs"):
+                object.__setattr__(self, "_BaseOperator__init_kwargs", {})
+            object.__setattr__(self, "_BaseOperator__from_mapped", 
instantiated_from_mapped)
+
+            result = func(self, **kwargs, default_args=default_args)
+
+            # Store the args passed to init -- we need them to support 
task.map serialization!
+            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
+
+            # Set upstream task defined by XComArgs passed to template fields 
of the operator.
+            # BUT: only do this _ONCE_, not once for each class in the 
hierarchy
+            if not instantiated_from_mapped and func == 
self.__init__.__wrapped__:  # type: ignore[misc]
+                self._set_xcomargs_dependencies()
+                # Mark instance as instantiated so that future attr setting 
updates xcomarg-based deps.
+                object.__setattr__(self, "_BaseOperator__instantiated", True)
+
+            return result
+
+        apply_defaults.__non_optional_args = non_optional_args  # type: ignore
+        apply_defaults.__param_names = set(non_variadic_params)  # type: ignore
+
+        return cast(T, apply_defaults)
+
+    def __new__(cls, name, bases, namespace, **kwargs):
+        # TODO: Task-SDK
+        # execute_method = namespace.get("execute")
+        # if callable(execute_method) and not getattr(execute_method, 
"__isabstractmethod__", False):
+        #     namespace["execute"] = 
ExecutorSafeguard().decorator(execute_method)
+        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
+        with contextlib.suppress(KeyError):
+            # Update the partial descriptor with the class method, so it calls 
the actual function
+            # (but let subclasses override it if they need to)
+            # TODO: Task-SDK
+            # partial_desc = vars(new_cls)["partial"]
+            # if isinstance(partial_desc, _PartialDescriptor):
+            #     partial_desc.class_method = classmethod(partial)
+            ...
+
+        # We patch `__init__` only if the class defines it.
+        if inspect.getmro(new_cls)[1].__init__ is not new_cls.__init__:
+            new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
+
+        return new_cls
+
+
+# TODO: The following mapping is used to validate that the arguments passed to 
the BaseOperator are of the
+#  correct type. This is a temporary solution until we find a more 
sophisticated method for argument
+#  validation. One potential method is to use `get_type_hints` from the typing 
module. However, this is not
+#  fully compatible with future annotations for Python versions below 3.10. 
Once we require a minimum Python
+#  version that supports `get_type_hints` effectively or find a better 
approach, we can replace this
+#  manual type-checking method.
+BASEOPERATOR_ARGS_EXPECTED_TYPES = {
+    "task_id": str,
+    "email": (str, Sequence),
+    "email_on_retry": bool,
+    "email_on_failure": bool,
+    "retries": int,
+    "retry_exponential_backoff": bool,
+    "depends_on_past": bool,
+    "ignore_first_depends_on_past": bool,
+    "wait_for_past_depends_before_skipping": bool,
+    "wait_for_downstream": bool,
+    "priority_weight": int,
+    "queue": str,
+    "pool": str,
+    "pool_slots": int,
+    "trigger_rule": str,
+    "run_as_user": str,
+    "task_concurrency": int,
+    "map_index_template": str,
+    "max_active_tis_per_dag": int,
+    "max_active_tis_per_dagrun": int,
+    "executor": str,
+    "do_xcom_push": bool,
+    "multiple_outputs": bool,
+    "doc": str,
+    "doc_md": str,
+    "doc_json": str,
+    "doc_yaml": str,
+    "doc_rst": str,
+    "task_display_name": str,
+    "logger_name": str,
+    "allow_nested_operators": bool,
+    "start_date": datetime,
+    "end_date": datetime,
+}
+
+
+# Note: BaseOperator is defined as a dataclass, and not an attrs class as we 
do too much metaprogramming in
+# here (metaclass, custom `__setattr__` behaviour) and this fights with attrs 
too much to make it worth it.
+#
+# To future reader: if you want to try and make this a "normal" attrs class, 
go ahead and attempt it. If you
+# get nowhere, leave a record here for the next poor soul of what problems 
you ran into.
+#
+# @ashb, 2024/10/14
+# - "Can't combine custom __setattr__ with on_setattr hooks"
+# - Setting class-wide `define(on_setattr=...)` isn't called for non-attrs 
subclasses
+@total_ordering
+@dataclass(repr=False, kw_only=True)
+class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
+    r"""
+    Abstract base class for all operators.
+
+    Since operators create objects that become nodes in the DAG, BaseOperator
+    contains many recursive methods for DAG crawling behavior. To derive from
+    this class, you are expected to override the constructor and the 'execute'
+    method.
+
+    Operators derived from this class should perform or trigger certain tasks
+    synchronously (wait for completion). Example of operators could be an
+    operator that runs a Pig job (PigOperator), a sensor operator that
+    waits for a partition to land in Hive (HiveSensorOperator), or one that
+    moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these
+    operators (tasks) target specific operations, running specific scripts,
+    functions or data transfers.
+
+    This class is abstract and shouldn't be instantiated. Instantiating a
+    class derived from this one results in the creation of a task object,
+    which ultimately becomes a node in DAG objects. Task dependencies should
+    be set by using the set_upstream and/or set_downstream methods.
+
+    :param task_id: a unique, meaningful id for the task
+    :param owner: the owner of the task. Using a meaningful description
+        (e.g. user/person/team/role name) to clarify ownership is recommended.
+    :param email: the 'to' email address(es) used in email alerts. This can be 
a
+        single email or multiple ones. Multiple addresses can be specified as a
+        comma or semicolon separated string or by passing a list of strings.
+    :param email_on_retry: Indicates whether email alerts should be sent when a
+        task is retried
+    :param email_on_failure: Indicates whether email alerts should be sent when
+        a task failed
+    :param retries: the number of retries that should be performed before
+        failing the task
+    :param retry_delay: delay between retries, can be set as ``timedelta`` or
+        ``float`` seconds, which will be converted into ``timedelta``,
+        the default is ``timedelta(seconds=300)``.
+    :param retry_exponential_backoff: allow progressively longer waits between
+        retries by using exponential backoff algorithm on retry delay (delay
+        will be converted into seconds)
+    :param max_retry_delay: maximum delay interval between retries, can be set 
as
+        ``timedelta`` or ``float`` seconds, which will be converted into 
``timedelta``.
+    :param start_date: The ``start_date`` for the task, determines
+        the ``execution_date`` for the first task instance. The best practice
+        is to have the start_date rounded
+        to your DAG's ``schedule_interval``. Daily jobs have their start_date
+        some day at 00:00:00, hourly jobs have their start_date at 00:00
+        of a specific hour. Note that Airflow simply looks at the latest
+        ``execution_date`` and adds the ``schedule_interval`` to determine
+        the next ``execution_date``. It is also very important
+        to note that different tasks' dependencies
+        need to line up in time. If task A depends on task B and their
+        start_date are offset in a way that their execution_date don't line
+        up, A's dependencies will never be met. If you are looking to delay
+        a task, for example running a daily task at 2AM, look into the
+        ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using
+        dynamic ``start_date`` and recommend using fixed ones. Read the
+        FAQ entry about start_date for more information.
+    :param end_date: if specified, the scheduler won't go beyond this date
+    :param depends_on_past: when set to true, task instances will run
+        sequentially and only if the previous instance has succeeded or has 
been skipped.
+        The task instance for the start_date is allowed to run.
+    :param wait_for_past_depends_before_skipping: when set to true, if the 
task instance
+        should be marked as skipped, and depends_on_past is true, the ti will 
stay on None state
+        waiting the task of the previous run
+    :param wait_for_downstream: when set to true, an instance of task
+        X will wait for tasks immediately downstream of the previous instance
+        of task X to finish successfully or be skipped before it runs. This is 
useful if the
+        different instances of a task X alter the same asset, and this asset
+        is used by tasks downstream of task X. Note that depends_on_past
+        is forced to True wherever wait_for_downstream is used. Also note that
+        only tasks *immediately* downstream of the previous task instance are 
waited
+        for; the statuses of any tasks further downstream are ignored.
+    :param dag: a reference to the dag the task is attached to (if any)
+    :param priority_weight: priority weight of this task against other task.
+        This allows the executor to trigger higher priority tasks before
+        others when things get backed up. Set priority_weight as a higher
+        number for more important tasks.
+    :param weight_rule: weighting method used for the effective total
+        priority weight of the task. Options are:
+        ``{ downstream | upstream | absolute }`` default is ``downstream``
+        When set to ``downstream`` the effective weight of the task is the
+        aggregate sum of all downstream descendants. As a result, upstream
+        tasks will have higher weight and will be scheduled more aggressively
+        when using positive weight values. This is useful when you have
+        multiple dag run instances and desire to have all upstream tasks to
+        complete for all runs before each dag can continue processing
+        downstream tasks. When set to ``upstream`` the effective weight is the
+        aggregate sum of all upstream ancestors. This is the opposite where
+        downstream tasks have higher weight and will be scheduled more
+        aggressively when using positive weight values. This is useful when you
+        have multiple dag run instances and prefer to have each dag complete
+        before starting upstream tasks of other dags.  When set to
+        ``absolute``, the effective weight is the exact ``priority_weight``
+        specified without additional weighting. You may want to do this when
+        you know exactly what priority weight each task should have.
+        Additionally, when set to ``absolute``, there is bonus effect of
+        significantly speeding up the task creation process as for very large
+        DAGs. Options can be set as string or using the constants defined in
+        the static class ``airflow.utils.WeightRule``
+        |experimental|
+        Since 2.9.0, Airflow allows to define custom priority weight strategy,
+        by creating a subclass of
+        ``airflow.task.priority_strategy.PriorityWeightStrategy`` and 
registering
+        in a plugin, then providing the class path or the class instance via
+        ``weight_rule`` parameter. The custom priority weight strategy will be
+        used to calculate the effective total priority weight of the task 
instance.
+    :param queue: which queue to target when running this job. Not
+        all executors implement queue management, the CeleryExecutor
+        does support targeting specific queues.
+    :param pool: the slot pool this task should run in, slot pools are a
+        way to limit concurrency for certain tasks
+    :param pool_slots: the number of pool slots this task should use (>= 1)
+        Values less than 1 are not allowed.
+    :param sla: time by which the job is expected to succeed. Note that
+        this represents the ``timedelta`` after the period is closed. For
+        example if you set an SLA of 1 hour, the scheduler would send an email
+        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
+        has not succeeded yet.
+        The scheduler pays special attention for jobs with an SLA and
+        sends alert
+        emails for SLA misses. SLA misses are also recorded in the database
+        for future reference. All tasks that share the same SLA time
+        get bundled in a single email, sent soon after that time. SLA
+        notification are sent once and only once for each task instance.
+    :param execution_timeout: max time allowed for the execution of
+        this task instance, if it goes beyond it will raise and fail.
+    :param on_failure_callback: a function or list of functions to be called 
when a task instance
+        of this task fails. a context dictionary is passed as a single
+        parameter to this function. Context contains references to related
+        objects to the task instance and is documented under the macros
+        section of the API.
+    :param on_execute_callback: much like the ``on_failure_callback`` except
+        that it is executed right before the task is executed.
+    :param on_retry_callback: much like the ``on_failure_callback`` except
+        that it is executed when retries occur.
+    :param on_success_callback: much like the ``on_failure_callback`` except
+        that it is executed when the task succeeds.
+    :param on_skipped_callback: much like the ``on_failure_callback`` except
+        that it is executed when a skip occurs; this callback will be called 
only if AirflowSkipException gets raised.
+        Explicitly it is NOT called if a task is not started to be executed 
because of a preceding branching
+        decision in the DAG or a trigger rule which causes execution to skip 
so that the task execution
+        is never scheduled.
+    :param pre_execute: a function to be called immediately before task
+        execution, receiving a context dictionary; raising an exception will
+        prevent the task from being executed.
+
+        |experimental|
+    :param post_execute: a function to be called immediately after task
+        execution, receiving a context dictionary and task result; raising an
+        exception will prevent the task from succeeding.
+
+        |experimental|
+    :param trigger_rule: defines the rule by which dependencies are applied
+        for the task to get triggered. Options are:
+        ``{ all_success | all_failed | all_done | all_skipped | one_success | 
one_done |
+        one_failed | none_failed | none_failed_min_one_success | none_skipped 
| always}``
+        default is ``all_success``. Options can be set as string or
+        using the constants defined in the static class
+        ``airflow.utils.TriggerRule``
+    :param resources: A map of resource parameter names (the argument names of 
the
+        Resources constructor) to their values.
+    :param run_as_user: unix username to impersonate while running the task
+    :param max_active_tis_per_dag: When set, a task will be able to limit the 
concurrent
+        runs across execution_dates.
+    :param max_active_tis_per_dagrun: When set, a task will be able to limit 
the concurrent
+        task instances per DAG run.
+    :param executor: Which executor to target when running this task. NOT YET 
SUPPORTED
+    :param executor_config: Additional task-level configuration parameters 
that are
+        interpreted by a specific executor. Parameters are namespaced by the 
name of
+        executor.
+
+        **Example**: to run this task in a specific docker container through
+        the KubernetesExecutor ::
+
+            MyOperator(..., executor_config={"KubernetesExecutor": {"image": 
"myCustomDockerImage"}})
+
+    :param do_xcom_push: if True, an XCom is pushed containing the Operator's
+        result
+    :param multiple_outputs: if True and do_xcom_push is True, pushes multiple 
XComs, one for each
+        key in the returned dictionary result. If False and do_xcom_push is 
True, pushes a single XCom.
+    :param task_group: The TaskGroup to which the task should belong. This is 
typically provided when not
+        using a TaskGroup as a context manager.
+    :param doc: Add documentation or notes to your Task objects that is 
visible in
+        Task Instance details View in the Webserver
+    :param doc_md: Add documentation (in Markdown format) or notes to your 
Task objects
+        that is visible in Task Instance details View in the Webserver
+    :param doc_rst: Add documentation (in RST format) or notes to your Task 
objects
+        that is visible in Task Instance details View in the Webserver
+    :param doc_json: Add documentation (in JSON format) or notes to your Task 
objects
+        that is visible in Task Instance details View in the Webserver
+    :param doc_yaml: Add documentation (in YAML format) or notes to your Task 
objects
+        that is visible in Task Instance details View in the Webserver
+    :param task_display_name: The display name of the task which appears on 
the UI.
+    :param logger_name: Name of the logger used by the Operator to emit logs.
+        If set to `None` (default), the logger name will fall back to
+        `airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. 
SimpleHttpOperator will have
+        
*airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator*
 as logger).
+    :param allow_nested_operators: if True, when an operator is executed 
within another one a warning message
+        will be logged. If False, then an exception will be raised if the 
operator is badly used (e.g. nested
+        within another one). In future releases of Airflow this parameter will 
be removed and an exception
+        will always be thrown when operators are nested within each other 
(default is True).
+
+        **Example**: example of a bad operator mixin usage::
+
+            @task(provide_context=True)
+            def say_hello_world(**context):
+                hello_world_task = BashOperator(
+                    task_id="hello_world_task",
+                    bash_command="python -c \"print('Hello, world!')\"",
+                    dag=dag,
+                )
+                hello_world_task.execute(context)
+    """
+
+    task_id: str
+    owner: str = DEFAULT_OWNER
+    email: str | Sequence[str] | None = None
+    email_on_retry: bool = True
+    email_on_failure: bool = True
+    retries: int | None = DEFAULT_RETRIES
+    retry_delay: timedelta | float = DEFAULT_RETRY_DELAY
+    retry_exponential_backoff: bool = False
+    max_retry_delay: timedelta | float | None = None
+    start_date: datetime | None = None
+    end_date: datetime | None = None
+    depends_on_past: bool = False
+    ignore_first_depends_on_past: bool = DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
+    wait_for_past_depends_before_skipping: bool = 
DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING
+    wait_for_downstream: bool = False
+    dag: DAG | None = None
+    params: collections.abc.MutableMapping | None = None
+    default_args: dict | None = None
+    priority_weight: int = DEFAULT_PRIORITY_WEIGHT
+    # TODO:
+    weight_rule: PriorityWeightStrategy | str = DEFAULT_WEIGHT_RULE
+    queue: str = DEFAULT_QUEUE
+    pool: str = "default"
+    pool_slots: int = DEFAULT_POOL_SLOTS
+    execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT
+    # on_execute_callback: None | TaskStateChangeCallback | 
list[TaskStateChangeCallback] = None
+    # on_failure_callback: None | TaskStateChangeCallback | 
list[TaskStateChangeCallback] = None
+    # on_success_callback: None | TaskStateChangeCallback | 
list[TaskStateChangeCallback] = None
+    # on_retry_callback: None | TaskStateChangeCallback | 
list[TaskStateChangeCallback] = None
+    # on_skipped_callback: None | TaskStateChangeCallback | 
list[TaskStateChangeCallback] = None
+    # pre_execute: TaskPreExecuteHook | None = None
+    # post_execute: TaskPostExecuteHook | None = None

Review Comment:
   Nope, OpenLineage uses "Listeners", which are global to the deployment, not 
specific to one DAG or task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to