sunank200 commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1245163514


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -326,3 +329,36 @@ def is_keys_unchanged(self, current_objects: set[str]) -> 
bool:
 
     def poke(self, context: Context):
         return 
self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, 
prefix=self.prefix)))
+
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the 
trigger."""

Review Comment:
   fixed 
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5



##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+    """
+    S3KeyTrigger is fired as deferred class with params to run the task in 
trigger worker.
+
+    :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
+        is not provided as a full s3:// url.
+    :param prefix: The prefix being waited on. Relative path from bucket root 
level.
+    :param inactivity_period: The total seconds of inactivity to designate
+        keys unchanged. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional objects sensed.
+    :param min_objects: The minimum number of objects needed for keys unchanged
+        sensor to be considered valid.
+    :param inactivity_seconds: reference to the seconds of inactivity
+    :param previous_objects: The set of object ids found during the last poke.
+    :param allow_delete: Should this sensor consider objects being deleted
+    :param aws_conn_id: reference to the s3 connection
+    :param last_activity_time: last modified or last active time
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+    :param hook_params: params for hook its optional
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        inactivity_seconds: int = 0,
+        previous_objects: set[str] | None = None,
+        allow_delete: bool = True,
+        aws_conn_id: str = "aws_default",
+        last_activity_time: datetime | None = None,
+        verify: bool | str | None = None,
+        **hook_params: Any,
+    ):
+        super().__init__()
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        if inactivity_period < 0:
+            raise ValueError("inactivity_period must be non-negative")
+        if previous_objects is None:

Review Comment:
   fixed 
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to