This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ce4b8e3029 fix(providers/google): sanitize Dataproc batch labels to
use dashes instead of underscores (#62926)
9ce4b8e3029 is described below
commit 9ce4b8e30295646f5f1fec938f809f51c3219314
Author: Yoann <[email protected]>
AuthorDate: Wed Mar 11 17:50:34 2026 -0700
fix(providers/google): sanitize Dataproc batch labels to use dashes instead
of underscores (#62926)
* fix(providers/google): sanitize Dataproc batch labels to use dashes
instead of underscores
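Dataproc batch labels must conform to RFC 1035: only lowercase letters,
digits, and dashes are allowed (no underscores).
`__update_batch_labels` was replacing dots/spaces with underscores, so any
DAG/task ID that contained underscores, or dots that were turned into
underscores (e.g. IDs of tasks inside task groups, whose paths are joined
with dots), produced an invalid label and an InvalidArgument error.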
Also tightens the validation regex to reject underscores, matching the
sanitized label format, and fixes the dag_display_name validation check,
which was incorrectly testing dag_id instead of dag_display_name.
Closes: #59332
* fix: use valid dag_id in test (no spaces allowed)
* fix: apply ruff format
---
.../providers/google/cloud/operators/dataproc.py | 10 +++++-----
.../unit/google/cloud/operators/test_dataproc.py | 23 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 38ac0ff8fc4..41e76713b1e 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -2662,10 +2662,10 @@ class
DataprocCreateBatchOperator(GoogleCloudBaseOperator):
)
def __update_batch_labels(self):
- dag_id = re.sub(r"[.\s]", "_", self.dag_id.lower())
- task_id = re.sub(r"[.\s]", "_", self.task_id.lower())
+ dag_id = re.sub(r"[^a-z0-9-]", "-", self.dag_id.lower())
+ task_id = re.sub(r"[^a-z0-9-]", "-", self.task_id.lower())
- labels_regex = re.compile(r"^[a-z][\w-]{0,62}$")
+ labels_regex = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
if not labels_regex.match(dag_id) or not labels_regex.match(task_id):
return
@@ -2673,8 +2673,8 @@ class
DataprocCreateBatchOperator(GoogleCloudBaseOperator):
new_labels = {"airflow-dag-id": dag_id, "airflow-task-id": task_id}
if self._dag:
- dag_display_name = re.sub(r"[.\s]", "_",
self._dag.dag_display_name.lower())
- if labels_regex.match(dag_id):
+ dag_display_name = re.sub(r"[^a-z0-9-]", "-",
self._dag.dag_display_name.lower())
+ if labels_regex.match(dag_display_name):
new_labels["airflow-dag-display-name"] = dag_display_name
if isinstance(self.batch, Batch):
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index edb2bbe6c1c..20e622ac356 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -4275,6 +4275,29 @@ class TestDataprocCreateBatchOperator:
)
TestDataprocCreateBatchOperator.__assert_batch_create(mock_hook,
expected_batch)
+ @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
+ @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+ def test_create_batch_labels_sanitize_underscores_and_dots(self,
mock_hook, to_dict_mock):
+ """Labels with dots and underscores should be replaced with dashes
(GCP requirement)."""
+ dag_id_with_dots = "my.dag_id.with.dots"
+ task_id_with_underscores = "process_data.step_one"
+ expected_batch = {
+ **BATCH,
+ "labels": {
+ "airflow-dag-id": "my-dag-id-with-dots",
+ "airflow-dag-display-name": "my-dag-id-with-dots",
+ "airflow-task-id": "process-data-step-one",
+ },
+ }
+ DataprocCreateBatchOperator(
+ task_id=task_id_with_underscores,
+ dag=DAG(dag_id=dag_id_with_dots),
+ batch=BATCH,
+ region=GCP_REGION,
+ ).execute(context=EXAMPLE_CONTEXT)
+
+ TestDataprocCreateBatchOperator.__assert_batch_create(mock_hook,
expected_batch)
+
class TestDataprocDeleteBatchOperator:
@mock.patch(DATAPROC_PATH.format("DataprocHook"))