This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 5b7156d8643 [v2-10-test] Fix task id validation in BaseOperator
(#44938)
5b7156d8643 is described below
commit 5b7156d8643531caa1374ab58c17d01c21a14f8a
Author: GPK <[email protected]>
AuthorDate: Sun Dec 15 06:21:12 2024 +0000
[v2-10-test] Fix task id validation in BaseOperator (#44938)
* Fix task_id validation in baseoperator
* Fix task_id validation in baseoperator
* add additional tests to check task id length
---
airflow/models/baseoperator.py | 4 +++-
tests/models/test_baseoperator.py | 27 +++++++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 773552184f1..65900276271 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -965,12 +965,14 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
category=RemovedInAirflow3Warning,
stacklevel=3,
)
- validate_key(task_id)
dag = dag or DagContext.get_current_dag()
task_group = task_group or TaskGroupContext.get_current_task_group(dag)
self.task_id = task_group.child_id(task_id) if task_group else task_id
+
+ validate_key(self.task_id)
+
if not self.__from_mapped and task_group:
task_group.add(self)
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index 48aaf2699b9..8ce9ca195e9 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -564,6 +564,33 @@ class TestBaseOperator:
assert [op2] == tgop3.get_direct_relatives(upstream=False)
assert [op2] == tgop4.get_direct_relatives(upstream=False)
+ def
test_baseoperator_raises_exception_when_task_id_plus_taskgroup_id_exceeds_250_chars(self):
+ """Test exception is raised when operator task id + taskgroup id > 250
chars."""
+ dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+ tg1 = TaskGroup("A" * 20, dag=dag)
+ with pytest.raises(AirflowException, match="The key has to be less
than 250 characters"):
+ BaseOperator(task_id="1" * 250, task_group=tg1, dag=dag)
+
+ def
test_baseoperator_with_task_id_and_taskgroup_id_less_than_250_chars(self):
+ """Test exception is not raised when operator task id + taskgroup id <
250 chars."""
+ dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+ tg1 = TaskGroup("A" * 10, dag=dag)
+ try:
+ BaseOperator(task_id="1" * 239, task_group=tg1, dag=dag)
+ except Exception as e:
+ pytest.fail(f"Exception raised: {e}")
+
+ def test_baseoperator_with_task_id_less_than_250_chars(self):
+ """Test exception is not raised when operator task id < 250 chars."""
+ dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+ try:
+ BaseOperator(task_id="1" * 249, dag=dag)
+ except Exception as e:
+ pytest.fail(f"Exception raised: {e}")
+
def test_chain_linear(self):
dag = DAG(dag_id="test_chain_linear", schedule=None,
start_date=datetime.now())