dimberman commented on code in PR #29735:
URL: https://github.com/apache/airflow/pull/29735#discussion_r1117167452
##########
airflow/providers/google/cloud/triggers/bigquery.py:
##########
@@ -529,3 +529,67 @@ async def _table_exists(
if err.status == 404:
return False
raise err
+
+
+class BigQueryTablePartitionExistenceTrigger(BigQueryTableExistenceTrigger):
+ """
+ Initialize the BigQuery Table Partition Existence Trigger with needed parameters
+ :param partition_id: The name of the partition to check the existence of.
+ :param project_id: Google Cloud Project where the job is running
+ :param dataset_id: The dataset ID of the requested table.
+ :param table_id: The table ID of the requested table.
+ :param gcp_conn_id: Reference to google cloud connection id
+ :param hook_params: params for hook
+ :param polling_interval_seconds: polling period in seconds to check for the status
+ """
+
+ def __init__(self, partition_id: str, **kwargs):
+ super().__init__(**kwargs)
+ self.partition_id = partition_id
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serializes BigQueryTablePartitionExistenceTrigger arguments and classpath."""
+ return (
+ "airflow.providers.google.cloud.triggers.bigquery.BigQueryTablePartitionExistenceTrigger",
+ {
+ "partition_id": self.partition_id,
+ "dataset_id": self.dataset_id,
+ "project_id": self.project_id,
+ "table_id": self.table_id,
+ "gcp_conn_id": self.gcp_conn_id,
+ "poll_interval": self.poll_interval,
+ "hook_params": self.hook_params,
+ },
+ )
+
+ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
+ """Will run until the table exists in the Google Big Query."""
+ hook = BigQueryAsyncHook(gcp_conn_id=self.gcp_conn_id)
+ job_id = None
+ while True:
+ if job_id is not None:
+ status = await hook.get_job_status(job_id=job_id, project_id=self.project_id)
+ if status == "success":
+ query_results = await hook.get_job_output(job_id=job_id, project_id=self.project_id)
+ records = hook.get_records(query_results)
+ if records:
+ records = [row[0] for row in records]
+ if self.partition_id in records:
+ yield TriggerEvent(
+ {
+ "status": "success",
+ "message": f"Partition: {self.partition_id} in table: {self.table_id}",
+ }
+ )
+ return
Review Comment:
Can you break this out into a helper function?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]