This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1fe8cddde8 Fix location requirement in
DataflowTemplatedJobStartOperator (#37069)
1fe8cddde8 is described below
commit 1fe8cddde8159ecffc61f204b8383f5d1ff39047
Author: shardbread <[email protected]>
AuthorDate: Mon Feb 5 15:34:33 2024 +0300
Fix location requirement in DataflowTemplatedJobStartOperator (#37069)
Co-authored-by: tverdokhlib <[email protected]>
---
airflow/providers/google/cloud/operators/dataflow.py | 5 ++++-
tests/providers/google/cloud/operators/test_dataflow.py | 7 +++++--
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataflow.py
b/airflow/providers/google/cloud/operators/dataflow.py
index 2b4b787262..7cfc2e6c06 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -676,6 +676,9 @@ class
DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
options = self.dataflow_default_options
options.update(self.options)
+ if not self.location:
+ self.location = DEFAULT_DATAFLOW_LOCATION
+
self.job = self.hook.start_template_dataflow(
job_name=self.job_name,
variables=options,
@@ -703,7 +706,7 @@ class
DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
trigger=TemplateJobStartTrigger(
project_id=self.project_id,
job_id=job_id,
- location=self.location if self.location else
DEFAULT_DATAFLOW_LOCATION,
+ location=self.location,
gcp_conn_id=self.gcp_conn_id,
poll_sleep=self.poll_sleep,
impersonation_chain=self.impersonation_chain,
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py
b/tests/providers/google/cloud/operators/test_dataflow.py
index eacd5fde5b..495287b9af 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -24,7 +24,10 @@ from unittest import mock
import pytest
import airflow
-from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.hooks.dataflow import (
+ DEFAULT_DATAFLOW_LOCATION,
+ DataflowJobStatus,
+)
from airflow.providers.google.cloud.operators.dataflow import (
CheckJobRunning,
DataflowCreateJavaJobOperator,
@@ -554,7 +557,7 @@ class TestDataflowTemplatedJobStartOperator:
assert dataflow_mock.called
_, kwargs = dataflow_mock.call_args_list[0]
assert kwargs["variables"]["region"] == TEST_REGION
- assert kwargs["location"] is None
+ assert kwargs["location"] == DEFAULT_DATAFLOW_LOCATION
@pytest.mark.db_test
@mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook.start_template_dataflow")