This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c33ae738aa Add a `@task.stub` to allow tasks in other languages to be 
defined in dags (#56055)
3c33ae738aa is described below

commit 3c33ae738aac3c344a4cf6bed5e7e50cc4d2e0c3
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Sep 24 17:58:33 2025 +0100

    Add a `@task.stub` to allow tasks in other languages to be defined in dags 
(#56055)
    
    As of today, even with Edge Executor and TaskSDK, Dags must be defined in
    Python. However if we want tasks to run in other languages we need 
"something"
    we can put in the python dag that is an operator. This is that.
    
    Its implementation is quite simple -- it checks the function is empty
    (because if it's not someone might mistakenly think it would run the
    function), and then _if_ it ever runs, it raises an exception.
---
 providers/standard/provider.yaml                   |   2 +
 .../airflow/providers/standard/decorators/stub.py  | 102 +++++++++++++++++++++
 .../providers/standard/get_provider_info.py        |   1 +
 .../tests/unit/standard/decorators/test_stub.py    |  57 ++++++++++++
 4 files changed, 162 insertions(+)

diff --git a/providers/standard/provider.yaml b/providers/standard/provider.yaml
index a22788f0f06..c497815d924 100644
--- a/providers/standard/provider.yaml
+++ b/providers/standard/provider.yaml
@@ -141,3 +141,5 @@ task-decorators:
     name: sensor
   - class-name: 
airflow.providers.standard.decorators.short_circuit.short_circuit_task
     name: short_circuit
+  - class-name: airflow.providers.standard.decorators.stub.stub
+    name: stub
diff --git 
a/providers/standard/src/airflow/providers/standard/decorators/stub.py 
b/providers/standard/src/airflow/providers/standard/decorators/stub.py
new file mode 100644
index 00000000000..c771dcdb436
--- /dev/null
+++ b/providers/standard/src/airflow/providers/standard/decorators/stub.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import ast
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, 
task_decorator_factory
+else:
+    try:
+        from airflow.sdk.bases.decorator import DecoratedOperator, 
TaskDecorator, task_decorator_factory
+    except ModuleNotFoundError:
+        from airflow.decorators.base import (
+            DecoratedOperator,
+            TaskDecorator,
+            task_decorator_factory,
+        )
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.context import Context
+
+
class _StubOperator(DecoratedOperator):
    """Operator behind ``@task.stub``: a placeholder for a task implemented outside Python.

    The decorated callable is never executed; it only gives the task a name,
    signature and position in the Dag graph. At construction time the
    callable's source is parsed and anything other than an empty body
    (``pass``, ``...``, or a docstring) is rejected, so users don't mistakenly
    expect the Python code to run.
    """

    custom_operator_name: str = "@task.stub"

    def __init__(
        self,
        *,
        python_callable: Callable,
        task_id: str,
        **kwargs,
    ) -> None:
        super().__init__(
            python_callable=python_callable,
            task_id=task_id,
            **kwargs,
        )
        # Validate the python callable: parse its source so we can inspect the
        # body statements (docstrings and `...` are visible in the AST).
        module = ast.parse(self.get_python_source())

        if len(module.body) != 1:
            raise RuntimeError("Expected a single statement")
        fn = module.body[0]
        if not isinstance(fn, ast.FunctionDef):
            raise RuntimeError("Expected a single sync function")
        for stmt in fn.body:
            # A bare `pass` is allowed.
            if isinstance(stmt, ast.Pass):
                continue
            # A docstring or a bare `...` expression is allowed too.
            if isinstance(stmt, ast.Expr):
                if isinstance(stmt.value, ast.Constant) and isinstance(stmt.value.value, (str, type(...))):
                    continue

            raise ValueError(
                f"Functions passed to @task.stub must be an empty function (docstring, `pass`, or `...` only) (got {stmt})"
            )

    def execute(self, context: Context) -> Any:
        # Reaching this means the task was scheduled onto a regular Python
        # worker instead of the external environment that should run it.
        raise RuntimeError(
            "@task.stub should not be executed directly -- we expected this to go to a remote worker. "
            "Check your pool and worker configs"
        )
+
+
def stub(
    python_callable: Callable | None = None,
    queue: str | None = None,
    executor: str | None = None,
    **kwargs,
) -> TaskDecorator:
    """
    Mark a function as a stub task in the Dag.

    A stub task is a graph-only placeholder: the decorated callable is never
    run in Python, and execution is expected to happen in an external
    environment via the Task Execution Interface.
    """
    # Forward everything to the generic decorator factory; `queue`/`executor`
    # are named here only so they appear in the public signature.
    factory_kwargs = dict(kwargs, queue=queue, executor=executor)
    return task_decorator_factory(
        decorated_operator_class=_StubOperator,
        python_callable=python_callable,
        **factory_kwargs,
    )
diff --git 
a/providers/standard/src/airflow/providers/standard/get_provider_info.py 
b/providers/standard/src/airflow/providers/standard/get_provider_info.py
index bd7118c78aa..4babc6dafe8 100644
--- a/providers/standard/src/airflow/providers/standard/get_provider_info.py
+++ b/providers/standard/src/airflow/providers/standard/get_provider_info.py
@@ -144,5 +144,6 @@ def get_provider_info():
                 "class-name": 
"airflow.providers.standard.decorators.short_circuit.short_circuit_task",
                 "name": "short_circuit",
             },
+            {"class-name": "airflow.providers.standard.decorators.stub.stub", 
"name": "stub"},
         ],
     }
diff --git a/providers/standard/tests/unit/standard/decorators/test_stub.py 
b/providers/standard/tests/unit/standard/decorators/test_stub.py
new file mode 100644
index 00000000000..406318ca2b9
--- /dev/null
+++ b/providers/standard/tests/unit/standard/decorators/test_stub.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+
+import pytest
+
+from airflow.providers.standard.decorators.stub import stub
+
+
def fn_ellipsis():
    # Empty body spelled as a bare Ellipsis literal — must be accepted.
    ...
+
+
def fn_pass():
    # Empty body spelled with `pass` — must be accepted. (Previously this
    # fixture used `...`, duplicating fn_ellipsis and leaving the validator's
    # ast.Pass branch untested.)
    pass
+
+
# Body is only a docstring — the stub validator must accept this.
def fn_doc():
    """Some string"""
+
+
# Docstring followed by `pass` — both allowed, so this must validate cleanly.
def fn_doc_pass():
    """Some string"""
    pass
+
+
# Non-empty body (an explicit `return`) — the stub validator must reject this.
def fn_code():
    return None
+
+
@pytest.mark.parametrize(
    ("fn", "error"),
    [
        pytest.param(fn_ellipsis, contextlib.nullcontext(), id="ellipsis"),
        pytest.param(fn_pass, contextlib.nullcontext(), id="pass"),
        pytest.param(fn_doc, contextlib.nullcontext(), id="doc"),
        pytest.param(fn_doc_pass, contextlib.nullcontext(), id="doc-and-pass"),
        pytest.param(fn_code, pytest.raises(ValueError, match="must be an empty function"), id="not-empty"),
    ],
)
def test_stub_signature(fn, error):
    # Merely decorating and binding the callable (`stub(fn)()`) triggers the
    # body validation in _StubOperator.__init__: empty bodies construct fine,
    # real code raises ValueError before any task would ever run.
    with error:
        stub(fn)()

Reply via email to