sunank200 commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1245162485
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
+ """
+
+ mock_list_keys.return_value = ["test"]
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ # test for the case when current_objects < previous_objects
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_exception_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response == {"message": " test_bucket/test between pokes.", "status": "error"}
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
##########
tests/providers/amazon/aws/hooks/test_s3.py:
##########
@@ -680,6 +680,131 @@ async def test_s3__check_key_with_wild_card_async(
)
assert response is False
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_false_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives False response
+ :return:
+ """
+
+ mock_list_keys.return_value = ["test"]
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ # test for the case when current_objects < previous_objects
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=True,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_exception_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=1,
+ previous_objects=set("test"),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response == {"message": " test_bucket/test between pokes.", "status": "error"}
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=0,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=None,
+ )
+
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_inactivity_error_async(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException
+ :return:
Review Comment:
fixed
https://github.com/apache/airflow/pull/31940/commits/f9bd957404d9b5b45ceb33d00a1c37b8670f8cf5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]