This is an automated email from the ASF dual-hosted git repository.
rnhttr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48f00a7093e FIX: BigQuery traceability labels missing in TaskGroup (#47583)
48f00a7093e is described below
commit 48f00a7093e5da53a61fc97afdecd8fe97bfe170
Author: punitchauhan771 <[email protected]>
AuthorDate: Sat Mar 15 02:05:08 2025 +0530
FIX: BigQuery traceability labels missing in TaskGroup (#47583)
* FIX: BigQuery traceability labels missing in TaskGroup
* Test: Add BigQuery TaskGroup traceability label propagation tests
- Added test cases to verify that:
• Task IDs with dots ('.') are sanitized by replacing them with hyphens ('-').
• In TaskGroups with prefix_group_id set to False, task IDs containing dots are correctly transformed.
- Removed the obsolete test case from test_labels_invalid_names() that previously raised errors for task names with dots, as this scenario is now supported.
* Fix: Replace specific BigQuery query with generic test query
Replaces the hardcoded query referencing `bigquery-public-data.america_health_rankings`
with a generic `SELECT * FROM any` for test purposes. This avoids dependency on a specific public dataset.
* fix: run pre-commit to correct code structure
---
.../providers/google/cloud/operators/bigquery.py | 2 +-
.../unit/google/cloud/operators/test_bigquery.py | 73 +++++++++++++++++++---
2 files changed, 66 insertions(+), 9 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index fdbd9900070..1952c7261cf 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2897,7 +2897,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
def _add_job_labels(self) -> None:
dag_label = self.dag_id.lower()
- task_label = self.task_id.lower()
+ task_label = self.task_id.lower().replace(".", "-")
if LABEL_REGEX.match(dag_label) and LABEL_REGEX.match(task_label):
automatic_labels = {"airflow-dag": dag_label, "airflow-task": task_label}
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index f488bd7229b..01f42d19473 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -79,6 +79,7 @@ from airflow.providers.google.cloud.triggers.bigquery import (
BigQueryIntervalCheckTrigger,
BigQueryValueCheckTrigger,
)
+from airflow.utils.task_group import TaskGroup
from airflow.utils.timezone import datetime
pytestmark = pytest.mark.db_test
@@ -2300,7 +2301,7 @@ class TestBigQueryInsertJobOperator:
},
}
op = BigQueryInsertJobOperator(
- task_id="task.with.dots.is.allowed",
+ task_id="task_id_with_exactly_64_characters_00000000000000000000000000000",
configuration=configuration,
location=TEST_DATASET_LOCATION,
project_id=TEST_GCP_PROJECT_ID,
@@ -2308,14 +2309,70 @@ class TestBigQueryInsertJobOperator:
op._add_job_labels()
assert "labels" not in configuration
- op = BigQueryInsertJobOperator(
- task_id="task_id_with_exactly_64_characters_00000000000000000000000000000",
- configuration=configuration,
- location=TEST_DATASET_LOCATION,
- project_id=TEST_GCP_PROJECT_ID,
- )
+ def test_labels_replace_dots_with_hyphens(self, dag_maker):
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ },
+ }
+ with dag_maker("dag_replace_dots_with_hyphens"):
+ op = BigQueryInsertJobOperator(
+ task_id="task.name.with.dots",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
op._add_job_labels()
- assert "labels" not in configuration
+ assert "labels" in configuration
+ assert configuration["labels"]["airflow-dag"] == "dag_replace_dots_with_hyphens"
+ assert configuration["labels"]["airflow-task"] == "task-name-with-dots"
+
+ with dag_maker("dag_with_taskgroup"):
+ with TaskGroup("task_group"):
+ op = BigQueryInsertJobOperator(
+ task_id="task_name",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
+ op._add_job_labels()
+ assert "labels" in configuration
+ assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup"
+ assert configuration["labels"]["airflow-task"] == "task_group-task_name"
+
+ def test_labels_with_task_group_prefix_group_id(self, dag_maker):
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ },
+ }
+ with dag_maker("dag_with_taskgroup"):
+ with TaskGroup("task_group", prefix_group_id=False):
+ op = BigQueryInsertJobOperator(
+ task_id="task_name",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
+ op._add_job_labels()
+ assert "labels" in configuration
+ assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup"
+ assert configuration["labels"]["airflow-task"] == "task_name"
+
+ with dag_maker("dag_with_taskgroup_prefix_group_id_false_with_dots"):
+ with TaskGroup("task_group_prefix_group_id_false", prefix_group_id=False):
+ op = BigQueryInsertJobOperator(
+ task_id="task.name.with.dots",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
+ op._add_job_labels()
+ assert "labels" in configuration
+ assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup_prefix_group_id_false_with_dots"
+ assert configuration["labels"]["airflow-task"] == "task-name-with-dots"
def test_handle_job_error_raises_on_error_result_or_error(self, caplog):
caplog.set_level(logging.ERROR)