This is an automated email from the ASF dual-hosted git repository.

rnhttr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 48f00a7093e FIX: BigQuery traceability labels missing in TaskGroup (#47583)
48f00a7093e is described below

commit 48f00a7093e5da53a61fc97afdecd8fe97bfe170
Author: punitchauhan771 <[email protected]>
AuthorDate: Sat Mar 15 02:05:08 2025 +0530

    FIX: BigQuery traceability labels missing in TaskGroup (#47583)
    
    * FIX: BigQuery traceability labels missing in TaskGroup
    
    * Test: Add BigQuery TaskGroup traceability label propagation tests
    
    - Added test cases to verify that:
      • Task IDs containing dots ('.') are sanitized by replacing each dot with a hyphen ('-').
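      • Task IDs prefixed by a TaskGroup (e.g. 'task_group.task_name') become 'task_group-task_name', and in TaskGroups with prefix_group_id set to False, task IDs (including ones containing dots) are correctly transformed.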
    - Removed the obsolete test case from test_labels_invalid_names() that previously expected no labels to be added for task IDs containing dots, as this scenario is now supported.
    
    * Fix: Replace specific BigQuery query with generic test query
    
    Replaces the hardcoded query referencing `bigquery-public-data.america_health_rankings` with a generic `SELECT * FROM any` for test purposes. This avoids depending on a specific public dataset.
    
    * fix: run pre-commit to correct code formatting
---
 .../providers/google/cloud/operators/bigquery.py   |  2 +-
 .../unit/google/cloud/operators/test_bigquery.py   | 73 +++++++++++++++++++---
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py 
b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index fdbd9900070..1952c7261cf 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2897,7 +2897,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, 
_BigQueryInsertJobOpera
 
     def _add_job_labels(self) -> None:
         dag_label = self.dag_id.lower()
-        task_label = self.task_id.lower()
+        task_label = self.task_id.lower().replace(".", "-")
 
         if LABEL_REGEX.match(dag_label) and LABEL_REGEX.match(task_label):
             automatic_labels = {"airflow-dag": dag_label, "airflow-task": 
task_label}
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py 
b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index f488bd7229b..01f42d19473 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -79,6 +79,7 @@ from airflow.providers.google.cloud.triggers.bigquery import (
     BigQueryIntervalCheckTrigger,
     BigQueryValueCheckTrigger,
 )
+from airflow.utils.task_group import TaskGroup
 from airflow.utils.timezone import datetime
 
 pytestmark = pytest.mark.db_test
@@ -2300,7 +2301,7 @@ class TestBigQueryInsertJobOperator:
             },
         }
         op = BigQueryInsertJobOperator(
-            task_id="task.with.dots.is.allowed",
+            
task_id="task_id_with_exactly_64_characters_00000000000000000000000000000",
             configuration=configuration,
             location=TEST_DATASET_LOCATION,
             project_id=TEST_GCP_PROJECT_ID,
@@ -2308,14 +2309,70 @@ class TestBigQueryInsertJobOperator:
         op._add_job_labels()
         assert "labels" not in configuration
 
-        op = BigQueryInsertJobOperator(
-            
task_id="task_id_with_exactly_64_characters_00000000000000000000000000000",
-            configuration=configuration,
-            location=TEST_DATASET_LOCATION,
-            project_id=TEST_GCP_PROJECT_ID,
-        )
+    def test_labels_replace_dots_with_hyphens(self, dag_maker):
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            },
+        }
+        with dag_maker("dag_replace_dots_with_hyphens"):
+            op = BigQueryInsertJobOperator(
+                task_id="task.name.with.dots",
+                configuration=configuration,
+                location=TEST_DATASET_LOCATION,
+                project_id=TEST_GCP_PROJECT_ID,
+            )
         op._add_job_labels()
-        assert "labels" not in configuration
+        assert "labels" in configuration
+        assert configuration["labels"]["airflow-dag"] == 
"dag_replace_dots_with_hyphens"
+        assert configuration["labels"]["airflow-task"] == "task-name-with-dots"
+
+        with dag_maker("dag_with_taskgroup"):
+            with TaskGroup("task_group"):
+                op = BigQueryInsertJobOperator(
+                    task_id="task_name",
+                    configuration=configuration,
+                    location=TEST_DATASET_LOCATION,
+                    project_id=TEST_GCP_PROJECT_ID,
+                )
+        op._add_job_labels()
+        assert "labels" in configuration
+        assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup"
+        assert configuration["labels"]["airflow-task"] == 
"task_group-task_name"
+
+    def test_labels_with_task_group_prefix_group_id(self, dag_maker):
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            },
+        }
+        with dag_maker("dag_with_taskgroup"):
+            with TaskGroup("task_group", prefix_group_id=False):
+                op = BigQueryInsertJobOperator(
+                    task_id="task_name",
+                    configuration=configuration,
+                    location=TEST_DATASET_LOCATION,
+                    project_id=TEST_GCP_PROJECT_ID,
+                )
+        op._add_job_labels()
+        assert "labels" in configuration
+        assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup"
+        assert configuration["labels"]["airflow-task"] == "task_name"
+
+        with dag_maker("dag_with_taskgroup_prefix_group_id_false_with_dots"):
+            with TaskGroup("task_group_prefix_group_id_false", 
prefix_group_id=False):
+                op = BigQueryInsertJobOperator(
+                    task_id="task.name.with.dots",
+                    configuration=configuration,
+                    location=TEST_DATASET_LOCATION,
+                    project_id=TEST_GCP_PROJECT_ID,
+                )
+        op._add_job_labels()
+        assert "labels" in configuration
+        assert configuration["labels"]["airflow-dag"] == 
"dag_with_taskgroup_prefix_group_id_false_with_dots"
+        assert configuration["labels"]["airflow-task"] == "task-name-with-dots"
 
     def test_handle_job_error_raises_on_error_result_or_error(self, caplog):
         caplog.set_level(logging.ERROR)

Reply via email to