This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5a3be7256b Handling project location param on async BigQuery dts
trigger (#29786)
5a3be7256b is described below
commit 5a3be7256b2a848524d3635d7907b6829a583101
Author: Changhoon Oh <[email protected]>
AuthorDate: Thu Mar 16 06:51:24 2023 +0900
Handling project location param on async BigQuery dts trigger (#29786)
---
airflow/providers/google/cloud/hooks/bigquery_dts.py | 13 ++++++++++++-
airflow/providers/google/cloud/operators/bigquery_dts.py | 4 ++++
airflow/providers/google/cloud/triggers/bigquery_dts.py | 1 +
3 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py
b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index 5c7ec5bf06..044e89e6b5 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -304,11 +304,16 @@ class
AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
sync_hook = await self.get_sync_hook()
return sync_hook.project_id
+ async def _get_project_location(self) -> str:
+ sync_hook = await self.get_sync_hook()
+ return sync_hook.location
+
async def get_transfer_run(
self,
config_id: str,
run_id: str,
project_id: str | None,
+ location: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
@@ -321,6 +326,7 @@ class
AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
:param project_id: The BigQuery project id where the transfer
configuration should be
created. If set to None or missing, the default project_id from
the Google Cloud connection
is used.
+ :param location: BigQuery Transfer Service location for regional
transfers.
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:param timeout: The amount of time, in seconds, to wait for the
request to
@@ -330,8 +336,13 @@ class
AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
:return: An
``google.cloud.bigquery_datatransfer_v1.types.TransferRun`` instance.
"""
project_id = project_id or (await self._get_project_id())
+ location = location or (await self._get_project_location())
+ name = f"projects/{project_id}"
+ if location:
+ name += f"/locations/{location}"
+ name += f"/transferConfigs/{config_id}/runs/{run_id}"
+
client = await self._get_conn()
- name =
f"projects/{project_id}/transferConfigs/{config_id}/runs/{run_id}"
transfer_run = await client.get_transfer_run(
name=name,
retry=retry,
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py
b/airflow/providers/google/cloud/operators/bigquery_dts.py
index 9712658a76..d230e6c157 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -306,6 +306,10 @@ class
BigQueryDataTransferServiceStartTransferRunsOperator(GoogleCloudBaseOperat
def execute(self, context: Context):
self.log.info("Submitting manual transfer for %s",
self.transfer_config_id)
+
+ if self.requested_run_time and
isinstance(self.requested_run_time.get("seconds"), str):
+ self.requested_run_time["seconds"] =
int(self.requested_run_time["seconds"])
+
response = self.hook.start_manual_transfer_runs(
transfer_config_id=self.transfer_config_id,
requested_time_range=self.requested_time_range,
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py
b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index 2b4e57e9d9..a16585aaf4 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -101,6 +101,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
project_id=self.project_id,
config_id=self.config_id,
run_id=self.run_id,
+ location=self.location,
)
state = transfer_run.state
self.log.info("Current state is %s", state)