This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 5b7156d8643 [v2-10-test] Fix task id validation in BaseOperator 
(#44938)
5b7156d8643 is described below

commit 5b7156d8643531caa1374ab58c17d01c21a14f8a
Author: GPK <[email protected]>
AuthorDate: Sun Dec 15 06:21:12 2024 +0000

    [v2-10-test] Fix task id validation in BaseOperator (#44938)
    
    * Fix task_id validation in baseoperator
    
    * Fix task_id validation in baseoperator
    
    * add additional tests to check task id length
---
 airflow/models/baseoperator.py    |  4 +++-
 tests/models/test_baseoperator.py | 27 +++++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 773552184f1..65900276271 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -965,12 +965,14 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
                 category=RemovedInAirflow3Warning,
                 stacklevel=3,
             )
-        validate_key(task_id)
 
         dag = dag or DagContext.get_current_dag()
         task_group = task_group or TaskGroupContext.get_current_task_group(dag)
 
         self.task_id = task_group.child_id(task_id) if task_group else task_id
+
+        validate_key(self.task_id)
+
         if not self.__from_mapped and task_group:
             task_group.add(self)
 
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index 48aaf2699b9..8ce9ca195e9 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -564,6 +564,33 @@ class TestBaseOperator:
         assert [op2] == tgop3.get_direct_relatives(upstream=False)
         assert [op2] == tgop4.get_direct_relatives(upstream=False)
 
+    def 
test_baseoperator_raises_exception_when_task_id_plus_taskgroup_id_exceeds_250_chars(self):
+        """Test exception is raised when operator task id + taskgroup id > 250 
chars."""
+        dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+        tg1 = TaskGroup("A" * 20, dag=dag)
+        with pytest.raises(AirflowException, match="The key has to be less 
than 250 characters"):
+            BaseOperator(task_id="1" * 250, task_group=tg1, dag=dag)
+
+    def 
test_baseoperator_with_task_id_and_taskgroup_id_less_than_250_chars(self):
+        """Test exception is not raised when operator task id + taskgroup id < 
250 chars."""
+        dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+        tg1 = TaskGroup("A" * 10, dag=dag)
+        try:
+            BaseOperator(task_id="1" * 239, task_group=tg1, dag=dag)
+        except Exception as e:
+            pytest.fail(f"Exception raised: {e}")
+
+    def test_baseoperator_with_task_id_less_than_250_chars(self):
+        """Test exception is not raised when operator task id  < 250 chars."""
+        dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())
+
+        try:
+            BaseOperator(task_id="1" * 249, dag=dag)
+        except Exception as e:
+            pytest.fail(f"Exception raised: {e}")
+
     def test_chain_linear(self):
         dag = DAG(dag_id="test_chain_linear", schedule=None, 
start_date=datetime.now())
 

Reply via email to