This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a43e98d050 Fix DataprocJobBaseOperator not being compatible with dotted names (#23439). (#23791)
a43e98d050 is described below

commit a43e98d05047d9c4d5a7778bcb10efc4bdef7a01
Author: Guilherme Martins Crocetti <[email protected]>
AuthorDate: Sun May 22 08:43:21 2022 -0300

    Fix DataprocJobBaseOperator not being compatible with dotted names (#23439). (#23791)
    
    * job_name parameter is now sanitized, replacing dots with underscores.
---
 airflow/providers/google/cloud/hooks/dataproc.py   |  7 +++--
 .../providers/google/cloud/hooks/test_dataproc.py  | 32 +++++++++++++-------
 .../google/cloud/operators/test_dataproc.py        | 35 +++++++++++++++++-----
 3 files changed, 52 insertions(+), 22 deletions(-)
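
A minimal standalone sketch of the sanitization the hook change below introduces; the helper name sanitize_job_name is illustrative rather than taken from the patch:

    import uuid

    def sanitize_job_name(name: str) -> str:
        # Dotted task_ids (e.g. "group.task", as created inside a TaskGroup)
        # are not valid Dataproc job IDs, so dots become underscores before
        # the usual 8-character UUID suffix is appended. In the f-string,
        # "!s" applies str() and ":.8" truncates the result to 8 characters.
        return f"{name.replace('.', '_')}_{uuid.uuid4()!s:.8}"

    print(sanitize_job_name("group.task"))  # e.g. "group_task_1a2b3c4d"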

diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 784b4dd169..d870d80726 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -58,7 +58,7 @@ class DataProcJobBuilder:
         job_type: str,
         properties: Optional[Dict[str, str]] = None,
     ) -> None:
-        name = task_id + "_" + str(uuid.uuid4())[:8]
+        name = f"{task_id.replace('.', '_')}_{uuid.uuid4()!s:.8}"
         self.job_type = job_type
         self.job = {
             "job": {
@@ -175,11 +175,12 @@ class DataProcJobBuilder:
 
     def set_job_name(self, name: str) -> None:
         """
-        Set Dataproc job name.
+        Set Dataproc job name. Job name is sanitized, replacing dots with underscores.
 
         :param name: Job name.
         """
-        self.job["job"]["reference"]["job_id"] = name + "_" + str(uuid.uuid4())[:8]
+        sanitized_name = f"{name.replace('.', '_')}_{uuid.uuid4()!s:.8}"
+        self.job["job"]["reference"]["job_id"] = sanitized_name
 
     def build(self) -> Dict:
         """
diff --git a/tests/providers/google/cloud/hooks/test_dataproc.py b/tests/providers/google/cloud/hooks/test_dataproc.py
index 2356f3cc00..a8d91daf40 100644
--- a/tests/providers/google/cloud/hooks/test_dataproc.py
+++ b/tests/providers/google/cloud/hooks/test_dataproc.py
@@ -23,6 +23,7 @@ from unittest.mock import ANY
 import pytest
 from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud.dataproc_v1 import JobStatus
+from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
@@ -472,27 +473,28 @@ class TestDataProcJobBuilder(unittest.TestCase):
             properties={"test": "test"},
         )
 
+    @parameterized.expand([TASK_ID, f"group.{TASK_ID}"])
     @mock.patch(DATAPROC_STRING.format("uuid.uuid4"))
-    def test_init(self, mock_uuid):
+    def test_init(self, job_name, mock_uuid):
         mock_uuid.return_value = "uuid"
         properties = {"test": "test"}
-        job = {
+        expected_job_id = f"{job_name}_{mock_uuid.return_value}".replace(".", "_")
+        expected_job = {
             "job": {
                 "labels": {"airflow-version": AIRFLOW_VERSION},
                 "placement": {"cluster_name": CLUSTER_NAME},
-                "reference": {"job_id": TASK_ID + "_uuid", "project_id": 
GCP_PROJECT},
+                "reference": {"job_id": expected_job_id, "project_id": 
GCP_PROJECT},
                 "test": {"properties": properties},
             }
         }
         builder = DataProcJobBuilder(
             project_id=GCP_PROJECT,
-            task_id=TASK_ID,
+            task_id=job_name,
             cluster_name=CLUSTER_NAME,
             job_type="test",
             properties=properties,
         )
-
-        assert job == builder.job
+        assert expected_job == builder.job
 
     def test_add_labels(self):
         labels = {"key": "value"}
@@ -559,14 +561,22 @@ class TestDataProcJobBuilder(unittest.TestCase):
         self.builder.set_python_main(main)
         assert main == self.builder.job["job"][self.job_type]["main_python_file_uri"]
 
+    @parameterized.expand(
+        [
+            ("simple", "name"),
+            ("name with underscores", "name_with_dash"),
+            ("name with dot", "group.name"),
+            ("name with dot and underscores", "group.name_with_dash"),
+        ]
+    )
     @mock.patch(DATAPROC_STRING.format("uuid.uuid4"))
-    def test_set_job_name(self, mock_uuid):
+    def test_set_job_name(self, name, job_name, mock_uuid):
         uuid = "test_uuid"
+        expected_job_name = f"{job_name}_{uuid[:8]}".replace(".", "_")
         mock_uuid.return_value = uuid
-        name = "name"
-        self.builder.set_job_name(name)
-        name += "_" + uuid[:8]
-        assert name == self.builder.job["job"]["reference"]["job_id"]
+        self.builder.set_job_name(job_name)
+        assert expected_job_name == self.builder.job["job"]["reference"]["job_id"]
+        assert len(self.builder.job["job"]["reference"]["job_id"]) == len(job_name) + 9
 
     def test_build(self):
         assert self.builder.job == self.builder.build()
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 7342fc57a6..7992f5423e 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -1204,8 +1204,9 @@ class TestDataProcHiveOperator(unittest.TestCase):
     query = "define sin HiveUDF('sin');"
     variables = {"key": "value"}
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "hive_job": {"query_list": {"queries": [query]}, "script_variables": 
variables},
@@ -1226,6 +1227,7 @@ class TestDataProcHiveOperator(unittest.TestCase):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id
 
         op = DataprocSubmitHiveJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1249,6 +1251,7 @@ class TestDataProcHiveOperator(unittest.TestCase):
         mock_uuid.return_value = self.job_id
 
         op = DataprocSubmitHiveJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1263,8 +1266,9 @@ class TestDataProcPigOperator(unittest.TestCase):
     query = "define sin HiveUDF('sin');"
     variables = {"key": "value"}
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "pig_job": {"query_list": {"queries": [query]}, "script_variables": 
variables},
@@ -1285,6 +1289,7 @@ class TestDataProcPigOperator(unittest.TestCase):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id
 
         op = DataprocSubmitPigJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1308,6 +1313,7 @@ class TestDataProcPigOperator(unittest.TestCase):
         mock_uuid.return_value = self.job_id
 
         op = DataprocSubmitPigJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1321,15 +1327,16 @@ class TestDataProcPigOperator(unittest.TestCase):
 class TestDataProcSparkSqlOperator(unittest.TestCase):
     query = "SHOW DATABASES;"
     variables = {"key": "value"}
+    job_name = "simple"
     job_id = "uuid_id"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "spark_sql_job": {"query_list": {"queries": [query]}, 
"script_variables": variables},
     }
     other_project_job = {
-        "reference": {"project_id": "other-project", "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": "other-project", "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "spark_sql_job": {"query_list": {"queries": [query]}, 
"script_variables": variables},
@@ -1350,6 +1357,7 @@ class TestDataProcSparkSqlOperator(unittest.TestCase):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id
 
         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1375,6 +1383,7 @@ class TestDataProcSparkSqlOperator(unittest.TestCase):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id
 
         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             project_id="other-project",
             task_id=TASK_ID,
             region=GCP_LOCATION,
@@ -1399,6 +1408,7 @@ class TestDataProcSparkSqlOperator(unittest.TestCase):
         mock_uuid.return_value = self.job_id
 
         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1412,10 +1422,11 @@ class TestDataProcSparkSqlOperator(unittest.TestCase):
 class TestDataProcSparkOperator(DataprocJobTestBase):
     main_class = "org.apache.spark.examples.SparkPi"
     jars = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
+    job_name = "simple"
     job = {
         "reference": {
             "project_id": GCP_PROJECT,
-            "job_id": "{{task.task_id}}_{{ds_nodash}}_" + TEST_JOB_ID,
+            "job_id": f"{job_name}_{TEST_JOB_ID}",
         },
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
@@ -1440,6 +1451,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase):
         self.extra_links_manager_mock.attach_mock(mock_hook, 'hook')
 
         op = DataprocSubmitSparkJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1505,9 +1517,10 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task
 class TestDataProcHadoopOperator(unittest.TestCase):
     args = ["wordcount", "gs://pub/shakespeare/rose.txt"]
     jar = "file:///usr/lib/spark/examples/jars/spark-examples.jar"
+    job_name = "simple"
     job_id = "uuid_id"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "hadoop_job": {"main_jar_file_uri": jar, "args": args},
@@ -1529,6 +1542,7 @@ class TestDataProcHadoopOperator(unittest.TestCase):
         mock_uuid.return_value = self.job_id
 
         op = DataprocSubmitHadoopJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -1542,8 +1556,9 @@ class TestDataProcHadoopOperator(unittest.TestCase):
 class TestDataProcPySparkOperator(unittest.TestCase):
     uri = "gs://{}/{}"
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": 
"{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "pyspark_job": {"main_python_file_uri": uri},
@@ -1562,7 +1577,11 @@ class TestDataProcPySparkOperator(unittest.TestCase):
         mock_uuid.return_value = self.job_id
 
         op = DataprocSubmitPySparkJobOperator(
-            task_id=TASK_ID, region=GCP_LOCATION, gcp_conn_id=GCP_CONN_ID, main=self.uri
+            job_name=self.job_name,
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            gcp_conn_id=GCP_CONN_ID,
+            main=self.uri,
         )
         job = op.generate_job()
         assert self.job == job
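
For context, a hedged usage sketch of the fixed builder; the project and cluster values are illustrative, and it assumes the google provider package is importable:

    from airflow.providers.google.cloud.hooks.dataproc import DataProcJobBuilder

    builder = DataProcJobBuilder(
        project_id="example-project",  # illustrative value
        task_id="group.task",          # dotted task_id, as produced inside a TaskGroup
        cluster_name="cluster-1",
        job_type="pyspark_job",
        properties=None,
    )
    # The generated job_id now reads "group_task_<8-char-uuid>"; before this
    # commit it kept the dot, which Dataproc job IDs do not allow.
    print(builder.job["job"]["reference"]["job_id"])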
