This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ce4b8e3029 fix(providers/google): sanitize Dataproc batch labels to 
use dashes instead of underscores (#62926)
9ce4b8e3029 is described below

commit 9ce4b8e30295646f5f1fec938f809f51c3219314
Author: Yoann <[email protected]>
AuthorDate: Wed Mar 11 17:50:34 2026 -0700

    fix(providers/google): sanitize Dataproc batch labels to use dashes instead 
of underscores (#62926)
    
    * fix(providers/google): sanitize Dataproc batch labels to use dashes 
instead of underscores
    
    GCP labels only allow lowercase letters, numbers, and dashes.
    `__update_batch_labels` was replacing dots/spaces with underscores,
    causing InvalidArgument errors when DAG/task IDs contained underscores
    (e.g. task groups with dots in their path).
    
    Also fixes the validation regex to match GCP's actual label requirements
    and corrects the dag_display_name validation check which was incorrectly
    testing dag_id instead.
    
    Closes: #59332
    
    * fix: use valid dag_id in test (no spaces allowed)
    
    * fix: apply ruff format
---
 .../providers/google/cloud/operators/dataproc.py   | 10 +++++-----
 .../unit/google/cloud/operators/test_dataproc.py   | 23 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 38ac0ff8fc4..41e76713b1e 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -2662,10 +2662,10 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
             )
 
     def __update_batch_labels(self):
-        dag_id = re.sub(r"[.\s]", "_", self.dag_id.lower())
-        task_id = re.sub(r"[.\s]", "_", self.task_id.lower())
+        dag_id = re.sub(r"[^a-z0-9-]", "-", self.dag_id.lower())
+        task_id = re.sub(r"[^a-z0-9-]", "-", self.task_id.lower())
 
-        labels_regex = re.compile(r"^[a-z][\w-]{0,62}$")
+        labels_regex = re.compile(r"^[a-z0-9][a-z0-9-]{0,62}$")
         if not labels_regex.match(dag_id) or not labels_regex.match(task_id):
             return
 
@@ -2673,8 +2673,8 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
         new_labels = {"airflow-dag-id": dag_id, "airflow-task-id": task_id}
 
         if self._dag:
-            dag_display_name = re.sub(r"[.\s]", "_", self._dag.dag_display_name.lower())
-            if labels_regex.match(dag_id):
+            dag_display_name = re.sub(r"[^a-z0-9-]", "-", self._dag.dag_display_name.lower())
+            if labels_regex.match(dag_display_name):
                 new_labels["airflow-dag-display-name"] = dag_display_name
 
         if isinstance(self.batch, Batch):
diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index edb2bbe6c1c..20e622ac356 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -4275,6 +4275,29 @@ class TestDataprocCreateBatchOperator:
         )
        TestDataprocCreateBatchOperator.__assert_batch_create(mock_hook, expected_batch)
 
+    @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_create_batch_labels_sanitize_underscores_and_dots(self, mock_hook, to_dict_mock):
+        """Labels with dots and underscores should be replaced with dashes (GCP requirement)."""
+        dag_id_with_dots = "my.dag_id.with.dots"
+        task_id_with_underscores = "process_data.step_one"
+        expected_batch = {
+            **BATCH,
+            "labels": {
+                "airflow-dag-id": "my-dag-id-with-dots",
+                "airflow-dag-display-name": "my-dag-id-with-dots",
+                "airflow-task-id": "process-data-step-one",
+            },
+        }
+        DataprocCreateBatchOperator(
+            task_id=task_id_with_underscores,
+            dag=DAG(dag_id=dag_id_with_dots),
+            batch=BATCH,
+            region=GCP_REGION,
+        ).execute(context=EXAMPLE_CONTEXT)
+
+        TestDataprocCreateBatchOperator.__assert_batch_create(mock_hook, expected_batch)
+
 
 class TestDataprocDeleteBatchOperator:
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))

Reply via email to