This is an automated email from the ASF dual-hosted git repository.
bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8756128338e fix(airflowctl): timezone aware and consistent serialisation for datetime (#55282)
8756128338e is described below
commit 8756128338e01e3397ee445300de69e3ee94f8d8
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Sep 10 17:15:23 2025 +0200
fix(airflowctl): timezone aware and consistent serialisation for datetime (#55282)
* fix(airflowctl): Introduce _date_safe_dict_from_pydantic for consistent serialization and make datetime timezone aware via shared code (a short serialisation sketch follows after this list)
* fix(airflowctl): Improve docstring clarity for _date_safe_dict_from_pydantic function
* feat(airflowctl): Add shared timezones dependency and configuration
* fix(airflowctl): Update import path for timezone parsing to align with shared module structure and add documentation for datetime usage
* fix(airflowctl): Update example in documentation and improve argument type handling in CLI configuration
* fix(airflowctl): Enhance type handling in CLI configuration and update test cases for argument types
* Update airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
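The change below serialises request bodies via Pydantic's JSON round-trip so datetime fields reach the API as timezone-aware ISO 8601 strings, and maps CLI datetime arguments to the shared timezone parser instead of the bare datetime type. The following is a rough illustration only (not the shipped code; TriggerBody is a hypothetical stand-in for a generated datamodel), contrasting model_dump() with the JSON round-trip and showing pendulum (the dependency added for the shared timezones code) parsing a CLI-style ISO string into a timezone-aware datetime:

    import json
    from datetime import datetime, timezone

    import pendulum
    from pydantic import BaseModel


    class TriggerBody(BaseModel):
        # Hypothetical stand-in for a generated API datamodel with a datetime field.
        logical_date: datetime


    body = TriggerBody(logical_date=datetime(2025, 9, 6, tzinfo=timezone.utc))

    # model_dump() keeps a datetime object, which httpx's json= argument cannot encode.
    print(body.model_dump())
    # {'logical_date': datetime.datetime(2025, 9, 6, 0, 0, tzinfo=datetime.timezone.utc)}

    # Round-tripping through model_dump_json() yields JSON-safe ISO 8601 strings.
    print(json.loads(body.model_dump_json()))
    # {'logical_date': '2025-09-06T00:00:00Z'}

    # pendulum.parse() turns a CLI-supplied ISO string into a timezone-aware datetime.
    print(pendulum.parse("2025-09-06T00:00:00+00:00"))
    # 2025-09-06T00:00:00+00:00

The actual helper (_date_safe_dict_from_pydantic) and the parse import live in operations.py and cli_config.py in the diff below.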
---
airflow-ctl/docs/howto/index.rst | 12 ++++-
airflow-ctl/pyproject.toml | 11 +++++
airflow-ctl/src/airflowctl/_shared/timezones | 1 +
airflow-ctl/src/airflowctl/api/operations.py | 54 ++++++++++++++++------
airflow-ctl/src/airflowctl/ctl/cli_config.py | 21 +++++----
.../tests/airflow_ctl/ctl/test_cli_config.py | 13 ++----
6 files changed, 80 insertions(+), 32 deletions(-)
diff --git a/airflow-ctl/docs/howto/index.rst b/airflow-ctl/docs/howto/index.rst
index 602c0174e6e..e655c0aa4eb 100644
--- a/airflow-ctl/docs/howto/index.rst
+++ b/airflow-ctl/docs/howto/index.rst
@@ -66,7 +66,18 @@ In both cases token is securely stored in the keyring backend. Only configuratio
is the API URL and the environment name. The token is stored in the keyring backend and is not persisted in the
configuration file. The keyring backend is used to securely store the token and is not accessible to the user.
+Datetime Usage
+''''''''''''''
+For datetime parameters, the date should be timezone aware and in ISO 8601 format.
+For example: ``2025-10-10T10:00:00+00:00``
+Let's take the example of triggering a DAG run with a logical date, run after, and a note.
+.. code-block:: bash
+
+    airflowctl dagrun trigger --dag-id="example_bash_operator" --logical-date="2025-09-06T00:00:00+00:00" --run-after="2025-09-06T00:00:00+00:00" --note="Triggered from airflowctl"
+
+More Usage and Help Pictures
+''''''''''''''''''''''''''''
For more information use
.. code-block:: bash
@@ -88,7 +99,6 @@ You can use the command ``airflowctl --help`` to see the list of available comma
:width: 60%
:alt: airflowctl Help
-
All Available Group Command References
--------------------------------------
diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index 611ca1cf09b..d91596b69cd 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -38,6 +38,9 @@ dependencies = [
"structlog>=25.2.0",
"uuid6>=2024.7.10",
"tabulate>=0.9.0",
+ # Start of shared timezones dependencies
+ "pendulum>=3.1.0",
+ # End of shared timezones dependencies
]
classifiers = [
@@ -62,6 +65,9 @@ path = "src/airflowctl/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src/airflowctl"]
+[tool.hatch.build.targets.sdist.force-include]
+"../shared/timezones/src/airflow_shared/timezones" =
"src/airflow/sdk/_shared/timezones"
+
[tool.ruff]
extend = "../pyproject.toml"
src = ["src"]
@@ -191,3 +197,8 @@ required-version = ">=0.6.3"
[tool.uv.sources]
apache-airflow-devel-common = { workspace = true }
+
+[tool.airflow]
+shared_distributions = [
+ "apache-airflow-shared-timezones",
+]
diff --git a/airflow-ctl/src/airflowctl/_shared/timezones b/airflow-ctl/src/airflowctl/_shared/timezones
new file mode 120000
index 00000000000..8d7034fa713
--- /dev/null
+++ b/airflow-ctl/src/airflowctl/_shared/timezones
@@ -0,0 +1 @@
+../../../../shared/timezones/src/airflow_shared/timezones
\ No newline at end of file
diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py
index fe56661d2d0..dd97dd54a5e 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -128,6 +128,20 @@ def _check_flag_and_exit_if_server_response_error(func):
return wrapped
+def _date_safe_dict_from_pydantic(obj: T) -> dict:
+ """
+ Convert a Pydantic model to a dict to be sent to the API server.
+
+    It is meant to be used for any Pydantic model that will be sent to the API.
+
+ :param obj: Pydantic model
+ :return: dict
+ """
+ import json
+
+ return json.loads(obj.model_dump_json())
+
+
class BaseOperations:
"""
Base class for operations.
@@ -229,7 +243,9 @@ class AssetsOperations(BaseOperations):
) -> AssetEventResponse | ServerResponseError:
"""Create an asset event."""
try:
- self.response = self.client.post("assets/events",
json=asset_event_body.model_dump())
+ self.response = self.client.post(
+ "assets/events",
json=_date_safe_dict_from_pydantic(asset_event_body)
+ )
return
AssetEventResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -299,7 +315,7 @@ class BackfillsOperations(BaseOperations):
def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError:
"""Create a backfill."""
try:
-            self.response = self.client.post("backfills", data=backfill.model_dump())
+            self.response = self.client.post("backfills", data=_date_safe_dict_from_pydantic(backfill))
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -307,7 +323,9 @@ class BackfillsOperations(BaseOperations):
def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError:
"""Create a dry run backfill."""
try:
-            self.response = self.client.post("backfills/dry_run", data=backfill.model_dump())
+            self.response = self.client.post(
+                "backfills/dry_run", data=_date_safe_dict_from_pydantic(backfill)
+ )
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -391,7 +409,7 @@ class ConnectionsOperations(BaseOperations):
) -> ConnectionResponse | ServerResponseError:
"""Create a connection."""
try:
- self.response = self.client.post("connections",
json=connection.model_dump())
+ self.response = self.client.post("connections",
json=_date_safe_dict_from_pydantic(connection))
return
ConnectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -399,7 +417,7 @@ class ConnectionsOperations(BaseOperations):
def bulk(self, connections: BulkBodyConnectionBody) -> BulkResponse | ServerResponseError:
"""CRUD multiple connections."""
try:
-            self.response = self.client.patch("connections", json=connections.model_dump())
+            self.response = self.client.patch("connections", json=_date_safe_dict_from_pydantic(connections))
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -427,7 +445,7 @@ class ConnectionsOperations(BaseOperations):
"""Update a connection."""
try:
self.response = self.client.patch(
- f"connections/{connection.connection_id}",
json=connection.model_dump()
+ f"connections/{connection.connection_id}",
json=_date_safe_dict_from_pydantic(connection)
)
return
ConnectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
@@ -439,7 +457,9 @@ class ConnectionsOperations(BaseOperations):
) -> ConnectionTestResponse | ServerResponseError:
"""Test a connection."""
try:
- self.response = self.client.post("connections/test",
json=connection.model_dump())
+ self.response = self.client.post(
+ "connections/test",
json=_date_safe_dict_from_pydantic(connection)
+ )
return
ConnectionTestResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -557,7 +577,9 @@ class DagRunOperations(BaseOperations):
if trigger_dag_run.conf is None:
trigger_dag_run.conf = {}
try:
- self.response = self.client.post(f"dags/{dag_id}/dagRuns",
json=trigger_dag_run.model_dump())
+ self.response = self.client.post(
+ f"dags/{dag_id}/dagRuns",
json=_date_safe_dict_from_pydantic(trigger_dag_run)
+ )
return DAGRunResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -592,7 +614,7 @@ class PoolsOperations(BaseOperations):
def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError:
"""Create a pool."""
try:
- self.response = self.client.post("pools", json=pool.model_dump())
+ self.response = self.client.post("pools",
json=_date_safe_dict_from_pydantic(pool))
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -600,7 +622,7 @@ class PoolsOperations(BaseOperations):
def bulk(self, pools: BulkBodyPoolBody) -> BulkResponse | ServerResponseError:
"""CRUD multiple pools."""
try:
-            self.response = self.client.patch("pools", json=pools.model_dump())
+            self.response = self.client.patch("pools", json=_date_safe_dict_from_pydantic(pools))
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -616,7 +638,9 @@ class PoolsOperations(BaseOperations):
def update(self, pool_body: PoolPatchBody) -> PoolResponse | ServerResponseError:
"""Update a pool."""
try:
-            self.response = self.client.patch(f"pools/{pool_body.pool}", json=pool_body.model_dump())
+            self.response = self.client.patch(
+                f"pools/{pool_body.pool}", json=_date_safe_dict_from_pydantic(pool_body)
+ )
return PoolResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -648,7 +672,7 @@ class VariablesOperations(BaseOperations):
def create(self, variable: VariableBody) -> VariableResponse | ServerResponseError:
"""Create a variable."""
try:
-            self.response = self.client.post("variables", json=variable.model_dump())
+            self.response = self.client.post("variables", json=_date_safe_dict_from_pydantic(variable))
return VariableResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -656,7 +680,7 @@ class VariablesOperations(BaseOperations):
def bulk(self, variables: BulkBodyVariableBody) -> BulkResponse | ServerResponseError:
"""CRUD multiple variables."""
try:
-            self.response = self.client.patch("variables", json=variables.model_dump())
+            self.response = self.client.patch("variables", json=_date_safe_dict_from_pydantic(variables))
return BulkResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
@@ -672,7 +696,9 @@ class VariablesOperations(BaseOperations):
def update(self, variable: VariableBody) -> VariableResponse | ServerResponseError:
"""Update a variable."""
try:
-            self.response = self.client.patch(f"variables/{variable.key}", json=variable.model_dump())
+            self.response = self.client.patch(
+                f"variables/{variable.key}", json=_date_safe_dict_from_pydantic(variable)
+ )
return VariableResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py
index 085fe5abec4..a4212ceef15 100644
--- a/airflow-ctl/src/airflowctl/ctl/cli_config.py
+++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py
@@ -22,7 +22,6 @@ from __future__ import annotations
import argparse
import ast
-import datetime
import getpass
import inspect
import os
@@ -37,6 +36,7 @@ import httpx
import rich
import airflowctl.api.datamodels.generated as generated_datamodels
+from airflowctl._shared.timezones.timezone import parse as parsedate
from airflowctl.api.client import NEW_API_CLIENT, Client, ClientKind, provide_api_client
from airflowctl.api.operations import BaseOperations, ServerResponseError
from airflowctl.ctl.console_formatting import AirflowConsole
@@ -445,7 +445,7 @@ class CommandFactory:
return type_name in primitive_types
@staticmethod
- def _python_type_from_string(type_name: str) -> type:
+ def _python_type_from_string(type_name: str | type) -> type | Callable:
"""
Return the corresponding Python *type* for a primitive type name string.
@@ -455,7 +455,9 @@
leading to type errors or unexpected behaviour when invoking the REST API.
"""
- mapping: dict[str, type] = {
+ if "|" in str(type_name):
+            type_name = [t.strip() for t in str(type_name).split("|") if t.strip() != "None"].pop()
+ mapping: dict[str, type | Callable] = {
"int": int,
"float": float,
"bool": bool,
@@ -465,16 +467,18 @@ class CommandFactory:
"dict": dict,
"tuple": tuple,
"set": set,
- "datetime.datetime": datetime.datetime,
+ "datetime.datetime": parsedate,
}
# Default to ``str`` to preserve previous behaviour for any unrecognised
# type names while still allowing the CLI to function.
+ if isinstance(type_name, type):
+ type_name = type_name.__name__
return mapping.get(type_name, str)
@staticmethod
def _create_arg(
arg_flags: tuple,
- arg_type: type,
+ arg_type: type | Callable,
arg_help: str,
arg_action: argparse.BooleanOptionalAction | None,
arg_dest: str | None = None,
@@ -507,7 +511,7 @@ class CommandFactory:
commands.append(
self._create_arg(
arg_flags=("--" +
self._sanitize_arg_parameter_key(field),),
- arg_type=field_type.annotation,
+
arg_type=self._python_type_from_string(field_type.annotation),
arg_action=argparse.BooleanOptionalAction if
field_type.annotation is bool else None, # type: ignore
arg_help=f"{field} for {parameter_key} operation",
arg_default=False if field_type.annotation is bool
else None,
@@ -522,7 +526,7 @@ class CommandFactory:
commands.append(
self._create_arg(
arg_flags=("--" +
self._sanitize_arg_parameter_key(field),),
- arg_type=annotation,
+ arg_type=self._python_type_from_string(annotation),
arg_action=argparse.BooleanOptionalAction if
annotation is bool else None, # type: ignore
arg_help=f"{field} for {parameter_key} operation",
arg_default=False if annotation is bool else None,
@@ -537,12 +541,11 @@ class CommandFactory:
for parameter in operation.get("parameters"):
for parameter_key, parameter_type in parameter.items():
if self._is_primitive_type(type_name=parameter_type):
-                        python_type = self._python_type_from_string(parameter_type)
is_bool = parameter_type == "bool"
args.append(
self._create_arg(
arg_flags=("--" + self._sanitize_arg_parameter_key(parameter_key),),
-                            arg_type=None if is_bool else python_type,
+                            arg_type=self._python_type_from_string(parameter_type),
arg_action=argparse.BooleanOptionalAction if is_bool else None,
arg_help=f"{parameter_key} for {operation.get('name')} operation in {operation.get('parent').name}",
arg_default=False if is_bool else None,
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
index 1e4f5728a25..f5694d5b4bc 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
@@ -17,14 +17,11 @@
from __future__ import annotations
-import datetime
from argparse import BooleanOptionalAction
from textwrap import dedent
-from typing import Any
import pytest
-from airflowctl.api.datamodels.generated import ReprocessBehavior
from airflowctl.ctl.cli_config import ActionCommand, CommandFactory, GroupCommand, merge_commands
@@ -59,7 +56,7 @@ def test_args_create():
"help": "from_date for backfill operation",
"action": None,
"default": None,
- "type": datetime.datetime,
+ "type": str,
"dest": None,
},
),
@@ -69,7 +66,7 @@ def test_args_create():
"help": "to_date for backfill operation",
"action": None,
"default": None,
- "type": datetime.datetime,
+ "type": str,
"dest": None,
},
),
@@ -89,7 +86,7 @@ def test_args_create():
"help": "dag_run_conf for backfill operation",
"action": None,
"default": None,
- "type": dict[str, Any],
+ "type": dict,
"dest": None,
},
),
@@ -99,7 +96,7 @@ def test_args_create():
"help": "reprocess_behavior for backfill operation",
"action": None,
"default": None,
- "type": ReprocessBehavior,
+ "type": str,
"dest": None,
},
),
@@ -228,7 +225,7 @@ class TestCommandFactory:
class BackfillsOperations(BaseOperations):
def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError:
try:
-                    self.response = self.client.post("backfills", data=backfill.model_dump())
+                    self.response = self.client.post("backfills", json=_date_safe_dict_from_pydantic(backfill))
return BackfillResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e