This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1e1dc60fb handle tzinfo in S3Hook.is_keys_unchanged_async (#36363)
a1e1dc60fb is described below
commit a1e1dc60fb70d102451a6a819ccc78c079b65ddd
Author: Wei Lee <[email protected]>
AuthorDate: Mon Dec 25 19:29:36 2023 +0530
handle tzinfo in S3Hook.is_keys_unchanged_async (#36363)
* fix(providers/amazon): handle tzinfo in S3Hook.is_keys_unchanged_async
* test(providers/amazon): add timezone info to is_keys_unchanged_async test case
* test(providers/amazon): add test case to test last_activity_time without tzinfo
---
airflow/providers/amazon/aws/hooks/s3.py | 4 ++-
tests/providers/amazon/aws/hooks/test_s3.py | 53 ++++++++++++++++++++++++++++-
2 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 6bd0b0a750..d75e3e337a 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -713,7 +713,9 @@ class S3Hook(AwsBaseHook):
}
if last_activity_time:
- inactivity_seconds = int((datetime.now() - last_activity_time).total_seconds())
+ inactivity_seconds = int(
+ (datetime.now(last_activity_time.tzinfo) - last_activity_time).total_seconds()
+ )
else:
# Handles the first poke where last inactivity time is None.
last_activity_time = datetime.now()
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index bc4448cebc..b1b52b3557 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -22,6 +22,7 @@ import inspect
import os
import re
import unittest
+from datetime import datetime as std_datetime, timezone
from unittest import mock, mock as async_mock
from unittest.mock import MagicMock, Mock, patch
from urllib.parse import parse_qs
@@ -762,7 +763,7 @@ class TestAwsS3Hook:
@pytest.mark.asyncio
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
@async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
- async def test_s3_key_hook_is_keys_unchanged_pending_async(self, mock_list_keys, mock_client):
+ async def test_s3_key_hook_is_keys_unchanged_async_handle_tzinfo(self, mock_list_keys, mock_client):
"""
Test is_key_unchanged gives AirflowException.
"""
@@ -812,6 +813,56 @@ class TestAwsS3Hook:
"message": "FAILURE: Inactivity Period passed, not enough objects found in test_bucket/test",
}
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async_without_tzinfo(
+ self, mock_list_keys, mock_client
+ ):
+ """
+ Test is_key_unchanged gives AirflowException.
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=0,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=std_datetime.now(),
+ )
+ assert response.get("status") == "pending"
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+ @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+ async def test_s3_key_hook_is_keys_unchanged_pending_async_with_tzinfo(self, mock_list_keys, mock_client):
+ """
+ Test is_key_unchanged gives AirflowException.
+ """
+ mock_list_keys.return_value = []
+
+ s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+ response = await s3_hook_async.is_keys_unchanged_async(
+ client=mock_client.return_value,
+ bucket_name="test_bucket",
+ prefix="test",
+ inactivity_period=1,
+ min_objects=0,
+ previous_objects=set(),
+ inactivity_seconds=0,
+ allow_delete=False,
+ last_activity_time=std_datetime.now(timezone.utc),
+ )
+ assert response.get("status") == "pending"
+
def test_load_bytes(self, s3_bucket):
hook = S3Hook()
hook.load_bytes(b"Content", "my_key", s3_bucket)