dstandish commented on code in PR #30579:
URL: https://github.com/apache/airflow/pull/30579#discussion_r1169162005


##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -232,6 +234,7 @@ def __init__(
             )
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
+        self.deferrable = (deferrable,)

Review Comment:
   this seems odd?
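
For context on why this looks odd (an observation about the quoted line, not a claim about the author's intent): the trailing comma makes the attribute a one-element tuple, which is always truthy, so a later `if not self.deferrable:` check can never take the non-deferrable branch. A minimal sketch of the presumably intended assignment:

```python
# Presumably intended: store the flag itself rather than a one-element tuple.
# `(deferrable,)` evaluates as truthy regardless of the value of `deferrable`.
self.deferrable = deferrable
```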



##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name
             if object_response:
                 return "success"
             return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+    """
+    A trigger that makes an async call to GCS to check whether the object is updated in a bucket.
+
+    :param bucket: google cloud storage bucket name cloud storage where the objects are residing.
+    :param object_name: the file or folder present in the bucket
+    :param ts: datetime object
+    :param poke_interval: polling period in seconds to check for file/folder
+    :param google_cloud_conn_id: reference to the Google Connection
+    :param hook_params: dict object that has delegate_to and impersonation_chain
+    """
+
+    def __init__(
+        self,
+        bucket: str,
+        object_name: str,
+        ts: datetime,

Review Comment:
   this variable name should make at least _some_ effort to convey what its usage / meaning / intent is
   
   or, at least not be a 2 letter abbreviation



##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name
             if object_response:
                 return "success"
             return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+    """
+    A trigger that makes an async call to GCS to check whether the object is updated in a bucket.
+
+    :param bucket: google cloud storage bucket name cloud storage where the objects are residing.
+    :param object_name: the file or folder present in the bucket
+    :param ts: datetime object

Review Comment:
   missing description of meaning of param
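
For illustration only (the wording below is inferred from the trigger's `run` docstring quoted further down, "… until the object updated time is greater than ts datetime in bucket", and is not text from the PR), a description along these lines would address the comment:

```python
# Sketch of a possible docstring line; the exact wording is an inference:
#
# :param ts: the datetime to compare the object's updated time against;
#     the trigger completes once the object in the bucket was updated later
#     than this value
```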



##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -97,3 +99,101 @@ async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name
             if object_response:
                 return "success"
             return "pending"
+
+
+class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
+    """
+    A trigger that makes an async call to GCS to check whether the object is updated in a bucket.
+
+    :param bucket: google cloud storage bucket name cloud storage where the objects are residing.
+    :param object_name: the file or folder present in the bucket
+    :param ts: datetime object
+    :param poke_interval: polling period in seconds to check for file/folder
+    :param google_cloud_conn_id: reference to the Google Connection
+    :param hook_params: dict object that has delegate_to and impersonation_chain
+    """
+
+    def __init__(
+        self,
+        bucket: str,
+        object_name: str,
+        ts: datetime,
+        poke_interval: float,
+        google_cloud_conn_id: str,
+        hook_params: dict[str, Any],
+    ):
+        super().__init__()
+        self.bucket = bucket
+        self.object_name = object_name
+        self.ts = ts
+        self.poke_interval = poke_interval
+        self.google_cloud_conn_id: str = google_cloud_conn_id
+        self.hook_params = hook_params
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes GCSCheckBlobUpdateTimeTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.gcs.GCSCheckBlobUpdateTimeTrigger",
+            {
+                "bucket": self.bucket,
+                "object_name": self.object_name,
+                "ts": self.ts,
+                "poke_interval": self.poke_interval,
+                "google_cloud_conn_id": self.google_cloud_conn_id,
+                "hook_params": self.hook_params,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:
+        """Simple loop until the object updated time is greater than ts 
datetime in bucket."""

Review Comment:
   ```suggestion
           """Loop until the object updated time is greater than ts datetime in 
bucket."""
   ```
   
   we don't nec. need to assert that this loop is simple :) we can let the reader be the judge of that 😛



##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -242,6 +245,42 @@ def poke(self, context: Context) -> bool:
         )
         return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
 
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the 
trigger."""
+        if not self.deferrable:

Review Comment:
   `is True` ?
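
Worth noting alongside the earlier comment on `self.deferrable = (deferrable,)`: because that assignment stores a one-element tuple, `not self.deferrable` is always False, so the synchronous branch can never run. One possible reading of the comment is a positive, explicit check; a short sketch assuming the attribute holds a plain bool (the `defer(...)` body is unchanged from the diff below and stands behind a labeled placeholder here):

```python
if self.deferrable:
    self._defer_to_trigger(context)  # hypothetical helper standing in for the defer(...) call shown below
else:
    super().execute(context)
```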



##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -242,6 +245,42 @@ def poke(self, context: Context) -> bool:
         )
         return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
 
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the 
trigger."""
+        if not self.deferrable:
+            super().execute(context)
+        else:
+            self.defer(
+                timeout=timedelta(seconds=self.timeout),
+                trigger=GCSCheckBlobUpdateTimeTrigger(
+                    bucket=self.bucket,
+                    object_name=self.object,
+                    ts=self.ts_func(context),
+                    poke_interval=self.poke_interval,
+                    google_cloud_conn_id=self.google_cloud_conn_id,
+                    hook_params={
+                        "delegate_to": self.delegate_to,
+                        "impersonation_chain": self.impersonation_chain,
+                    },
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event:
+            if event["status"] == "success":
+                self.log.info(
+                    "Sensor checks update time for object %s in bucket : %s", 
self.object, self.bucket

Review Comment:
   not sure the meaning of this message?
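
For illustration only (this wording is a guess at what the message might intend, not a change proposed in the review): since this branch runs after the trigger has reported success, the log line could state the outcome rather than what the sensor checks, for example:

```python
self.log.info(
    "Object %s in bucket %s was updated after the target timestamp",
    self.object,
    self.bucket,
)
```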



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
