pankajkoti commented on code in PR #31081:
URL: https://github.com/apache/airflow/pull/31081#discussion_r1198552903
##########
airflow/providers/google/cloud/sensors/gcs.py:
##########
@@ -390,6 +391,7 @@ class GCSUploadSessionCompleteSensor(BaseSensorOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
+ :param deferrable: bool = False
Review Comment:
It would be nice to describe this param instead of just writing `bool = False`.
##########
airflow/providers/google/cloud/triggers/gcs.py:
##########
@@ -281,3 +282,162 @@ async def _list_blobs_with_prefix(self, hook:
GCSAsyncHook, bucket_name: str, pr
bucket = client.get_bucket(bucket_name)
object_response = await bucket.list_blobs(prefix=prefix)
return object_response
+
+
+class GCSUploadSessionTrigger(GCSPrefixBlobTrigger):
+ """
+ Checks for changes in the number of objects at a prefix in a Google Cloud
+ Storage bucket and returns a TriggerEvent if the inactivity period has passed
+ with no increase in the number of objects.
+
+ :param bucket: The Google Cloud Storage bucket where the objects are
+ expected.
+ :param prefix: The name of the prefix to check in the Google Cloud
+ Storage bucket.
+ :param poke_interval: polling period in seconds to check
+ :param inactivity_period: The total seconds of inactivity to designate
+ that an upload session is over. Note, this mechanism is not real time and
+ this operator may not return until an interval after this period
+ has passed with no additional objects sensed.
+ :param min_objects: The minimum number of objects needed for upload session
+ to be considered valid.
+ :param previous_objects: The set of object ids found during the last poke.
+ :param allow_delete: Whether this sensor should consider objects being deleted
+ between intervals valid behavior. If true, a warning message will be
+ logged when this happens. If false, an error will be raised.
+ :param google_cloud_conn_id: The connection ID to use when connecting
+ to Google Cloud Storage.
+ """
+
+ def __init__(
+ self,
+ bucket: str,
+ prefix: str,
+ poke_interval: float,
+ google_cloud_conn_id: str,
+ hook_params: dict[str, Any],
+ inactivity_period: float = 60 * 60,
+ min_objects: int = 1,
+ previous_objects: set[str] | None = None,
+ allow_delete: bool = True,
+ ):
+ super().__init__(
+ bucket=bucket,
+ prefix=prefix,
+ poke_interval=poke_interval,
+ google_cloud_conn_id=google_cloud_conn_id,
+ hook_params=hook_params,
+ )
+ self.inactivity_period = inactivity_period
+ self.min_objects = min_objects
+ self.previous_objects = previous_objects if previous_objects else set()
+ self.inactivity_seconds = 0.0
+ self.allow_delete = allow_delete
+ self.last_activity_time: datetime | None = None
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serializes GCSUploadSessionTrigger arguments and classpath."""
+ return (
+
"airflow.providers.google.cloud.triggers.gcs.GCSUploadSessionTrigger",
+ {
+ "bucket": self.bucket,
+ "prefix": self.prefix,
+ "poke_interval": self.poke_interval,
+ "google_cloud_conn_id": self.google_cloud_conn_id,
+ "hook_params": self.hook_params,
+ "inactivity_period": self.inactivity_period,
+ "min_objects": self.min_objects,
+ "previous_objects": self.previous_objects,
+ "allow_delete": self.allow_delete,
+ },
+ )
+
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ """
+ Loop until no new or deleted files are found in the blob listing for the
+ inactivity_period.
+ """
+ try:
+ hook = self._get_async_hook()
+ while True:
+ list_blobs = await self._list_blobs_with_prefix(
+ hook=hook, bucket_name=self.bucket, prefix=self.prefix
+ )
+ res = self._is_bucket_updated(set(list_blobs))
+ if res["status"] == "success":
+ yield TriggerEvent(res)
+ elif res["status"] == "error":
+ yield TriggerEvent(res)
Review Comment:
```suggestion
if res["status"] in ("success", "error"):
yield TriggerEvent(res)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]