phanikumv commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1241932479
##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
+ async def is_keys_unchanged_async(
+ self,
+ client: AioBaseClient,
+ bucket_name: str,
+ prefix: str,
+ inactivity_period: float = 60 * 60,
+ min_objects: int = 1,
+ previous_objects: set[str] | None = None,
+ inactivity_seconds: int = 0,
+ allow_delete: bool = True,
+ last_activity_time: datetime | None = None,
+ ) -> dict[str, Any]:
+ """
+ Checks whether new objects have been uploaded and the inactivity_period
+ has passed and updates the state of the sensor accordingly.
+
+ :param client: aiobotocore client
+ :param bucket_name: the name of the bucket
+ :param prefix: a key prefix
+ :param inactivity_period: the total seconds of inactivity to designate
+ keys unchanged. Note, this mechanism is not real time and
+ this operator may not return until a poke_interval after this period
+ has passed with no additional objects sensed.
+ :param min_objects: the minimum number of objects needed for keys unchanged
+ sensor to be considered valid.
+ :param previous_objects: the set of object ids found during the last poke.
+ :param inactivity_seconds: number of inactive seconds
+ :param last_activity_time: last activity datetime
+ :param allow_delete: Should this sensor consider objects being deleted
+ between pokes valid behavior. If true a warning message will be logged
+ when this happens. If false an error will be raised.
+ :return: dictionary with status and message
Review Comment:
```suggestion
```
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
Review Comment:
This docstring should be more elaborate: describe the scenarios being tested and the response status expected in each.
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
Review Comment:
```suggestion
```
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
+ """
+
+ mock_list_keys.return_value = ["test"]
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ # test for the case when current_objects < previous_objects
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_exception_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
Review Comment:
```suggestion
```
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
+ """
+
+ mock_list_keys.return_value = ["test"]
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ # test for the case when current_objects < previous_objects
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_exception_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response == {"message": " test_bucket/test between pokes.",
"status": "error"}
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
Review Comment:
```suggestion
```
##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+ """
+ S3KeyTrigger is fired as deferred class with params to run the task in
trigger worker.
Review Comment:
```suggestion
S3KeysUnchangedTrigger is fired as a deferred class with params to run the task in the trigger worker.
```
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
+ """
+
+ mock_list_keys.return_value = ["test"]
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ # test for the case when current_objects < previous_objects
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_exception_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response == {"message": " test_bucket/test between pokes.",
"status": "error"}
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=0,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_inactivity_error_async(self,
mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
Review Comment:
```suggestion
```
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
Review Comment:
```suggestion
Asserts that the S3KeysUnchangedTrigger correctly serializes its arguments
```
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
+ and classpath.
+ """
+ trigger = S3KeysUnchangedTrigger(
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ inactivity_seconds=0,
+ previous_objects=None,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.s3.S3KeysUnchangedTrigger"
+ assert kwargs == {
+ "bucket_name": "test_bucket",
+ "prefix": "test",
+ "inactivity_period": 1,
+ "min_objects": 1,
+ "inactivity_seconds": 0,
+ "previous_objects": set(),
+ "allow_delete": 1,
+ "aws_conn_id": "aws_default",
+ "last_activity_time": None,
+ "hook_params": {},
+ }
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ async def test_run_wait(self, mock_client):
+ """Test if the task is run is in trigger successfully."""
+ mock_client.return_value.check_key.return_value = True
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ with mock_client:
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ assert task.done() is True
+ asyncio.get_event_loop().stop()
+
+ def test_run_raise_value_error(self):
+ """
+ Test if the S3KeysUnchangedTrigger raises Value error for negative
inactivity_period.
+ """
+ with pytest.raises(ValueError):
+ S3KeysUnchangedTrigger(bucket_name="test_bucket", prefix="test",
inactivity_period=-100)
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_success(self, mock_is_keys_unchanged, mock_client):
+ """
+ Test if the task is run is in triggerer successfully.
Review Comment:
```suggestion
Test that the task runs successfully in the triggerer.
```
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
+ and classpath.
+ """
+ trigger = S3KeysUnchangedTrigger(
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ inactivity_seconds=0,
+ previous_objects=None,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.s3.S3KeysUnchangedTrigger"
+ assert kwargs == {
+ "bucket_name": "test_bucket",
+ "prefix": "test",
+ "inactivity_period": 1,
+ "min_objects": 1,
+ "inactivity_seconds": 0,
+ "previous_objects": set(),
+ "allow_delete": 1,
+ "aws_conn_id": "aws_default",
+ "last_activity_time": None,
+ "hook_params": {},
+ }
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ async def test_run_wait(self, mock_client):
+ """Test if the task is run is in trigger successfully."""
Review Comment:
```suggestion
"""Test that the task runs successfully in the triggerer."""
```
##########
tests/providers/amazon/aws/triggers/test_s3.py:
##########
@@ -75,3 +75,80 @@ async def test_run_pending(self, mock_client,
mock_check_key_async):
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+class TestS3KeysUnchangedTrigger:
+ def test_serialization(self):
+ """
+ Asserts that the TaskStateTrigger correctly serializes its arguments
+ and classpath.
+ """
+ trigger = S3KeysUnchangedTrigger(
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ inactivity_seconds=0,
+ previous_objects=None,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.s3.S3KeysUnchangedTrigger"
+ assert kwargs == {
+ "bucket_name": "test_bucket",
+ "prefix": "test",
+ "inactivity_period": 1,
+ "min_objects": 1,
+ "inactivity_seconds": 0,
+ "previous_objects": set(),
+ "allow_delete": 1,
+ "aws_conn_id": "aws_default",
+ "last_activity_time": None,
+ "hook_params": {},
+ }
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ async def test_run_wait(self, mock_client):
+ """Test if the task is run is in trigger successfully."""
+ mock_client.return_value.check_key.return_value = True
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ with mock_client:
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ assert task.done() is True
+ asyncio.get_event_loop().stop()
+
+ def test_run_raise_value_error(self):
+ """
+ Test if the S3KeysUnchangedTrigger raises Value error for negative
inactivity_period.
+ """
+ with pytest.raises(ValueError):
+ S3KeysUnchangedTrigger(bucket_name="test_bucket", prefix="test",
inactivity_period=-100)
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_success(self, mock_is_keys_unchanged, mock_client):
+ """
+ Test if the task is run is in triggerer successfully.
+ """
+ mock_is_keys_unchanged.return_value = {"status": "success"}
+ trigger = S3KeysUnchangedTrigger(bucket_name="test_bucket",
prefix="test")
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "success"}) == actual
+
+ @pytest.mark.asyncio
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.is_keys_unchanged_async")
+ async def test_run_pending(self, mock_is_keys_unchanged, mock_client):
+ """Test if the task is run is in triggerer successfully."""
Review Comment:
```suggestion
"""Test that the task runs successfully in the triggerer."""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]