This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 5abe7acf66 Fix generation temp filename in `DataprocSubmitPySparkJobOperator` (#39498)
5abe7acf66 is described below
commit 5abe7acf666f8021cbdcd358079c2b8b86c37651
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu May 9 03:45:45 2024 +0400
Fix generation temp filename in `DataprocSubmitPySparkJobOperator` (#39498)
---
airflow/providers/google/cloud/operators/dataproc.py | 2 +-
.../google/cloud/operators/test_dataproc.py | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 34e9395f23..2b93e7ae55 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2023,7 +2023,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
     @staticmethod
     def _generate_temp_filename(filename):
-        return f"{time:%Y%m%d%H%M%S}_{uuid.uuid4()!s:.8}_{ntpath.basename(filename)}"
+        return f"{time.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4()!s:.8}_{ntpath.basename(filename)}"
     def _upload_file_temp(self, bucket, local_file):
         """Upload a local file to a Google Cloud Storage bucket."""
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 7d986b5017..0d97cfe36c 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import inspect
from unittest import mock
from unittest.mock import MagicMock, Mock, call
+from uuid import UUID
import pytest
from google.api_core.exceptions import AlreadyExists, NotFound
@@ -2567,6 +2568,25 @@ class TestDataProcPySparkOperator:
job = op.generate_job()
assert self.job == job
+    @pytest.mark.parametrize(
+        "filename",
+        [
+            pytest.param("/foo/bar/baz.py", id="absolute"),
+            pytest.param("foo/bar/baz.py", id="relative"),
+            pytest.param("baz.py", id="base-filename"),
+            pytest.param(r"C:\foo\bar\baz.py", id="windows-path"),
+        ],
+    )
+    def test_generate_temp_filename(self, filename, time_machine):
+        time_machine.move_to(datetime(2024, 2, 29, 1, 2, 3), tick=False)
+        with mock.patch(
+            DATAPROC_PATH.format("uuid.uuid4"), return_value=UUID("12345678-0000-4000-0000-000000000000")
+        ):
+            assert (
+                DataprocSubmitPySparkJobOperator._generate_temp_filename(filename)
+                == "20240229010203_12345678_baz.py"
+            )
+
class TestDataprocCreateWorkflowTemplateOperator:
@mock.patch(DATAPROC_PATH.format("DataprocHook"))