dstandish commented on code in PR #30579:
URL: https://github.com/apache/airflow/pull/30579#discussion_r1169162005
##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -232,6 +234,7 @@ def __init__(
)
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
+ self.deferrable = (deferrable,)
Review Comment:
this seems odd?
##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook,
bucket_name: str, object_name
if object_response:
return "success"
return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+ """
+ A trigger that makes an async call to GCS to check whether the object is
updated in a bucket.
+
+ :param bucket: google cloud storage bucket name cloud storage where the
objects are residing.
+ :param object_name: the file or folder present in the bucket
+ :param ts: datetime object
+ :param poke_interval: polling period in seconds to check for file/folder
+ :param google_cloud_conn_id: reference to the Google Connection
+ :param hook_params: dict object that has delegate_to and
impersonation_chain
+ """
+
+ def __init__(
+ self,
+ bucket: str,
+ object_name: str,
+ ts: datetime,
Review Comment:
this variable name should make at least _some_ effort to convey what its usage /
meaning / intent is
or, at least not be a 2 letter abbreviation
##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook,
bucket_name: str, object_name
if object_response:
return "success"
return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+ """
+ A trigger that makes an async call to GCS to check whether the object is
updated in a bucket.
+
+ :param bucket: google cloud storage bucket name cloud storage where the
objects are residing.
+ :param object_name: the file or folder present in the bucket
+ :param ts: datetime object
Review Comment:
missing description of meaning of param
##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook,
bucket_name: str, object_name
if object_response:
return "success"
return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+ """
+ A trigger that makes an async call to GCS to check whether the object is
updated in a bucket.
+
+ :param bucket: google cloud storage bucket name cloud storage where the
objects are residing.
+ :param object_name: the file or folder present in the bucket
+ :param ts: datetime object
+ :param poke_interval: polling period in seconds to check for file/folder
+ :param google_cloud_conn_id: reference to the Google Connection
+ :param hook_params: dict object that has delegate_to and
impersonation_chain
+ """
+
+ def __init__(
+ self,
+ bucket: str,
+ object_name: str,
+ ts: datetime,
+ poke_interval: float,
+ google_cloud_conn_id: str,
+ hook_params: dict[str, Any],
+ ):
+ super().__init__()
+ self.bucket = bucket
+ self.object_name = object_name
+ self.ts = ts
+ self.poke_interval = poke_interval
+ self.google_cloud_conn_id: str = google_cloud_conn_id
+ self.hook_params = hook_params
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serializes GCSCheckBlobUpdateTimeTrigger arguments and classpath."""
+ return (
+
"airflow.providers.google.cloud.triggers.gcs.GCSCheckBlobUpdateTimeTrigger",
+ {
+ "bucket": self.bucket,
+ "object_name": self.object_name,
+ "ts": self.ts,
+ "poke_interval": self.poke_interval,
+ "google_cloud_conn_id": self.google_cloud_conn_id,
+ "hook_params": self.hook_params,
+ },
+ )
+
+ async def run(self) -> AsyncIterator["TriggerEvent"]:
+ """Simple loop until the object updated time is greater than ts
datetime in bucket."""
Review Comment:
```suggestion
"""Loop until the object updated time is greater than ts datetime in
bucket."""
```
we don't nec. need to assert that this loop is simple :) we can let the
reader be the judge of that 😛
##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -242,6 +245,42 @@ def poke(self, context: Context) -> bool:
)
return hook.is_updated_after(self.bucket, self.object,
self.ts_func(context))
+ def execute(self, context: Context) -> None:
+ """Airflow runs this method on the worker and defers using the
trigger."""
+ if not self.deferrable:
Review Comment:
`is True` ?
##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -242,6 +245,42 @@ def poke(self, context: Context) -> bool:
)
return hook.is_updated_after(self.bucket, self.object,
self.ts_func(context))
+ def execute(self, context: Context) -> None:
+ """Airflow runs this method on the worker and defers using the
trigger."""
+ if not self.deferrable:
+ super().execute(context)
+ else:
+ self.defer(
+ timeout=timedelta(seconds=self.timeout),
+ trigger=GCSCheckBlobUpdateTimeTrigger(
+ bucket=self.bucket,
+ object_name=self.object,
+ ts=self.ts_func(context),
+ poke_interval=self.poke_interval,
+ google_cloud_conn_id=self.google_cloud_conn_id,
+ hook_params={
+ "delegate_to": self.delegate_to,
+ "impersonation_chain": self.impersonation_chain,
+ },
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: dict[str, Any], event: dict[str, str]
| None = None) -> str:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event:
+ if event["status"] == "success":
+ self.log.info(
+ "Sensor checks update time for object %s in bucket : %s",
self.object, self.bucket
Review Comment:
not sure of the meaning of this message?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]