This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a3be7256b Handling project location param on async BigQuery dts trigger (#29786)
5a3be7256b is described below

commit 5a3be7256b2a848524d3635d7907b6829a583101
Author: Changhoon Oh <[email protected]>
AuthorDate: Thu Mar 16 06:51:24 2023 +0900

    Handling project location param on async BigQuery dts trigger (#29786)
---
 airflow/providers/google/cloud/hooks/bigquery_dts.py     | 13 ++++++++++++-
 airflow/providers/google/cloud/operators/bigquery_dts.py |  4 ++++
 airflow/providers/google/cloud/triggers/bigquery_dts.py  |  1 +
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index 5c7ec5bf06..044e89e6b5 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -304,11 +304,16 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
         sync_hook = await self.get_sync_hook()
         return sync_hook.project_id
 
+    async def _get_project_location(self) -> str:
+        sync_hook = await self.get_sync_hook()
+        return sync_hook.location
+
     async def get_transfer_run(
         self,
         config_id: str,
         run_id: str,
         project_id: str | None,
+        location: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
@@ -321,6 +326,7 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
         :param project_id: The BigQuery project id where the transfer configuration should be
             created. If set to None or missing, the default project_id from the Google Cloud connection
             is used.
+        :param location: BigQuery Transfer Service location for regional transfers.
         :param retry: A retry object used to retry requests. If `None` is
             specified, requests will not be retried.
         :param timeout: The amount of time, in seconds, to wait for the request to
@@ -330,8 +336,13 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
         :return: An ``google.cloud.bigquery_datatransfer_v1.types.TransferRun`` instance.
         """
         project_id = project_id or (await self._get_project_id())
+        location = location or (await self._get_project_location())
+        name = f"projects/{project_id}"
+        if location:
+            name += f"/locations/{location}"
+        name += f"/transferConfigs/{config_id}/runs/{run_id}"
+
         client = await self._get_conn()
-        name = f"projects/{project_id}/transferConfigs/{config_id}/runs/{run_id}"
         transfer_run = await client.get_transfer_run(
             name=name,
             retry=retry,
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py b/airflow/providers/google/cloud/operators/bigquery_dts.py
index 9712658a76..d230e6c157 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -306,6 +306,10 @@ class BigQueryDataTransferServiceStartTransferRunsOperator(GoogleCloudBaseOperator):
 
     def execute(self, context: Context):
         self.log.info("Submitting manual transfer for %s", self.transfer_config_id)
+
+        if self.requested_run_time and isinstance(self.requested_run_time.get("seconds"), str):
+            self.requested_run_time["seconds"] = int(self.requested_run_time["seconds"])
+
         response = self.hook.start_manual_transfer_runs(
             transfer_config_id=self.transfer_config_id,
             requested_time_range=self.requested_time_range,
diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py
index 2b4e57e9d9..a16585aaf4 100644
--- a/airflow/providers/google/cloud/triggers/bigquery_dts.py
+++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -101,6 +101,7 @@ class BigQueryDataTransferRunTrigger(BaseTrigger):
                     project_id=self.project_id,
                     config_id=self.config_id,
                     run_id=self.run_id,
+                    location=self.location,
                 )
                 state = transfer_run.state
                 self.log.info("Current state is %s", state)

Reply via email to