sunank200 commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1245161690
##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
+ async def is_keys_unchanged_async(
+ self,
+ client: AioBaseClient,
+ bucket_name: str,
+ prefix: str,
+ inactivity_period: float = 60 * 60,
+ min_objects: int = 1,
+ previous_objects: set[str] | None = None,
+ inactivity_seconds: int = 0,
+ allow_delete: bool = True,
+ last_activity_time: datetime | None = None,
+ ) -> dict[str, Any]:
+ """
+ Checks whether new objects have been uploaded and the inactivity_period
+ has passed and updates the state of the sensor accordingly.
+
+ :param client: aiobotocore client
+ :param bucket_name: the name of the bucket
+ :param prefix: a key prefix
+ :param inactivity_period: the total seconds of inactivity to designate
+ keys unchanged. Note, this mechanism is not real time and
+ this operator may not return until a poke_interval after this
period
+ has passed with no additional objects sensed.
+ :param min_objects: the minimum number of objects needed for keys
unchanged
+ sensor to be considered valid.
+ :param previous_objects: the set of object ids found during the last
poke.
+ :param inactivity_seconds: number of inactive seconds
+ :param last_activity_time: last activity datetime
+ :param allow_delete: Should this sensor consider objects being deleted
+ between pokes valid behavior. If true a warning message will be
logged
+ when this happens. If false an error will be raised.
+ :return: dictionary with status and message
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+ """
+ S3KeyTrigger is fired as deferred class with params to run the task in
trigger worker.
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]