This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42f8d048d2 Replace usage of `datetime.utcnow` and 
`datetime.utcfromtimestamp` in providers (#37138)
42f8d048d2 is described below

commit 42f8d048d2dccfcf59a44e00e9b1e8a3e63090a0
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Feb 16 03:07:32 2024 +0400

    Replace usage of `datetime.utcnow` and `datetime.utcfromtimestamp` in 
providers (#37138)
    
    * Replace usage of `datetime.utcnow` and `datetime.utcfromtimestamp` in 
providers
    
    * Apply suggested changes
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    * Remove redundand context
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../amazon/aws/log/cloudwatch_task_handler.py      |  4 +--
 .../providers/amazon/aws/utils/task_log_fetcher.py |  4 +--
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  4 +--
 .../providers/google/common/hooks/base_google.py   |  2 +-
 kubernetes_tests/test_base.py                      |  4 +--
 tests/providers/amazon/aws/hooks/test_sagemaker.py |  4 +--
 .../amazon/aws/log/test_cloudwatch_task_handler.py |  4 +--
 .../celery/executors/test_celery_executor.py       |  4 +--
 .../executors/test_kubernetes_executor.py          |  8 ++---
 tests/providers/google/cloud/sensors/test_gcs.py   | 42 +++++++++-------------
 tests/providers/google/cloud/triggers/test_gcs.py  |  4 +--
 .../example_cloud_storage_transfer_service_aws.py  |  4 +--
 .../example_cloud_storage_transfer_service_gcp.py  |  4 +--
 13 files changed, 41 insertions(+), 51 deletions(-)

diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py 
b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 3a801093ca..865b30233d 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
@@ -163,7 +163,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
         return "\n".join(self._event_to_str(event) for event in events)
 
     def _event_to_str(self, event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, 
tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py 
b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index 63110edada..a4cad6c099 100644
--- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from threading import Event, Thread
 from typing import TYPE_CHECKING, Generator
 
@@ -87,7 +87,7 @@ class AwsTaskLogFetcher(Thread):
 
     @staticmethod
     def event_to_str(event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, 
tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py 
b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index cc745059a1..92dc97b4cc 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
+from datetime import datetime, timezone
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING, Any, Sequence
 
@@ -276,7 +276,7 @@ class S3ToGCSOperator(S3ListOperator):
         )
 
     def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, 
s3_hook: S3Hook) -> list[str]:
-        now = datetime.utcnow()
+        now = datetime.now(tz=timezone.utc)
         one_time_schedule = {"day": now.day, "month": now.month, "year": 
now.year}
 
         gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
diff --git a/airflow/providers/google/common/hooks/base_google.py 
b/airflow/providers/google/common/hooks/base_google.py
index a83524422a..bee2798267 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -674,7 +674,7 @@ class _CredentialsToken(Token):
 
         self.access_token = cast(str, self.credentials.token)
         self.access_token_duration = 3600
-        self.access_token_acquired_at = datetime.datetime.utcnow()
+        self.access_token_acquired_at = 
datetime.datetime.now(tz=datetime.timezone.utc)
         self.acquiring = None
 
 
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index 715bbbb237..17d1c99543 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -20,7 +20,7 @@ import os
 import subprocess
 import tempfile
 import time
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import Path
 from subprocess import check_call, check_output
 
@@ -77,7 +77,7 @@ class BaseK8STest:
         with open(output_file_path, "w") as output_file:
             print("=" * 80, file=output_file)
             print(f"Describe resources for namespace {namespace}", 
file=output_file)
-            print(f"Datetime: {datetime.utcnow()}", file=output_file)
+            print(f"Datetime: {datetime.now(tz=timezone.utc)}", 
file=output_file)
             print("=" * 80, file=output_file)
             print("Describing pods", file=output_file)
             print("-" * 80, file=output_file)
diff --git a/tests/providers/amazon/aws/hooks/test_sagemaker.py 
b/tests/providers/amazon/aws/hooks/test_sagemaker.py
index 7c3549c9b7..5cdf6438ca 100644
--- a/tests/providers/amazon/aws/hooks/test_sagemaker.py
+++ b/tests/providers/amazon/aws/hooks/test_sagemaker.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import time
-from datetime import datetime
+from datetime import datetime, timezone
 from unittest import mock
 from unittest.mock import patch
 
@@ -528,7 +528,7 @@ class TestSageMakerHook:
     def test_secondary_training_status_message_status_changed(self):
         now = datetime.now(tzlocal())
         SECONDARY_STATUS_DESCRIPTION_1["LastModifiedTime"] = now
-        expected_time = 
datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime("%Y-%m-%d 
%H:%M:%S")
+        expected_time = now.astimezone(tz=timezone.utc).strftime("%Y-%m-%d 
%H:%M:%S")
         expected = f"{expected_time} {status} - {message}"
         assert (
             secondary_training_status_message(SECONDARY_STATUS_DESCRIPTION_1, 
SECONDARY_STATUS_DESCRIPTION_2)
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py 
b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 7ceb423481..4f9ecb1664 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import logging
 import time
-from datetime import datetime as dt, timedelta
+from datetime import datetime as dt, timedelta, timezone
 from unittest import mock
 from unittest.mock import ANY, Mock, call
 
@@ -40,7 +40,7 @@ from tests.test_utils.config import conf_vars
 
 
 def get_time_str(time_in_milliseconds):
-    dt_time = dt.utcfromtimestamp(time_in_milliseconds / 1000.0)
+    dt_time = dt.fromtimestamp(time_in_milliseconds / 1000.0, tz=timezone.utc)
     return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")
 
 
diff --git a/tests/providers/celery/executors/test_celery_executor.py 
b/tests/providers/celery/executors/test_celery_executor.py
index 450312bd63..a32d710048 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -22,7 +22,7 @@ import logging
 import os
 import signal
 import sys
-from datetime import datetime, timedelta
+from datetime import timedelta
 from unittest import mock
 
 # leave this it is used by the test worker
@@ -183,7 +183,7 @@ class TestCeleryExecutor:
 
     @pytest.mark.backend("mysql", "postgres")
     def test_try_adopt_task_instances_none(self):
-        start_date = datetime.utcnow() - timedelta(days=2)
+        start_date = timezone.utcnow() - timedelta(days=2)
 
         with DAG("test_try_adopt_task_instances_none"):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index d74f4636c8..d00342fd98 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -573,7 +573,7 @@ class TestKubernetesExecutor:
         try:
             assert executor.event_buffer == {}
             executor.execute_async(
-                key=("dag", "task", datetime.utcnow(), 1),
+                key=("dag", "task", timezone.utcnow(), 1),
                 queue=None,
                 command=["airflow", "tasks", "run", "true", "some_parameter"],
                 executor_config=k8s.V1Pod(
@@ -1540,7 +1540,7 @@ class TestKubernetesJobWatcher:
 
     def test_process_status_pending_deleted(self):
         self.events.append({"type": "DELETED", "object": self.pod})
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
 
         self._run()
         self.assert_watcher_queue_called_once_with_state(State.FAILED)
@@ -1570,7 +1570,7 @@ class TestKubernetesJobWatcher:
 
     def test_process_status_succeeded_dedup_timestamp(self):
         self.pod.status.phase = "Succeeded"
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
         self.events.append({"type": "MODIFIED", "object": self.pod})
 
         self._run()
@@ -1604,7 +1604,7 @@ class TestKubernetesJobWatcher:
 
     def test_process_status_running_deleted(self):
         self.pod.status.phase = "Running"
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
         self.events.append({"type": "DELETED", "object": self.pod})
 
         self._run()
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py 
b/tests/providers/google/cloud/sensors/test_gcs.py
index 37697ff58d..641b2052fd 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
 from unittest import mock
 
 import pendulum
@@ -45,6 +45,7 @@ from airflow.providers.google.cloud.triggers.gcs import (
     GCSPrefixBlobTrigger,
     GCSUploadSessionTrigger,
 )
+from airflow.utils import timezone
 
 TEST_BUCKET = "TEST_BUCKET"
 
@@ -67,15 +68,6 @@ TEST_INACTIVITY_PERIOD = 5
 TEST_MIN_OBJECTS = 1
 
 
[email protected]()
-def context():
-    """
-    Creates an empty context.
-    """
-    context = {"data_interval_end": datetime.utcnow()}
-    yield context
-
-
 def next_time_side_effect():
     """
     This each time this is called mock a time 10 seconds later
@@ -159,7 +151,7 @@ class TestGoogleCloudStorageObjectSensor:
         )
         mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
-            task.execute(context)
+            task.execute({})
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not 
a GCSBlobTrigger"
 
     @pytest.mark.parametrize(
@@ -213,7 +205,7 @@ class TestGoogleCloudStorageObjectAsyncSensor:
             )
         mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
-            task.execute(context)
+            task.execute({})
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not 
a GCSBlobTrigger"
 
     @pytest.mark.parametrize(
@@ -329,20 +321,20 @@ class TestGCSObjectUpdateAsyncSensor:
     @pytest.mark.parametrize(
         "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
     )
-    def test_gcs_object_update_async_sensor_execute_failure(self, context, 
soft_fail, expected_exception):
+    def test_gcs_object_update_async_sensor_execute_failure(self, soft_fail, 
expected_exception):
         """Tests that an AirflowException is raised in case of error event"""
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test 
failure message"}
+                context={}, event={"status": "error", "message": "test failure 
message"}
             )
 
-    def test_gcs_object_update_async_sensor_execute_complete(self, context):
+    def test_gcs_object_update_async_sensor_execute_complete(self):
         """Asserts that logging occurs as expected"""
 
         with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "success", "message": "Job 
completed"}
+                context={}, event={"status": "success", "message": "Job 
completed"}
             )
         mock_log_info.assert_called_with(
             "Checking last updated time for object %s in bucket : %s", 
TEST_OBJECT, TEST_BUCKET
@@ -462,21 +454,21 @@ class TestGCSObjectsWithPrefixExistenceAsyncSensor:
         "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
     )
     def test_gcs_object_with_prefix_existence_async_sensor_execute_failure(
-        self, context, soft_fail, expected_exception
+        self, soft_fail, expected_exception
     ):
         """Tests that an AirflowException is raised in case of error event"""
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test 
failure message"}
+                context={}, event={"status": "error", "message": "test failure 
message"}
             )
 
-    def 
test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self, 
context):
+    def 
test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self):
         """Asserts that logging occurs as expected"""
 
         with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
             self.OPERATOR.execute_complete(
-                context=context,
+                context={},
                 event={"status": "success", "message": "Job completed", 
"matches": [TEST_OBJECT]},
             )
         mock_log_info.assert_called_with("Resuming from trigger and checking 
status")
@@ -609,18 +601,16 @@ class TestGCSUploadSessionCompleteAsyncSensor:
     @pytest.mark.parametrize(
         "soft_fail, expected_exception", ((False, AirflowException), (True, 
AirflowSkipException))
     )
-    def test_gcs_upload_session_complete_sensor_execute_failure(self, context, 
soft_fail, expected_exception):
+    def test_gcs_upload_session_complete_sensor_execute_failure(self, 
soft_fail, expected_exception):
         """Tests that an AirflowException is raised in case of error event"""
 
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test 
failure message"}
+                context={}, event={"status": "error", "message": "test failure 
message"}
             )
 
-    def test_gcs_upload_session_complete_async_sensor_execute_complete(self, 
context):
+    def test_gcs_upload_session_complete_async_sensor_execute_complete(self):
         """Asserts that execute complete is completed as expected"""
 
-        assert self.OPERATOR.execute_complete(
-            context=context, event={"status": "success", "message": "success"}
-        )
+        assert self.OPERATOR.execute_complete(context={}, event={"status": 
"success", "message": "success"})
diff --git a/tests/providers/google/cloud/triggers/test_gcs.py 
b/tests/providers/google/cloud/triggers/test_gcs.py
index 3c4bc9031a..97a5597d97 100644
--- a/tests/providers/google/cloud/triggers/test_gcs.py
+++ b/tests/providers/google/cloud/triggers/test_gcs.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import asyncio
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any
 from unittest import mock
 from unittest.mock import AsyncMock
@@ -41,7 +41,7 @@ TEST_PREFIX = "TEST_PREFIX"
 TEST_GCP_CONN_ID = "TEST_GCP_CONN_ID"
 TEST_POLLING_INTERVAL = 3.0
 TEST_HOOK_PARAMS: dict[str, Any] = {}
-TEST_TS_OBJECT = datetime.utcnow()
+TEST_TS_OBJECT = datetime.now(tz=timezone.utc)
 
 
 TEST_INACTIVITY_PERIOD = 5.0
diff --git 
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
 
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index dee9ce7282..ba64050997 100644
--- 
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
+++ 
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 
 import os
 from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, 
S3DeleteBucketOperator
@@ -86,7 +86,7 @@ aws_to_gcs_transfer_body = {
     SCHEDULE: {
         SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
         SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
-        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
+        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + 
timedelta(minutes=1)).time(),
     },
     TRANSFER_SPEC: {
         AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
diff --git 
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
 
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
index 67159b8747..b173abfffa 100644
--- 
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
+++ 
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
@@ -23,7 +23,7 @@ Example Airflow DAG that demonstrates interactions with 
Google Cloud Transfer.
 from __future__ import annotations
 
 import os
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 
 from airflow.models.dag import DAG
@@ -83,7 +83,7 @@ gcs_to_gcs_transfer_body = {
     SCHEDULE: {
         SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
         SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
-        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
+        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + 
timedelta(seconds=120)).time(),
     },
     TRANSFER_SPEC: {
         GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},

Reply via email to