This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42f8d048d2 Replace usage of `datetime.utcnow` and
`datetime.utcfromtimestamp` in providers (#37138)
42f8d048d2 is described below
commit 42f8d048d2dccfcf59a44e00e9b1e8a3e63090a0
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Feb 16 03:07:32 2024 +0400
Replace usage of `datetime.utcnow` and `datetime.utcfromtimestamp` in
providers (#37138)
* Replace usage of `datetime.utcnow` and `datetime.utcfromtimestamp` in
providers
* Apply suggested changes
Co-authored-by: Tzu-ping Chung <[email protected]>
* Remove redundant context
Co-authored-by: Tzu-ping Chung <[email protected]>
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../amazon/aws/log/cloudwatch_task_handler.py | 4 +--
.../providers/amazon/aws/utils/task_log_fetcher.py | 4 +--
.../providers/google/cloud/transfers/s3_to_gcs.py | 4 +--
.../providers/google/common/hooks/base_google.py | 2 +-
kubernetes_tests/test_base.py | 4 +--
tests/providers/amazon/aws/hooks/test_sagemaker.py | 4 +--
.../amazon/aws/log/test_cloudwatch_task_handler.py | 4 +--
.../celery/executors/test_celery_executor.py | 4 +--
.../executors/test_kubernetes_executor.py | 8 ++---
tests/providers/google/cloud/sensors/test_gcs.py | 42 +++++++++-------------
tests/providers/google/cloud/triggers/test_gcs.py | 4 +--
.../example_cloud_storage_transfer_service_aws.py | 4 +--
.../example_cloud_storage_transfer_service_gcp.py | 4 +--
13 files changed, 41 insertions(+), 51 deletions(-)
diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 3a801093ca..865b30233d 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
from functools import cached_property
from typing import TYPE_CHECKING, Any
@@ -163,7 +163,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
return "\n".join(self._event_to_str(event) for event in events)
def _event_to_str(self, event: dict) -> str:
- event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+ event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0,
tz=timezone.utc)
formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
message = event["message"]
return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index 63110edada..a4cad6c099 100644
--- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from threading import Event, Thread
from typing import TYPE_CHECKING, Generator
@@ -87,7 +87,7 @@ class AwsTaskLogFetcher(Thread):
@staticmethod
def event_to_str(event: dict) -> str:
- event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+ event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0,
tz=timezone.utc)
formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
message = event["message"]
return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index cc745059a1..92dc97b4cc 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
+from datetime import datetime, timezone
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Sequence
@@ -276,7 +276,7 @@ class S3ToGCSOperator(S3ListOperator):
)
def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook,
s3_hook: S3Hook) -> list[str]:
- now = datetime.utcnow()
+ now = datetime.now(tz=timezone.utc)
one_time_schedule = {"day": now.day, "month": now.month, "year":
now.year}
gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
diff --git a/airflow/providers/google/common/hooks/base_google.py
b/airflow/providers/google/common/hooks/base_google.py
index a83524422a..bee2798267 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -674,7 +674,7 @@ class _CredentialsToken(Token):
self.access_token = cast(str, self.credentials.token)
self.access_token_duration = 3600
- self.access_token_acquired_at = datetime.datetime.utcnow()
+ self.access_token_acquired_at =
datetime.datetime.now(tz=datetime.timezone.utc)
self.acquiring = None
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index 715bbbb237..17d1c99543 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -20,7 +20,7 @@ import os
import subprocess
import tempfile
import time
-from datetime import datetime
+from datetime import datetime, timezone
from pathlib import Path
from subprocess import check_call, check_output
@@ -77,7 +77,7 @@ class BaseK8STest:
with open(output_file_path, "w") as output_file:
print("=" * 80, file=output_file)
print(f"Describe resources for namespace {namespace}",
file=output_file)
- print(f"Datetime: {datetime.utcnow()}", file=output_file)
+ print(f"Datetime: {datetime.now(tz=timezone.utc)}",
file=output_file)
print("=" * 80, file=output_file)
print("Describing pods", file=output_file)
print("-" * 80, file=output_file)
diff --git a/tests/providers/amazon/aws/hooks/test_sagemaker.py
b/tests/providers/amazon/aws/hooks/test_sagemaker.py
index 7c3549c9b7..5cdf6438ca 100644
--- a/tests/providers/amazon/aws/hooks/test_sagemaker.py
+++ b/tests/providers/amazon/aws/hooks/test_sagemaker.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import time
-from datetime import datetime
+from datetime import datetime, timezone
from unittest import mock
from unittest.mock import patch
@@ -528,7 +528,7 @@ class TestSageMakerHook:
def test_secondary_training_status_message_status_changed(self):
now = datetime.now(tzlocal())
SECONDARY_STATUS_DESCRIPTION_1["LastModifiedTime"] = now
- expected_time =
datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime("%Y-%m-%d
%H:%M:%S")
+ expected_time = now.astimezone(tz=timezone.utc).strftime("%Y-%m-%d
%H:%M:%S")
expected = f"{expected_time} {status} - {message}"
assert (
secondary_training_status_message(SECONDARY_STATUS_DESCRIPTION_1,
SECONDARY_STATUS_DESCRIPTION_2)
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 7ceb423481..4f9ecb1664 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import logging
import time
-from datetime import datetime as dt, timedelta
+from datetime import datetime as dt, timedelta, timezone
from unittest import mock
from unittest.mock import ANY, Mock, call
@@ -40,7 +40,7 @@ from tests.test_utils.config import conf_vars
def get_time_str(time_in_milliseconds):
- dt_time = dt.utcfromtimestamp(time_in_milliseconds / 1000.0)
+ dt_time = dt.fromtimestamp(time_in_milliseconds / 1000.0, tz=timezone.utc)
return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")
diff --git a/tests/providers/celery/executors/test_celery_executor.py
b/tests/providers/celery/executors/test_celery_executor.py
index 450312bd63..a32d710048 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -22,7 +22,7 @@ import logging
import os
import signal
import sys
-from datetime import datetime, timedelta
+from datetime import timedelta
from unittest import mock
# leave this it is used by the test worker
@@ -183,7 +183,7 @@ class TestCeleryExecutor:
@pytest.mark.backend("mysql", "postgres")
def test_try_adopt_task_instances_none(self):
- start_date = datetime.utcnow() - timedelta(days=2)
+ start_date = timezone.utcnow() - timedelta(days=2)
with DAG("test_try_adopt_task_instances_none"):
task_1 = BaseOperator(task_id="task_1", start_date=start_date)
diff --git
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index d74f4636c8..d00342fd98 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -573,7 +573,7 @@ class TestKubernetesExecutor:
try:
assert executor.event_buffer == {}
executor.execute_async(
- key=("dag", "task", datetime.utcnow(), 1),
+ key=("dag", "task", timezone.utcnow(), 1),
queue=None,
command=["airflow", "tasks", "run", "true", "some_parameter"],
executor_config=k8s.V1Pod(
@@ -1540,7 +1540,7 @@ class TestKubernetesJobWatcher:
def test_process_status_pending_deleted(self):
self.events.append({"type": "DELETED", "object": self.pod})
- self.pod.metadata.deletion_timestamp = datetime.utcnow()
+ self.pod.metadata.deletion_timestamp = timezone.utcnow()
self._run()
self.assert_watcher_queue_called_once_with_state(State.FAILED)
@@ -1570,7 +1570,7 @@ class TestKubernetesJobWatcher:
def test_process_status_succeeded_dedup_timestamp(self):
self.pod.status.phase = "Succeeded"
- self.pod.metadata.deletion_timestamp = datetime.utcnow()
+ self.pod.metadata.deletion_timestamp = timezone.utcnow()
self.events.append({"type": "MODIFIED", "object": self.pod})
self._run()
@@ -1604,7 +1604,7 @@ class TestKubernetesJobWatcher:
def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
- self.pod.metadata.deletion_timestamp = datetime.utcnow()
+ self.pod.metadata.deletion_timestamp = timezone.utcnow()
self.events.append({"type": "DELETED", "object": self.pod})
self._run()
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py
b/tests/providers/google/cloud/sensors/test_gcs.py
index 37697ff58d..641b2052fd 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
from unittest import mock
import pendulum
@@ -45,6 +45,7 @@ from airflow.providers.google.cloud.triggers.gcs import (
GCSPrefixBlobTrigger,
GCSUploadSessionTrigger,
)
+from airflow.utils import timezone
TEST_BUCKET = "TEST_BUCKET"
@@ -67,15 +68,6 @@ TEST_INACTIVITY_PERIOD = 5
TEST_MIN_OBJECTS = 1
[email protected]()
-def context():
- """
- Creates an empty context.
- """
- context = {"data_interval_end": datetime.utcnow()}
- yield context
-
-
def next_time_side_effect():
"""
This each time this is called mock a time 10 seconds later
@@ -159,7 +151,7 @@ class TestGoogleCloudStorageObjectSensor:
)
mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
- task.execute(context)
+ task.execute({})
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not
a GCSBlobTrigger"
@pytest.mark.parametrize(
@@ -213,7 +205,7 @@ class TestGoogleCloudStorageObjectAsyncSensor:
)
mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
- task.execute(context)
+ task.execute({})
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not
a GCSBlobTrigger"
@pytest.mark.parametrize(
@@ -329,20 +321,20 @@ class TestGCSObjectUpdateAsyncSensor:
@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True,
AirflowSkipException))
)
- def test_gcs_object_update_async_sensor_execute_failure(self, context,
soft_fail, expected_exception):
+ def test_gcs_object_update_async_sensor_execute_failure(self, soft_fail,
expected_exception):
"""Tests that an AirflowException is raised in case of error event"""
self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
- context=context, event={"status": "error", "message": "test
failure message"}
+ context={}, event={"status": "error", "message": "test failure
message"}
)
- def test_gcs_object_update_async_sensor_execute_complete(self, context):
+ def test_gcs_object_update_async_sensor_execute_complete(self):
"""Asserts that logging occurs as expected"""
with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
self.OPERATOR.execute_complete(
- context=context, event={"status": "success", "message": "Job
completed"}
+ context={}, event={"status": "success", "message": "Job
completed"}
)
mock_log_info.assert_called_with(
"Checking last updated time for object %s in bucket : %s",
TEST_OBJECT, TEST_BUCKET
@@ -462,21 +454,21 @@ class TestGCSObjectsWithPrefixExistenceAsyncSensor:
"soft_fail, expected_exception", ((False, AirflowException), (True,
AirflowSkipException))
)
def test_gcs_object_with_prefix_existence_async_sensor_execute_failure(
- self, context, soft_fail, expected_exception
+ self, soft_fail, expected_exception
):
"""Tests that an AirflowException is raised in case of error event"""
self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
- context=context, event={"status": "error", "message": "test
failure message"}
+ context={}, event={"status": "error", "message": "test failure
message"}
)
- def
test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self,
context):
+ def
test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self):
"""Asserts that logging occurs as expected"""
with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
self.OPERATOR.execute_complete(
- context=context,
+ context={},
event={"status": "success", "message": "Job completed",
"matches": [TEST_OBJECT]},
)
mock_log_info.assert_called_with("Resuming from trigger and checking
status")
@@ -609,18 +601,16 @@ class TestGCSUploadSessionCompleteAsyncSensor:
@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True,
AirflowSkipException))
)
- def test_gcs_upload_session_complete_sensor_execute_failure(self, context,
soft_fail, expected_exception):
+ def test_gcs_upload_session_complete_sensor_execute_failure(self,
soft_fail, expected_exception):
"""Tests that an AirflowException is raised in case of error event"""
self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
- context=context, event={"status": "error", "message": "test
failure message"}
+ context={}, event={"status": "error", "message": "test failure
message"}
)
- def test_gcs_upload_session_complete_async_sensor_execute_complete(self,
context):
+ def test_gcs_upload_session_complete_async_sensor_execute_complete(self):
"""Asserts that execute complete is completed as expected"""
- assert self.OPERATOR.execute_complete(
- context=context, event={"status": "success", "message": "success"}
- )
+ assert self.OPERATOR.execute_complete(context={}, event={"status":
"success", "message": "success"})
diff --git a/tests/providers/google/cloud/triggers/test_gcs.py
b/tests/providers/google/cloud/triggers/test_gcs.py
index 3c4bc9031a..97a5597d97 100644
--- a/tests/providers/google/cloud/triggers/test_gcs.py
+++ b/tests/providers/google/cloud/triggers/test_gcs.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import asyncio
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from typing import Any
from unittest import mock
from unittest.mock import AsyncMock
@@ -41,7 +41,7 @@ TEST_PREFIX = "TEST_PREFIX"
TEST_GCP_CONN_ID = "TEST_GCP_CONN_ID"
TEST_POLLING_INTERVAL = 3.0
TEST_HOOK_PARAMS: dict[str, Any] = {}
-TEST_TS_OBJECT = datetime.utcnow()
+TEST_TS_OBJECT = datetime.now(tz=timezone.utc)
TEST_INACTIVITY_PERIOD = 5.0
diff --git
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index dee9ce7282..ba64050997 100644
---
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
+++
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import os
from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator,
S3DeleteBucketOperator
@@ -86,7 +86,7 @@ aws_to_gcs_transfer_body = {
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
- START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
+ START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) +
timedelta(minutes=1)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
diff --git
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
index 67159b8747..b173abfffa 100644
---
a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
+++
b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
@@ -23,7 +23,7 @@ Example Airflow DAG that demonstrates interactions with
Google Cloud Transfer.
from __future__ import annotations
import os
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from pathlib import Path
from airflow.models.dag import DAG
@@ -83,7 +83,7 @@ gcs_to_gcs_transfer_body = {
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
- START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
+ START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) +
timedelta(seconds=120)).time(),
},
TRANSFER_SPEC: {
GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},