Lee-W commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1241918268


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -326,3 +329,36 @@ def is_keys_unchanged(self, current_objects: set[str]) -> 
bool:
 
     def poke(self, context: Context):
         return 
self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, 
prefix=self.prefix)))
+
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the 
trigger."""

Review Comment:
   I'm not sure, but it looks to me like this method only runs in deferred mode.



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this 
period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys 
unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last 
poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be 
logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, 
bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - 
previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. 
Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between 
pokes.",

Review Comment:
   Maybe something like "Some files deleted in {os.path.join(bucket_name, prefix)} between pokes."?



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this 
period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys 
unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last 
poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be 
logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:

Review Comment:
   nitpick
   
   ```suggestion
           if not previous_objects:
   ```



##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+    """
+    S3KeyTrigger is fired as deferred class with params to run the task in 
trigger worker.
+
+    :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
+        is not provided as a full s3:// url.
+    :param prefix: The prefix being waited on. Relative path from bucket root 
level.
+    :param inactivity_period: The total seconds of inactivity to designate
+        keys unchanged. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional objects sensed.
+    :param min_objects: The minimum number of objects needed for keys unchanged
+        sensor to be considered valid.
+    :param inactivity_seconds: reference to the seconds of inactivity
+    :param previous_objects: The set of object ids found during the last poke.
+    :param allow_delete: Should this sensor consider objects being deleted
+    :param aws_conn_id: reference to the s3 connection
+    :param last_activity_time: last modified or last active time
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+    :param hook_params: params for hook its optional
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        inactivity_seconds: int = 0,
+        previous_objects: set[str] | None = None,
+        allow_delete: bool = True,
+        aws_conn_id: str = "aws_default",
+        last_activity_time: datetime | None = None,
+        verify: bool | str | None = None,
+        **hook_params: Any,
+    ):
+        super().__init__()
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        if inactivity_period < 0:
+            raise ValueError("inactivity_period must be non-negative")
+        if previous_objects is None:

Review Comment:
   nitpick
   
   ```suggestion
           if not previous_objects:
   ```



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this 
period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys 
unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last 
poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be 
logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, 
bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - 
previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. 
Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between 
pokes.",

Review Comment:
   nitpick
   
   ```suggestion
                   "message": f"{os.path.join(bucket_name, prefix)} between 
pokes.",
   ```
   
   Also, is part of the message missing? As written, it only says where (the bucket/prefix path), not what happened between pokes.



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this 
period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys 
unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last 
poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be 
logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, 
bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - 
previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. 
Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between 
pokes.",
+            }
+
+        if last_activity_time:
+            inactivity_seconds = int((datetime.now() - 
last_activity_time).total_seconds())
+        else:
+            # Handles the first poke where last inactivity time is None.
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+
+        if inactivity_seconds >= inactivity_period:
+            path = os.path.join(bucket_name, prefix)
+
+            if current_num_objects >= min_objects:
+                success_message = (
+                    "SUCCESS: Sensor found %s objects at %s. "
+                    "Waited at least %s seconds, with no new objects uploaded."
+                )
+                self.log.info(success_message, current_num_objects, path, 
inactivity_period)
+                return {
+                    "status": "success",
+                    "message": success_message % (current_num_objects, path, 
inactivity_period),
+                }

Review Comment:
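   Since these variables don't change, it might make sense to use an f-string and render the whole message at once (note that each string fragment needs its own `f` prefix, otherwise `{inactivity_period}` is not interpolated):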
   
   ```suggestion
                   success_message = (
                       f"SUCCESS: Sensor found {current_num_objects} objects at 
{path}. "
                       "Waited at least {inactivity_period} seconds, with no 
new objects uploaded."
                   )
                   self.log.info(success_message)
                   return {
                       "status": "success",
                       "message": success_message,
                   }
   ```



##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+    """
+    S3KeyTrigger is fired as deferred class with params to run the task in 
trigger worker.
+
+    :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
+        is not provided as a full s3:// url.
+    :param prefix: The prefix being waited on. Relative path from bucket root 
level.
+    :param inactivity_period: The total seconds of inactivity to designate
+        keys unchanged. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional objects sensed.
+    :param min_objects: The minimum number of objects needed for keys unchanged
+        sensor to be considered valid.
+    :param inactivity_seconds: reference to the seconds of inactivity
+    :param previous_objects: The set of object ids found during the last poke.
+    :param allow_delete: Should this sensor consider objects being deleted
+    :param aws_conn_id: reference to the s3 connection
+    :param last_activity_time: last modified or last active time
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+    :param hook_params: params for hook its optional
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        inactivity_seconds: int = 0,
+        previous_objects: set[str] | None = None,
+        allow_delete: bool = True,
+        aws_conn_id: str = "aws_default",
+        last_activity_time: datetime | None = None,
+        verify: bool | str | None = None,
+        **hook_params: Any,
+    ):
+        super().__init__()
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        if inactivity_period < 0:
+            raise ValueError("inactivity_period must be non-negative")
+        if previous_objects is None:
+            previous_objects = set()
+        self.inactivity_period = inactivity_period
+        self.min_objects = min_objects
+        self.previous_objects = previous_objects
+        self.inactivity_seconds = inactivity_seconds
+        self.allow_delete = allow_delete
+        self.aws_conn_id = aws_conn_id
+        self.last_activity_time: datetime | None = last_activity_time

Review Comment:
   Why do we annotate the type again here? It is already declared as `datetime | None` in the `__init__` signature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to