This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f07c6b717 Replace `datetime.datetime.utcnow` by `airflow.utils.timezone.utcnow` in core (#35448)
2f07c6b717 is described below

commit 2f07c6b7173bf6909db6c25e812bfa95018d8182
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Jan 31 14:52:33 2024 +0400

    Replace `datetime.datetime.utcnow` by `airflow.utils.timezone.utcnow` in core (#35448)
---
 airflow/secrets/cache.py                       |  5 +++--
 airflow/task/task_runner/cgroup_task_runner.py |  4 ++--
 airflow/timetables/interval.py                 |  6 +++---
 airflow/timetables/simple.py                   | 10 ++++++----
 airflow/timetables/trigger.py                  |  6 +++---
 airflow/utils/cli.py                           |  7 +++----
 airflow/utils/db_cleanup.py                    |  4 +---
 airflow/utils/jwt_signer.py                    | 10 ++++++----
 airflow/utils/log/file_processor_handler.py    |  5 ++---
 airflow/utils/timezone.py                      |  8 +-------
 airflow/www/views.py                           |  8 ++++----
 tests/conftest.py                              |  4 ++--
 tests/dags/test_impersonation_subdag.py        |  6 +++---
 tests/dags/test_scheduler_dags.py              |  7 ++++---
 tests/decorators/test_external_python.py       |  2 +-
 tests/decorators/test_python_virtualenv.py     |  2 +-
 tests/jobs/test_triggerer_job.py               |  4 ++--
 tests/models/test_skipmixin.py                 |  4 ++--
 tests/operators/test_bash.py                   |  2 +-
 tests/operators/test_python.py                 |  4 ++--
 tests/serialization/test_serialized_objects.py | 15 ++++++++-------
 tests/utils/test_cli_util.py                   |  5 ++---
 tests/utils/test_db_cleanup.py                 |  6 +++---
 tests/utils/test_serve_logs.py                 | 19 ++++++++++---------
 24 files changed, 75 insertions(+), 78 deletions(-)

diff --git a/airflow/secrets/cache.py b/airflow/secrets/cache.py
index 25b315efcd..9902a05f03 100644
--- a/airflow/secrets/cache.py
+++ b/airflow/secrets/cache.py
@@ -21,6 +21,7 @@ import datetime
 import multiprocessing
 
 from airflow.configuration import conf
+from airflow.utils import timezone
 
 
 class SecretCache:
@@ -36,10 +37,10 @@ class SecretCache:
     class _CacheValue:
         def __init__(self, value: str | None) -> None:
             self.value = value
-            self.date = datetime.datetime.utcnow()
+            self.date = timezone.utcnow()
 
         def is_expired(self, ttl: datetime.timedelta) -> bool:
-            return datetime.datetime.utcnow() - self.date > ttl
+            return timezone.utcnow() - self.date > ttl
 
     _VARIABLE_PREFIX = "__v_"
     _CONNECTION_PREFIX = "__c_"
diff --git a/airflow/task/task_runner/cgroup_task_runner.py 
b/airflow/task/task_runner/cgroup_task_runner.py
index 7034058c5b..3ec1fccaa5 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -18,7 +18,6 @@
 """Task runner for cgroup to run Airflow task."""
 from __future__ import annotations
 
-import datetime
 import os
 import uuid
 from typing import TYPE_CHECKING
@@ -27,6 +26,7 @@ import psutil
 from cgroupspy import trees
 
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils import timezone
 from airflow.utils.operator_resources import Resources
 from airflow.utils.platform import getuser
 from airflow.utils.process_utils import reap_process_group
@@ -137,7 +137,7 @@ class CgroupTaskRunner(BaseTaskRunner):
             return
 
         # Create a unique cgroup name
-        cgroup_name = 
f"airflow/{datetime.datetime.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
+        cgroup_name = f"airflow/{timezone.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
 
         self.mem_cgroup_name = f"memory/{cgroup_name}"
         self.cpu_cgroup_name = f"cpu/{cgroup_name}"
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 6850268de3..02264a9bef 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -25,7 +25,7 @@ from pendulum import DateTime
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils.timezone import convert_to_utc
+from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
 
 if TYPE_CHECKING:
     from airflow.timetables.base import TimeRestriction
@@ -146,7 +146,7 @@ class CronDataIntervalTimetable(CronMixin, 
_DataIntervalTimetable):
         If the next schedule should start *right now*, we want the data 
interval
         that start now, not the one that ends now.
         """
-        current_time = DateTime.utcnow()
+        current_time = coerce_datetime(utcnow())
         last_start = self._get_prev(current_time)
         next_start = self._get_next(last_start)
         if next_start == current_time:  # Current time is on interval boundary.
@@ -257,7 +257,7 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
 
         This is slightly different from the cron version at terminal values.
         """
-        round_current_time = self._round(DateTime.utcnow())
+        round_current_time = self._round(coerce_datetime(utcnow()))
         new_start = self._get_prev(round_current_time)
         if earliest is None:
             return new_start
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index f215b81bd1..f8ee8474af 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -19,11 +19,11 @@ from __future__ import annotations
 import operator
 from typing import TYPE_CHECKING, Any, Collection
 
-from pendulum import DateTime
-
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
 
 if TYPE_CHECKING:
+    from pendulum import DateTime
     from sqlalchemy import Session
 
     from airflow.models.dataset import DatasetEvent
@@ -134,10 +134,12 @@ class ContinuousTimetable(_TrivialTimetable):
             return None
         if last_automated_data_interval is not None:  # has already run once
             start = last_automated_data_interval.end
-            end = DateTime.utcnow()
+            end = timezone.coerce_datetime(timezone.utcnow())
         else:  # first run
             start = restriction.earliest
-            end = max(restriction.earliest, DateTime.utcnow())  # won't run 
any earlier than start_date
+            end = max(
+                restriction.earliest, 
timezone.coerce_datetime(timezone.utcnow())
+            )  # won't run any earlier than start_date
 
         if restriction.latest is not None and end > restriction.latest:
             return None
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index 2a0df645da..c602c3bc12 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -19,13 +19,13 @@ from __future__ import annotations
 import datetime
 from typing import TYPE_CHECKING, Any
 
-from pendulum import DateTime
-
 from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from dateutil.relativedelta import relativedelta
+    from pendulum import DateTime
     from pendulum.tz.timezone import FixedTimezone, Timezone
 
     from airflow.timetables.base import TimeRestriction
@@ -98,7 +98,7 @@ class CronTriggerTimetable(CronMixin, Timetable):
             else:
                 next_start_time = self._align_to_next(restriction.earliest)
         else:
-            start_time_candidates = [self._align_to_prev(DateTime.utcnow())]
+            start_time_candidates = 
[self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
             if last_automated_data_interval is not None:
                 
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
             if restriction.earliest is not None:
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 9134d16855..03aa290e8b 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -27,7 +27,6 @@ import threading
 import traceback
 import warnings
 from argparse import Namespace
-from datetime import datetime
 from pathlib import Path
 from typing import TYPE_CHECKING, Callable, TypeVar, cast
 
@@ -36,7 +35,7 @@ from sqlalchemy import select
 
 from airflow import settings
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.utils import cli_action_loggers
+from airflow.utils import cli_action_loggers, timezone
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.platform import getuser, is_terminal_support_colors
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -116,7 +115,7 @@ def action_cli(func=None, check_db=True):
                 metrics["error"] = e
                 raise
             finally:
-                metrics["end_datetime"] = datetime.utcnow()
+                metrics["end_datetime"] = timezone.utcnow()
                 cli_action_loggers.on_post_execution(**metrics)
 
         return cast(T, wrapper)
@@ -155,7 +154,7 @@ def _build_metrics(func_name, namespace):
 
     metrics = {
         "sub_command": func_name,
-        "start_datetime": datetime.utcnow(),
+        "start_datetime": timezone.utcnow(),
         "full_command": f"{full_command}",
         "user": getuser(),
     }
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 3b207253ec..0b8c6c35e9 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -152,14 +152,12 @@ def _dump_table_to_file(*, target_table, file_path, 
export_format, session):
 
 
 def _do_delete(*, query, orm_model, skip_archive, session):
-    from datetime import datetime
-
     import re2
 
     print("Performing Delete...")
     # using bulk delete
     # create a new table and copy the rows there
-    timestamp_str = re2.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
+    timestamp_str = re2.sub(r"[^\d]", "", timezone.utcnow().isoformat())[:14]
     target_table_name = 
f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
     print(f"Moving data to table {target_table_name}")
     bind = session.get_bind()
diff --git a/airflow/utils/jwt_signer.py b/airflow/utils/jwt_signer.py
index c52b391e35..fe4811eb82 100644
--- a/airflow/utils/jwt_signer.py
+++ b/airflow/utils/jwt_signer.py
@@ -16,11 +16,13 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+from datetime import timedelta
 from typing import Any
 
 import jwt
 
+from airflow.utils import timezone
+
 
 class JWTSigner:
     """
@@ -56,9 +58,9 @@ class JWTSigner:
         """
         jwt_dict = {
             "aud": self._audience,
-            "iat": datetime.utcnow(),
-            "nbf": datetime.utcnow(),
-            "exp": datetime.utcnow() + 
timedelta(seconds=self._expiration_time_in_seconds),
+            "iat": timezone.utcnow(),
+            "nbf": timezone.utcnow(),
+            "exp": timezone.utcnow() + 
timedelta(seconds=self._expiration_time_in_seconds),
         }
         jwt_dict.update(extra_payload)
         token = jwt.encode(
diff --git a/airflow/utils/log/file_processor_handler.py 
b/airflow/utils/log/file_processor_handler.py
index eade2eadfa..f1a9bacfcb 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -23,6 +23,7 @@ from datetime import datetime
 from pathlib import Path
 
 from airflow import settings
+from airflow.utils import timezone
 from airflow.utils.helpers import parse_template_string
 from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
@@ -102,9 +103,7 @@ class FileProcessorHandler(logging.Handler):
         return self.filename_template.format(filename=ctx["filename"])
 
     def _get_log_directory(self):
-        now = datetime.utcnow()
-
-        return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))
+        return os.path.join(self.base_log_folder, 
timezone.utcnow().strftime("%Y-%m-%d"))
 
     def _symlink_latest_log_directory(self):
         """
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 8ac9a49e0e..152ef35954 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -60,13 +60,7 @@ def is_naive(value):
 
 def utcnow() -> dt.datetime:
     """Get the current date and time in UTC."""
-    # pendulum utcnow() is not used as that sets a TimezoneInfo object
-    # instead of a Timezone. This is not picklable and also creates issues
-    # when using replace()
-    result = dt.datetime.utcnow()
-    result = result.replace(tzinfo=utc)
-
-    return result
+    return dt.datetime.now(tz=utc)
 
 
 def utc_epoch() -> dt.datetime:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 09355bec77..783a904e2d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2975,7 +2975,7 @@ class Airflow(AirflowBaseView):
                 for (date, count) in dates.items()
             )
 
-        now = DateTime.utcnow()
+        now = timezone.utcnow()
         data = {
             "dag_states": data_dag_states,
             "start_date": (dag.start_date or now).date().isoformat(),
@@ -3561,7 +3561,7 @@ class Airflow(AirflowBaseView):
                 select(DagRun.run_type, func.count(DagRun.run_id))
                 .where(
                     DagRun.start_date >= start_date,
-                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
+                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= 
end_date,
                 )
                 .group_by(DagRun.run_type)
             ).all()
@@ -3570,7 +3570,7 @@ class Airflow(AirflowBaseView):
                 select(DagRun.state, func.count(DagRun.run_id))
                 .where(
                     DagRun.start_date >= start_date,
-                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
+                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= 
end_date,
                 )
                 .group_by(DagRun.state)
             ).all()
@@ -3581,7 +3581,7 @@ class Airflow(AirflowBaseView):
                 .join(TaskInstance.dag_run)
                 .where(
                     DagRun.start_date >= start_date,
-                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
+                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= 
end_date,
                 )
                 .group_by(TaskInstance.state)
             ).all()
diff --git a/tests/conftest.py b/tests/conftest.py
index 9176ab9628..43a5492380 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,7 +23,7 @@ import subprocess
 import sys
 import warnings
 from contextlib import ExitStack, suppress
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import TYPE_CHECKING
 
@@ -584,7 +584,7 @@ def frozen_sleep(monkeypatch):
 
     def fake_sleep(seconds):
         nonlocal traveller
-        utcnow = datetime.utcnow()
+        utcnow = datetime.now(tz=timezone.utc)
         if traveller is not None:
             traveller.stop()
         traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))
diff --git a/tests/dags/test_impersonation_subdag.py 
b/tests/dags/test_impersonation_subdag.py
index 541aee0816..ae4d8e6732 100644
--- a/tests/dags/test_impersonation_subdag.py
+++ b/tests/dags/test_impersonation_subdag.py
@@ -18,14 +18,14 @@
 from __future__ import annotations
 
 import warnings
-from datetime import datetime
 
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.operators.subdag import SubDagOperator
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 default_args = {"owner": "airflow", "start_date": DEFAULT_DATE, "run_as_user": 
"airflow_test_user"}
 
@@ -33,7 +33,7 @@ dag = DAG(dag_id="impersonation_subdag", 
default_args=default_args)
 
 
 def print_today():
-    print(f"Today is {datetime.utcnow()}")
+    print(f"Today is {timezone.utcnow()}")
 
 
 subdag = DAG("impersonation_subdag.test_subdag_operation", 
default_args=default_args)
diff --git a/tests/dags/test_scheduler_dags.py 
b/tests/dags/test_scheduler_dags.py
index b78801adf9..e1b1bddc85 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -17,16 +17,17 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+from datetime import timedelta
 
 from airflow.models.dag import DAG
 from airflow.operators.empty import EmptyOperator
+from airflow.utils import timezone
 
-DEFAULT_DATE = datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
-dag1 = DAG(dag_id="test_start_date_scheduling", start_date=datetime.utcnow() + 
timedelta(days=1))
+dag1 = DAG(dag_id="test_start_date_scheduling", start_date=timezone.utcnow() + 
timedelta(days=1))
 dag1_task1 = EmptyOperator(task_id="dummy", dag=dag1, owner="airflow")
 
 dag2 = DAG(dag_id="test_task_start_date_scheduling", start_date=DEFAULT_DATE)
diff --git a/tests/decorators/test_external_python.py 
b/tests/decorators/test_external_python.py
index 27d8b0ed10..78b325b53d 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -139,7 +139,7 @@ class TestExternalPythonDecorator:
             return None
 
         with dag_maker():
-            ret = f(datetime.datetime.utcnow())
+            ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
diff --git a/tests/decorators/test_python_virtualenv.py 
b/tests/decorators/test_python_virtualenv.py
index a069aee8b1..09631aafe8 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -201,7 +201,7 @@ class TestPythonVirtualenvDecorator:
             return None
 
         with dag_maker():
-            ret = f(datetime.datetime.utcnow())
+            ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index c38fbb3d0b..18e1dd7fb9 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -431,7 +431,7 @@ def test_trigger_from_dead_triggerer(session, 
create_task_instance):
     session.add(trigger_orm)
     ti_orm = create_task_instance(
         task_id="ti_orm",
-        execution_date=datetime.datetime.utcnow(),
+        execution_date=timezone.utcnow(),
         run_id="orm_run_id",
     )
     ti_orm.trigger_id = trigger_orm.id
@@ -458,7 +458,7 @@ def test_trigger_from_expired_triggerer(session, 
create_task_instance):
     session.add(trigger_orm)
     ti_orm = create_task_instance(
         task_id="ti_orm",
-        execution_date=datetime.datetime.utcnow(),
+        execution_date=timezone.utcnow(),
         run_id="orm_run_id",
     )
     ti_orm.trigger_id = trigger_orm.id
diff --git a/tests/models/test_skipmixin.py b/tests/models/test_skipmixin.py
index 2e4f8ad0d8..f69f588bc1 100644
--- a/tests/models/test_skipmixin.py
+++ b/tests/models/test_skipmixin.py
@@ -55,7 +55,7 @@ class TestSkipMixin:
     @patch("airflow.utils.timezone.utcnow")
     def test_skip(self, mock_now, dag_maker):
         session = settings.Session()
-        now = 
datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
+        now = datetime.datetime.now(tz=datetime.timezone.utc)
         mock_now.return_value = now
         with dag_maker("dag"):
             tasks = [EmptyOperator(task_id="task")]
@@ -77,7 +77,7 @@ class TestSkipMixin:
     @patch("airflow.utils.timezone.utcnow")
     def test_skip_none_dagrun(self, mock_now, dag_maker):
         session = settings.Session()
-        now = 
datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
+        now = datetime.datetime.now(tz=pendulum.timezone("UTC"))
         mock_now.return_value = now
         with dag_maker(
             "dag",
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 4903081747..b7629c6043 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -67,7 +67,7 @@ class TestBashOperator:
         """
         Test that env variables are exported correctly to the task bash 
environment.
         """
-        utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+        utc_now = datetime.now(tz=timezone.utc)
         expected = (
             f"{expected_airflow_home}\n"
             "AWESOME_PYTHONPATH\n"
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 6c73e847bb..ba008ee1ca 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -26,7 +26,7 @@ import sys
 import tempfile
 import warnings
 from collections import namedtuple
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone as _timezone
 from functools import partial
 from subprocess import CalledProcessError
 from tempfile import TemporaryDirectory
@@ -801,7 +801,7 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         def f(_):
             return None
 
-        self.run_as_task(f, op_args=[datetime.utcnow()])
+        self.run_as_task(f, op_args=[datetime.now(tz=_timezone.utc)])
 
     def test_context(self):
         def f(templates_dict):
diff --git a/tests/serialization/test_serialized_objects.py 
b/tests/serialization/test_serialized_objects.py
index a40e0d01ea..29a07c8ab4 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -44,6 +44,7 @@ from airflow.serialization.pydantic.job import JobPydantic
 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
 from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
 from airflow.settings import _ENABLE_AIP_44
+from airflow.utils import timezone
 from airflow.utils.operator_resources import Resources
 from airflow.utils.state import DagRunState, State
 from airflow.utils.task_group import TaskGroup
@@ -113,14 +114,14 @@ TI_WITH_START_DAY = TaskInstance(
     run_id="fake_run",
     state=State.RUNNING,
 )
-TI_WITH_START_DAY.start_date = datetime.utcnow()
+TI_WITH_START_DAY.start_date = timezone.utcnow()
 
 DAG_RUN = DagRun(
     dag_id="test_dag_id",
     run_id="test_dag_run_id",
     run_type=DagRunType.MANUAL,
-    execution_date=datetime.utcnow(),
-    start_date=datetime.utcnow(),
+    execution_date=timezone.utcnow(),
+    start_date=timezone.utcnow(),
     external_trigger=True,
     state=DagRunState.SUCCESS,
 )
@@ -140,7 +141,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
     [
         ("test_str", None, equals),
         (1, None, equals),
-        (datetime.utcnow(), DAT.DATETIME, equal_time),
+        (timezone.utcnow(), DAT.DATETIME, equal_time),
         (timedelta(minutes=2), DAT.TIMEDELTA, equals),
         (Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
         (relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, 
b: a.hours == b.hours),
@@ -151,7 +152,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
         (
             k8s.V1Pod(
                 metadata=k8s.V1ObjectMeta(
-                    name="test", annotations={"test": "annotation"}, 
creation_timestamp=datetime.utcnow()
+                    name="test", annotations={"test": "annotation"}, 
creation_timestamp=timezone.utcnow()
                 )
             ),
             DAT.POD,
@@ -162,7 +163,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
                 "fake-dag",
                 schedule="*/10 * * * *",
                 default_args={"depends_on_past": True},
-                start_date=datetime.utcnow(),
+                start_date=timezone.utcnow(),
                 catchup=False,
             ),
             DAT.DAG,
@@ -241,7 +242,7 @@ def test_backcompat_deserialize_connection(conn_uri):
     "input, pydantic_class, encoded_type, cmp_func",
     [
         (
-            Job(state=State.RUNNING, latest_heartbeat=datetime.utcnow()),
+            Job(state=State.RUNNING, latest_heartbeat=timezone.utcnow()),
             JobPydantic,
             DAT.BASE_JOB,
             lambda a, b: equal_time(a.latest_heartbeat, b.latest_heartbeat),
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index cad05ab3dc..03056d3e20 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -23,7 +23,6 @@ import os
 import sys
 from argparse import Namespace
 from contextlib import contextmanager
-from datetime import datetime
 from pathlib import Path
 from unittest import mock
 
@@ -56,7 +55,7 @@ class TestCliUtil:
         for k, v in expected.items():
             assert v == metrics.get(k)
 
-        assert metrics.get("start_datetime") <= datetime.utcnow()
+        assert metrics.get("start_datetime") <= timezone.utcnow()
         assert metrics.get("full_command")
 
     def test_fail_function(self):
@@ -147,7 +146,7 @@ class TestCliUtil:
 
             log = session.query(Log).order_by(Log.dttm.desc()).first()
 
-        assert metrics.get("start_datetime") <= datetime.utcnow()
+        assert metrics.get("start_datetime") <= timezone.utcnow()
 
         command: str = json.loads(log.extra).get("full_command")
         # Replace single quotes to double quotes to avoid json decode error
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 7c600d237d..4716870462 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 from contextlib import suppress
-from datetime import datetime
 from importlib import import_module
 from io import StringIO
 from pathlib import Path
@@ -34,6 +33,7 @@ from sqlalchemy.ext.declarative import DeclarativeMeta
 from airflow.exceptions import AirflowException
 from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
 from airflow.utils.db_cleanup import (
     ARCHIVE_TABLE_PREFIX,
     CreateTableAs,
@@ -354,13 +354,13 @@ class TestDBCleanup:
         Ensure every table we have configured (and that is present in the db) 
can be cleaned successfully.
         For example, this checks that the recency column is actually a column.
         """
-        run_cleanup(clean_before_timestamp=datetime.utcnow(), dry_run=True)
+        run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
         assert "Encountered error when attempting to clean table" not in 
caplog.text
 
         # Lets check we have the right error message just in case
         caplog.clear()
         with patch("airflow.utils.db_cleanup._cleanup_table", 
side_effect=OperationalError("oops", {}, None)):
-            run_cleanup(clean_before_timestamp=datetime.utcnow(), 
table_names=["task_instance"], dry_run=True)
+            run_cleanup(clean_before_timestamp=timezone.utcnow(), 
table_names=["task_instance"], dry_run=True)
         assert "Encountered error when attempting to clean table" in 
caplog.text
 
     @pytest.mark.parametrize(
diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py
index 574686a415..b6756502c1 100644
--- a/tests/utils/test_serve_logs.py
+++ b/tests/utils/test_serve_logs.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-import datetime
+from datetime import timedelta
 from pathlib import Path
 from typing import TYPE_CHECKING
 
@@ -25,6 +25,7 @@ import pytest
 import time_machine
 
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+from airflow.utils import timezone
 from airflow.utils.jwt_signer import JWTSigner
 from airflow.utils.serve_logs import create_app
 from tests.test_utils.config import conf_vars
@@ -135,7 +136,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_future(self, client: FlaskClient, signer):
-        with time_machine.travel(datetime.datetime.utcnow() + 
datetime.timedelta(seconds=3600)):
+        with time_machine.travel(timezone.utcnow() + timedelta(seconds=3600)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -148,7 +149,7 @@ class TestServeLogs:
         )
 
     def test_ok_with_short_future_skew(self, client: FlaskClient, signer):
-        with time_machine.travel(datetime.datetime.utcnow() + 
datetime.timedelta(seconds=1)):
+        with time_machine.travel(timezone.utcnow() + timedelta(seconds=1)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -161,7 +162,7 @@ class TestServeLogs:
         )
 
     def test_ok_with_short_past_skew(self, client: FlaskClient, signer):
-        with time_machine.travel(datetime.datetime.utcnow() - 
datetime.timedelta(seconds=31)):
+        with time_machine.travel(timezone.utcnow() - timedelta(seconds=31)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -174,7 +175,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_with_long_future_skew(self, client: FlaskClient, 
signer):
-        with time_machine.travel(datetime.datetime.utcnow() + 
datetime.timedelta(seconds=10)):
+        with time_machine.travel(timezone.utcnow() + timedelta(seconds=10)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -187,7 +188,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_with_long_past_skew(self, client: FlaskClient, signer):
-        with time_machine.travel(datetime.datetime.utcnow() - 
datetime.timedelta(seconds=40)):
+        with time_machine.travel(timezone.utcnow() - timedelta(seconds=40)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -214,9 +215,9 @@ class TestServeLogs:
     def test_missing_claims(self, claim_to_remove: str, client: FlaskClient, 
secret_key):
         jwt_dict = {
             "aud": "task-instance-logs",
-            "iat": datetime.datetime.utcnow(),
-            "nbf": datetime.datetime.utcnow(),
-            "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=30),
+            "iat": timezone.utcnow(),
+            "nbf": timezone.utcnow(),
+            "exp": timezone.utcnow() + timedelta(seconds=30),
         }
         del jwt_dict[claim_to_remove]
         jwt_dict.update({"filename": "sample.log"})

Reply via email to