sunank200 commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1245163060
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
+ and classpath.
+ """
+ trigger = S3KeysUnchangedTrigger(
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ inactivity_seconds=0,
+ previous_objects=None,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.s3.S3KeysUnchangedTrigger"
+ assert kwargs == {
+ "bucket_name": "test_bucket",
+ "prefix": "test",
+ "inactivity_period": 1,
+ "min_objects": 1,
+ "inactivity_seconds": 0,
+ "previous_objects": set(),
+ "allow_delete": 1,
+ "aws_conn_id": "aws_default",
+ "last_activity_time": None,
+ "hook_params": {},
+ }
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ async def test_run_wait(self, mock_client):
+ """Test if the task is run is in trigger successfully."""
+ mock_client.return_value.check_key.return_value = True
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ with mock_client:
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ assert task.done() is True
+ asyncio.get_event_loop().stop()
+
+ def test_run_raise_value_error(self):
+ """
+ Test if the S3KeysUnchangedTrigger raises Value error for negative
inactivity_period.
+ """
+ with pytest.raises(ValueError):
+ S3KeysUnchangedTrigger(bucket_name="test_bucket", prefix="test",
inactivity_period=-100)
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_success(self, mock_is_keys_unchanged, mock_client):
+ """
+ Test if the task is run is in triggerer successfully.
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
+ and classpath.
+ """
+ trigger = S3KeysUnchangedTrigger(
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ inactivity_seconds=0,
+ previous_objects=None,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.s3.S3KeysUnchangedTrigger"
+ assert kwargs == {
+ "bucket_name": "test_bucket",
+ "prefix": "test",
+ "inactivity_period": 1,
+ "min_objects": 1,
+ "inactivity_seconds": 0,
+ "previous_objects": set(),
+ "allow_delete": 1,
+ "aws_conn_id": "aws_default",
+ "last_activity_time": None,
+ "hook_params": {},
+ }
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ async def test_run_wait(self, mock_client):
+ """Test if the task is run is in trigger successfully."""
+ mock_client.return_value.check_key.return_value = True
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ with mock_client:
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ assert task.done() is True
+ asyncio.get_event_loop().stop()
+
+ def test_run_raise_value_error(self):
+ """
+ Test if the S3KeysUnchangedTrigger raises Value error for negative
inactivity_period.
+ """
+ with pytest.raises(ValueError):
+ S3KeysUnchangedTrigger(bucket_name="test_bucket", prefix="test",
inactivity_period=-100)
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_success(self, mock_is_keys_unchanged, mock_client):
+ """
+ Test if the task is run is in triggerer successfully.
+ """
+ mock_is_keys_unchanged.return_value = {"status": "success"}
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "success"}) == actual
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_pending(self, mock_is_keys_unchanged, mock_client):
+ """Test if the task is run is in triggerer successfully."""
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
+ async def is_keys_unchanged_async(
+ self,
+ client: AioBaseClient,
+ bucket_name: str,
+ prefix: str,
+ inactivity_period: float = 60 * 60,
+ min_objects: int = 1,
+ previous_objects: set[str] | None = None,
+ inactivity_seconds: int = 0,
+ allow_delete: bool = True,
+ last_activity_time: datetime | None = None,
+ ) -> dict[str, Any]:
+ """
+ Checks whether new objects have been uploaded and the inactivity_period
+ has passed and updates the state of the sensor accordingly.
+
+ :param client: aiobotocore client
+ :param bucket_name: the name of the bucket
+ :param prefix: a key prefix
+ :param inactivity_period: the total seconds of inactivity to designate
+ keys unchanged. Note, this mechanism is not real time and
+ this operator may not return until a poke_interval after this
period
+ has passed with no additional objects sensed.
+ :param min_objects: the minimum number of objects needed for keys
unchanged
+ sensor to be considered valid.
+ :param previous_objects: the set of object ids found during the last
poke.
+ :param inactivity_seconds: number of inactive seconds
+ :param last_activity_time: last activity datetime
+ :param allow_delete: Should this sensor consider objects being deleted
+ between pokes valid behavior. If true a warning message will be
logged
+ when this happens. If false an error will be raised.
+ :return: dictionary with status and message
+ """
+ if previous_objects is None:
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]