This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f07c6b717 Replace `datetime.datetime.utcnow` by `airflow.utils.timezone.utcnow` in core (#35448)
2f07c6b717 is described below
commit 2f07c6b7173bf6909db6c25e812bfa95018d8182
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Jan 31 14:52:33 2024 +0400
Replace `datetime.datetime.utcnow` by `airflow.utils.timezone.utcnow` in core (#35448)
---
airflow/secrets/cache.py | 5 +++--
airflow/task/task_runner/cgroup_task_runner.py | 4 ++--
airflow/timetables/interval.py | 6 +++---
airflow/timetables/simple.py | 10 ++++++----
airflow/timetables/trigger.py | 6 +++---
airflow/utils/cli.py | 7 +++----
airflow/utils/db_cleanup.py | 4 +---
airflow/utils/jwt_signer.py | 10 ++++++----
airflow/utils/log/file_processor_handler.py | 5 ++---
airflow/utils/timezone.py | 8 +-------
airflow/www/views.py | 8 ++++----
tests/conftest.py | 4 ++--
tests/dags/test_impersonation_subdag.py | 6 +++---
tests/dags/test_scheduler_dags.py | 7 ++++---
tests/decorators/test_external_python.py | 2 +-
tests/decorators/test_python_virtualenv.py | 2 +-
tests/jobs/test_triggerer_job.py | 4 ++--
tests/models/test_skipmixin.py | 4 ++--
tests/operators/test_bash.py | 2 +-
tests/operators/test_python.py | 4 ++--
tests/serialization/test_serialized_objects.py | 15 ++++++++-------
tests/utils/test_cli_util.py | 5 ++---
tests/utils/test_db_cleanup.py | 6 +++---
tests/utils/test_serve_logs.py | 19 ++++++++++---------
24 files changed, 75 insertions(+), 78 deletions(-)
diff --git a/airflow/secrets/cache.py b/airflow/secrets/cache.py
index 25b315efcd..9902a05f03 100644
--- a/airflow/secrets/cache.py
+++ b/airflow/secrets/cache.py
@@ -21,6 +21,7 @@ import datetime
import multiprocessing
from airflow.configuration import conf
+from airflow.utils import timezone
class SecretCache:
@@ -36,10 +37,10 @@ class SecretCache:
class _CacheValue:
def __init__(self, value: str | None) -> None:
self.value = value
- self.date = datetime.datetime.utcnow()
+ self.date = timezone.utcnow()
def is_expired(self, ttl: datetime.timedelta) -> bool:
- return datetime.datetime.utcnow() - self.date > ttl
+ return timezone.utcnow() - self.date > ttl
_VARIABLE_PREFIX = "__v_"
_CONNECTION_PREFIX = "__c_"
diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index 7034058c5b..3ec1fccaa5 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -18,7 +18,6 @@
"""Task runner for cgroup to run Airflow task."""
from __future__ import annotations
-import datetime
import os
import uuid
from typing import TYPE_CHECKING
@@ -27,6 +26,7 @@ import psutil
from cgroupspy import trees
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils import timezone
from airflow.utils.operator_resources import Resources
from airflow.utils.platform import getuser
from airflow.utils.process_utils import reap_process_group
@@ -137,7 +137,7 @@ class CgroupTaskRunner(BaseTaskRunner):
return
# Create a unique cgroup name
- cgroup_name = f"airflow/{datetime.datetime.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
+ cgroup_name = f"airflow/{timezone.utcnow():%Y-%m-%d}/{uuid.uuid4()}"
self.mem_cgroup_name = f"memory/{cgroup_name}"
self.cpu_cgroup_name = f"cpu/{cgroup_name}"
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 6850268de3..02264a9bef 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -25,7 +25,7 @@ from pendulum import DateTime
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils.timezone import convert_to_utc
+from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
@@ -146,7 +146,7 @@ class CronDataIntervalTimetable(CronMixin, _DataIntervalTimetable):
If the next schedule should start *right now*, we want the data interval
that start now, not the one that ends now.
"""
- current_time = DateTime.utcnow()
+ current_time = coerce_datetime(utcnow())
last_start = self._get_prev(current_time)
next_start = self._get_next(last_start)
if next_start == current_time: # Current time is on interval boundary.
@@ -257,7 +257,7 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
This is slightly different from the cron version at terminal values.
"""
- round_current_time = self._round(DateTime.utcnow())
+ round_current_time = self._round(coerce_datetime(utcnow()))
new_start = self._get_prev(round_current_time)
if earliest is None:
return new_start
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index f215b81bd1..f8ee8474af 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -19,11 +19,11 @@ from __future__ import annotations
import operator
from typing import TYPE_CHECKING, Any, Collection
-from pendulum import DateTime
-
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
if TYPE_CHECKING:
+ from pendulum import DateTime
from sqlalchemy import Session
from airflow.models.dataset import DatasetEvent
@@ -134,10 +134,12 @@ class ContinuousTimetable(_TrivialTimetable):
return None
if last_automated_data_interval is not None: # has already run once
start = last_automated_data_interval.end
- end = DateTime.utcnow()
+ end = timezone.coerce_datetime(timezone.utcnow())
else: # first run
start = restriction.earliest
- end = max(restriction.earliest, DateTime.utcnow()) # won't run any earlier than start_date
+ end = max(
+ restriction.earliest, timezone.coerce_datetime(timezone.utcnow())
+ ) # won't run any earlier than start_date
if restriction.latest is not None and end > restriction.latest:
return None
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index 2a0df645da..c602c3bc12 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -19,13 +19,13 @@ from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Any
-from pendulum import DateTime
-
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
+ from pendulum import DateTime
from pendulum.tz.timezone import FixedTimezone, Timezone
from airflow.timetables.base import TimeRestriction
@@ -98,7 +98,7 @@ class CronTriggerTimetable(CronMixin, Timetable):
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
- start_time_candidates = [self._align_to_prev(DateTime.utcnow())]
+ start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 9134d16855..03aa290e8b 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -27,7 +27,6 @@ import threading
import traceback
import warnings
from argparse import Namespace
-from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, TypeVar, cast
@@ -36,7 +35,7 @@ from sqlalchemy import select
from airflow import settings
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.utils import cli_action_loggers
+from airflow.utils import cli_action_loggers, timezone
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.platform import getuser, is_terminal_support_colors
from airflow.utils.session import NEW_SESSION, provide_session
@@ -116,7 +115,7 @@ def action_cli(func=None, check_db=True):
metrics["error"] = e
raise
finally:
- metrics["end_datetime"] = datetime.utcnow()
+ metrics["end_datetime"] = timezone.utcnow()
cli_action_loggers.on_post_execution(**metrics)
return cast(T, wrapper)
@@ -155,7 +154,7 @@ def _build_metrics(func_name, namespace):
metrics = {
"sub_command": func_name,
- "start_datetime": datetime.utcnow(),
+ "start_datetime": timezone.utcnow(),
"full_command": f"{full_command}",
"user": getuser(),
}
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 3b207253ec..0b8c6c35e9 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -152,14 +152,12 @@ def _dump_table_to_file(*, target_table, file_path, export_format, session):
def _do_delete(*, query, orm_model, skip_archive, session):
- from datetime import datetime
-
import re2
print("Performing Delete...")
# using bulk delete
# create a new table and copy the rows there
- timestamp_str = re2.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
+ timestamp_str = re2.sub(r"[^\d]", "", timezone.utcnow().isoformat())[:14]
target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
print(f"Moving data to table {target_table_name}")
bind = session.get_bind()
diff --git a/airflow/utils/jwt_signer.py b/airflow/utils/jwt_signer.py
index c52b391e35..fe4811eb82 100644
--- a/airflow/utils/jwt_signer.py
+++ b/airflow/utils/jwt_signer.py
@@ -16,11 +16,13 @@
# under the License.
from __future__ import annotations
-from datetime import datetime, timedelta
+from datetime import timedelta
from typing import Any
import jwt
+from airflow.utils import timezone
+
class JWTSigner:
"""
@@ -56,9 +58,9 @@ class JWTSigner:
"""
jwt_dict = {
"aud": self._audience,
- "iat": datetime.utcnow(),
- "nbf": datetime.utcnow(),
- "exp": datetime.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
+ "iat": timezone.utcnow(),
+ "nbf": timezone.utcnow(),
+ "exp": timezone.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
}
jwt_dict.update(extra_payload)
token = jwt.encode(
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index eade2eadfa..f1a9bacfcb 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -23,6 +23,7 @@ from datetime import datetime
from pathlib import Path
from airflow import settings
+from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
@@ -102,9 +103,7 @@ class FileProcessorHandler(logging.Handler):
return self.filename_template.format(filename=ctx["filename"])
def _get_log_directory(self):
- now = datetime.utcnow()
-
- return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))
+ return os.path.join(self.base_log_folder, timezone.utcnow().strftime("%Y-%m-%d"))
def _symlink_latest_log_directory(self):
"""
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 8ac9a49e0e..152ef35954 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -60,13 +60,7 @@ def is_naive(value):
def utcnow() -> dt.datetime:
"""Get the current date and time in UTC."""
- # pendulum utcnow() is not used as that sets a TimezoneInfo object
- # instead of a Timezone. This is not picklable and also creates issues
- # when using replace()
- result = dt.datetime.utcnow()
- result = result.replace(tzinfo=utc)
-
- return result
+ return dt.datetime.now(tz=utc)
def utc_epoch() -> dt.datetime:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 09355bec77..783a904e2d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2975,7 +2975,7 @@ class Airflow(AirflowBaseView):
for (date, count) in dates.items()
)
- now = DateTime.utcnow()
+ now = timezone.utcnow()
data = {
"dag_states": data_dag_states,
"start_date": (dag.start_date or now).date().isoformat(),
@@ -3561,7 +3561,7 @@ class Airflow(AirflowBaseView):
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
- func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(DagRun.run_type)
).all()
@@ -3570,7 +3570,7 @@ class Airflow(AirflowBaseView):
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
- func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(DagRun.state)
).all()
@@ -3581,7 +3581,7 @@ class Airflow(AirflowBaseView):
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
- func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) <= end_date,
+ func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
)
.group_by(TaskInstance.state)
).all()
diff --git a/tests/conftest.py b/tests/conftest.py
index 9176ab9628..43a5492380 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,7 +23,7 @@ import subprocess
import sys
import warnings
from contextlib import ExitStack, suppress
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING
@@ -584,7 +584,7 @@ def frozen_sleep(monkeypatch):
def fake_sleep(seconds):
nonlocal traveller
- utcnow = datetime.utcnow()
+ utcnow = datetime.now(tz=timezone.utc)
if traveller is not None:
traveller.stop()
traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))
diff --git a/tests/dags/test_impersonation_subdag.py b/tests/dags/test_impersonation_subdag.py
index 541aee0816..ae4d8e6732 100644
--- a/tests/dags/test_impersonation_subdag.py
+++ b/tests/dags/test_impersonation_subdag.py
@@ -18,14 +18,14 @@
from __future__ import annotations
import warnings
-from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
+from airflow.utils import timezone
-DEFAULT_DATE = datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
default_args = {"owner": "airflow", "start_date": DEFAULT_DATE, "run_as_user": "airflow_test_user"}
@@ -33,7 +33,7 @@ dag = DAG(dag_id="impersonation_subdag", default_args=default_args)
def print_today():
- print(f"Today is {datetime.utcnow()}")
+ print(f"Today is {timezone.utcnow()}")
subdag = DAG("impersonation_subdag.test_subdag_operation", default_args=default_args)
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index b78801adf9..e1b1bddc85 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -17,16 +17,17 @@
# under the License.
from __future__ import annotations
-from datetime import datetime, timedelta
+from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
+from airflow.utils import timezone
-DEFAULT_DATE = datetime(2016, 1, 1)
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
-dag1 = DAG(dag_id="test_start_date_scheduling", start_date=datetime.utcnow() + timedelta(days=1))
+dag1 = DAG(dag_id="test_start_date_scheduling", start_date=timezone.utcnow() + timedelta(days=1))
dag1_task1 = EmptyOperator(task_id="dummy", dag=dag1, owner="airflow")
dag2 = DAG(dag_id="test_task_start_date_scheduling", start_date=DEFAULT_DATE)
diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py
index 27d8b0ed10..78b325b53d 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -139,7 +139,7 @@ class TestExternalPythonDecorator:
return None
with dag_maker():
- ret = f(datetime.datetime.utcnow())
+ ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py
index a069aee8b1..09631aafe8 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -201,7 +201,7 @@ class TestPythonVirtualenvDecorator:
return None
with dag_maker():
- ret = f(datetime.datetime.utcnow())
+ ret = f(datetime.datetime.now(tz=datetime.timezone.utc))
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index c38fbb3d0b..18e1dd7fb9 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -431,7 +431,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance):
session.add(trigger_orm)
ti_orm = create_task_instance(
task_id="ti_orm",
- execution_date=datetime.datetime.utcnow(),
+ execution_date=timezone.utcnow(),
run_id="orm_run_id",
)
ti_orm.trigger_id = trigger_orm.id
@@ -458,7 +458,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance):
session.add(trigger_orm)
ti_orm = create_task_instance(
task_id="ti_orm",
- execution_date=datetime.datetime.utcnow(),
+ execution_date=timezone.utcnow(),
run_id="orm_run_id",
)
ti_orm.trigger_id = trigger_orm.id
diff --git a/tests/models/test_skipmixin.py b/tests/models/test_skipmixin.py
index 2e4f8ad0d8..f69f588bc1 100644
--- a/tests/models/test_skipmixin.py
+++ b/tests/models/test_skipmixin.py
@@ -55,7 +55,7 @@ class TestSkipMixin:
@patch("airflow.utils.timezone.utcnow")
def test_skip(self, mock_now, dag_maker):
session = settings.Session()
- now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
mock_now.return_value = now
with dag_maker("dag"):
tasks = [EmptyOperator(task_id="task")]
@@ -77,7 +77,7 @@ class TestSkipMixin:
@patch("airflow.utils.timezone.utcnow")
def test_skip_none_dagrun(self, mock_now, dag_maker):
session = settings.Session()
- now = datetime.datetime.utcnow().replace(tzinfo=pendulum.timezone("UTC"))
+ now = datetime.datetime.now(tz=pendulum.timezone("UTC"))
mock_now.return_value = now
with dag_maker(
"dag",
diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py
index 4903081747..b7629c6043 100644
--- a/tests/operators/test_bash.py
+++ b/tests/operators/test_bash.py
@@ -67,7 +67,7 @@ class TestBashOperator:
"""
Test that env variables are exported correctly to the task bash environment.
"""
- utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+ utc_now = datetime.now(tz=timezone.utc)
expected = (
f"{expected_airflow_home}\n"
"AWESOME_PYTHONPATH\n"
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 6c73e847bb..ba008ee1ca 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -26,7 +26,7 @@ import sys
import tempfile
import warnings
from collections import namedtuple
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone as _timezone
from functools import partial
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
@@ -801,7 +801,7 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
def f(_):
return None
- self.run_as_task(f, op_args=[datetime.utcnow()])
+ self.run_as_task(f, op_args=[datetime.now(tz=_timezone.utc)])
def test_context(self):
def f(templates_dict):
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index a40e0d01ea..29a07c8ab4 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -44,6 +44,7 @@ from airflow.serialization.pydantic.job import JobPydantic
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
from airflow.settings import _ENABLE_AIP_44
+from airflow.utils import timezone
from airflow.utils.operator_resources import Resources
from airflow.utils.state import DagRunState, State
from airflow.utils.task_group import TaskGroup
@@ -113,14 +114,14 @@ TI_WITH_START_DAY = TaskInstance(
run_id="fake_run",
state=State.RUNNING,
)
-TI_WITH_START_DAY.start_date = datetime.utcnow()
+TI_WITH_START_DAY.start_date = timezone.utcnow()
DAG_RUN = DagRun(
dag_id="test_dag_id",
run_id="test_dag_run_id",
run_type=DagRunType.MANUAL,
- execution_date=datetime.utcnow(),
- start_date=datetime.utcnow(),
+ execution_date=timezone.utcnow(),
+ start_date=timezone.utcnow(),
external_trigger=True,
state=DagRunState.SUCCESS,
)
@@ -140,7 +141,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
[
("test_str", None, equals),
(1, None, equals),
- (datetime.utcnow(), DAT.DATETIME, equal_time),
+ (timezone.utcnow(), DAT.DATETIME, equal_time),
(timedelta(minutes=2), DAT.TIMEDELTA, equals),
(Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
(relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, b: a.hours == b.hours),
@@ -151,7 +152,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
(
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
- name="test", annotations={"test": "annotation"}, creation_timestamp=datetime.utcnow()
+ name="test", annotations={"test": "annotation"}, creation_timestamp=timezone.utcnow()
)
),
DAT.POD,
@@ -162,7 +163,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
"fake-dag",
schedule="*/10 * * * *",
default_args={"depends_on_past": True},
- start_date=datetime.utcnow(),
+ start_date=timezone.utcnow(),
catchup=False,
),
DAT.DAG,
@@ -241,7 +242,7 @@ def test_backcompat_deserialize_connection(conn_uri):
"input, pydantic_class, encoded_type, cmp_func",
[
(
- Job(state=State.RUNNING, latest_heartbeat=datetime.utcnow()),
+ Job(state=State.RUNNING, latest_heartbeat=timezone.utcnow()),
JobPydantic,
DAT.BASE_JOB,
lambda a, b: equal_time(a.latest_heartbeat, b.latest_heartbeat),
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index cad05ab3dc..03056d3e20 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -23,7 +23,6 @@ import os
import sys
from argparse import Namespace
from contextlib import contextmanager
-from datetime import datetime
from pathlib import Path
from unittest import mock
@@ -56,7 +55,7 @@ class TestCliUtil:
for k, v in expected.items():
assert v == metrics.get(k)
- assert metrics.get("start_datetime") <= datetime.utcnow()
+ assert metrics.get("start_datetime") <= timezone.utcnow()
assert metrics.get("full_command")
def test_fail_function(self):
@@ -147,7 +146,7 @@ class TestCliUtil:
log = session.query(Log).order_by(Log.dttm.desc()).first()
- assert metrics.get("start_datetime") <= datetime.utcnow()
+ assert metrics.get("start_datetime") <= timezone.utcnow()
command: str = json.loads(log.extra).get("full_command")
# Replace single quotes to double quotes to avoid json decode error
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 7c600d237d..4716870462 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -18,7 +18,6 @@
from __future__ import annotations
from contextlib import suppress
-from datetime import datetime
from importlib import import_module
from io import StringIO
from pathlib import Path
@@ -34,6 +33,7 @@ from sqlalchemy.ext.declarative import DeclarativeMeta
from airflow.exceptions import AirflowException
from airflow.models import DagModel, DagRun, TaskInstance
from airflow.operators.python import PythonOperator
+from airflow.utils import timezone
from airflow.utils.db_cleanup import (
ARCHIVE_TABLE_PREFIX,
CreateTableAs,
@@ -354,13 +354,13 @@ class TestDBCleanup:
Ensure every table we have configured (and that is present in the db)
can be cleaned successfully.
For example, this checks that the recency column is actually a column.
"""
- run_cleanup(clean_before_timestamp=datetime.utcnow(), dry_run=True)
+ run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
assert "Encountered error when attempting to clean table" not in caplog.text
# Lets check we have the right error message just in case
caplog.clear()
with patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)):
- run_cleanup(clean_before_timestamp=datetime.utcnow(), table_names=["task_instance"], dry_run=True)
+ run_cleanup(clean_before_timestamp=timezone.utcnow(), table_names=["task_instance"], dry_run=True)
assert "Encountered error when attempting to clean table" in caplog.text
@pytest.mark.parametrize(
diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py
index 574686a415..b6756502c1 100644
--- a/tests/utils/test_serve_logs.py
+++ b/tests/utils/test_serve_logs.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-import datetime
+from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
@@ -25,6 +25,7 @@ import pytest
import time_machine
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.utils import timezone
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.serve_logs import create_app
from tests.test_utils.config import conf_vars
@@ -135,7 +136,7 @@ class TestServeLogs:
)
def test_forbidden_future(self, client: FlaskClient, signer):
- with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)):
+ with time_machine.travel(timezone.utcnow() + timedelta(seconds=3600)):
token = signer.generate_signed_token({"filename": "sample.log"})
assert (
client.get(
@@ -148,7 +149,7 @@ class TestServeLogs:
)
def test_ok_with_short_future_skew(self, client: FlaskClient, signer):
- with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=1)):
+ with time_machine.travel(timezone.utcnow() + timedelta(seconds=1)):
token = signer.generate_signed_token({"filename": "sample.log"})
assert (
client.get(
@@ -161,7 +162,7 @@ class TestServeLogs:
)
def test_ok_with_short_past_skew(self, client: FlaskClient, signer):
- with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=31)):
+ with time_machine.travel(timezone.utcnow() - timedelta(seconds=31)):
token = signer.generate_signed_token({"filename": "sample.log"})
assert (
client.get(
@@ -174,7 +175,7 @@ class TestServeLogs:
)
def test_forbidden_with_long_future_skew(self, client: FlaskClient, signer):
- with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=10)):
+ with time_machine.travel(timezone.utcnow() + timedelta(seconds=10)):
token = signer.generate_signed_token({"filename": "sample.log"})
assert (
client.get(
@@ -187,7 +188,7 @@ class TestServeLogs:
)
def test_forbidden_with_long_past_skew(self, client: FlaskClient, signer):
- with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=40)):
+ with time_machine.travel(timezone.utcnow() - timedelta(seconds=40)):
token = signer.generate_signed_token({"filename": "sample.log"})
assert (
client.get(
@@ -214,9 +215,9 @@ class TestServeLogs:
def test_missing_claims(self, claim_to_remove: str, client: FlaskClient, secret_key):
jwt_dict = {
"aud": "task-instance-logs",
- "iat": datetime.datetime.utcnow(),
- "nbf": datetime.datetime.utcnow(),
- "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=30),
+ "iat": timezone.utcnow(),
+ "nbf": timezone.utcnow(),
+ "exp": timezone.utcnow() + timedelta(seconds=30),
}
del jwt_dict[claim_to_remove]
jwt_dict.update({"filename": "sample.log"})