This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8756128338e fix(airflowctl): timezone aware and consistent 
serialisation for datetime (#55282)
8756128338e is described below

commit 8756128338e01e3397ee445300de69e3ee94f8d8
Author: Bugra Ozturk <[email protected]>
AuthorDate: Wed Sep 10 17:15:23 2025 +0200

    fix(airflowctl): timezone aware and consistent serialisation for datetime 
(#55282)
    
    * fix(airflowctl): Introduce _date_safe_dict_from_pydantic for consistent 
serialization and make datetime timezone aware via shared code
    
    * fix(airflowctl): Improve docstring clarity for 
_date_safe_dict_from_pydantic function
    
    * feat(airflowctl): Add shared timezones dependency and configuration
    
    * fix(airflowctl): Update import path for timezone parsing to align with 
shared module structure and add documentation for datetime usage
    
    * fix(airflowctl): Update example in documentation and improve argument 
type handling in CLI configuration
    
    * fix(airflowctl): Enhance type handling in CLI configuration and update 
test cases for argument types
    
    * Update airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
---
 airflow-ctl/docs/howto/index.rst                   | 12 ++++-
 airflow-ctl/pyproject.toml                         | 11 +++++
 airflow-ctl/src/airflowctl/_shared/timezones       |  1 +
 airflow-ctl/src/airflowctl/api/operations.py       | 54 ++++++++++++++++------
 airflow-ctl/src/airflowctl/ctl/cli_config.py       | 21 +++++----
 .../tests/airflow_ctl/ctl/test_cli_config.py       | 13 ++----
 6 files changed, 80 insertions(+), 32 deletions(-)

diff --git a/airflow-ctl/docs/howto/index.rst b/airflow-ctl/docs/howto/index.rst
index 602c0174e6e..e655c0aa4eb 100644
--- a/airflow-ctl/docs/howto/index.rst
+++ b/airflow-ctl/docs/howto/index.rst
@@ -66,7 +66,18 @@ In both cases token is securely stored in the keyring 
backend. Only configuratio
 is the API URL and the environment name. The token is stored in the keyring 
backend and is not persisted in the
 configuration file. The keyring backend is used to securely store the token 
and is not accessible to the user.
 
+Datetime Usage
+''''''''''''''
+For datetime parameters, the value must be timezone-aware and in ISO 8601 format.
+For example: ``2025-10-10T10:00:00+00:00``
+The following example triggers a DAG run with a logical date, a run-after date and a note.
 
+.. code-block:: bash
+
+  airflowctl dagrun trigger --dag-id="example_bash_operator" 
--logical-date="2025-09-06T00:00:00+00:00" 
--run-after="2025-09-06T00:00:00+00:00" --note="Triggered from airflowctl"
+
+More Usage and Help Pictures
+''''''''''''''''''''''''''''
 For more information use
 
 .. code-block:: bash
@@ -88,7 +99,6 @@ You can use the command ``airflowctl --help`` to see the list 
of available comma
   :width: 60%
   :alt: airflowctl Help
 
-
 All Available Group Command References
 --------------------------------------
 
diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index 611ca1cf09b..d91596b69cd 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -38,6 +38,9 @@ dependencies = [
     "structlog>=25.2.0",
     "uuid6>=2024.7.10",
     "tabulate>=0.9.0",
+    # Start of shared timezones dependencies
+    "pendulum>=3.1.0",
+    # End of shared timezones dependencies
 ]
 
 classifiers = [
@@ -62,6 +65,9 @@ path = "src/airflowctl/__init__.py"
 [tool.hatch.build.targets.wheel]
 packages = ["src/airflowctl"]
 
+[tool.hatch.build.targets.sdist.force-include]
+"../shared/timezones/src/airflow_shared/timezones" = 
"src/airflow/sdk/_shared/timezones"
+
 [tool.ruff]
 extend = "../pyproject.toml"
 src = ["src"]
@@ -191,3 +197,8 @@ required-version = ">=0.6.3"
 
 [tool.uv.sources]
 apache-airflow-devel-common = { workspace = true }
+
+[tool.airflow]
+shared_distributions = [
+    "apache-airflow-shared-timezones",
+]
diff --git a/airflow-ctl/src/airflowctl/_shared/timezones 
b/airflow-ctl/src/airflowctl/_shared/timezones
new file mode 120000
index 00000000000..8d7034fa713
--- /dev/null
+++ b/airflow-ctl/src/airflowctl/_shared/timezones
@@ -0,0 +1 @@
+../../../../shared/timezones/src/airflow_shared/timezones
\ No newline at end of file
diff --git a/airflow-ctl/src/airflowctl/api/operations.py 
b/airflow-ctl/src/airflowctl/api/operations.py
index fe56661d2d0..dd97dd54a5e 100644
--- a/airflow-ctl/src/airflowctl/api/operations.py
+++ b/airflow-ctl/src/airflowctl/api/operations.py
@@ -128,6 +128,20 @@ def _check_flag_and_exit_if_server_response_error(func):
     return wrapped
 
 
+def _date_safe_dict_from_pydantic(obj: T) -> dict:
+    """
+    Convert a Pydantic model to a dict to be sent to the API server.
+
+    It is meant to be used for any Pydantic model that will be sent to the API.
+
+    :param obj: Pydantic model
+    :return: dict
+    """
+    import json
+
+    return json.loads(obj.model_dump_json())
+
+
 class BaseOperations:
     """
     Base class for operations.
@@ -229,7 +243,9 @@ class AssetsOperations(BaseOperations):
     ) -> AssetEventResponse | ServerResponseError:
         """Create an asset event."""
         try:
-            self.response = self.client.post("assets/events", 
json=asset_event_body.model_dump())
+            self.response = self.client.post(
+                "assets/events", 
json=_date_safe_dict_from_pydantic(asset_event_body)
+            )
             return 
AssetEventResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -299,7 +315,7 @@ class BackfillsOperations(BaseOperations):
     def create(self, backfill: BackfillPostBody) -> BackfillResponse | 
ServerResponseError:
         """Create a backfill."""
         try:
-            self.response = self.client.post("backfills", 
data=backfill.model_dump())
+            self.response = self.client.post("backfills", 
data=_date_safe_dict_from_pydantic(backfill))
             return BackfillResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -307,7 +323,9 @@ class BackfillsOperations(BaseOperations):
     def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse | 
ServerResponseError:
         """Create a dry run backfill."""
         try:
-            self.response = self.client.post("backfills/dry_run", 
data=backfill.model_dump())
+            self.response = self.client.post(
+                "backfills/dry_run", 
data=_date_safe_dict_from_pydantic(backfill)
+            )
             return BackfillResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -391,7 +409,7 @@ class ConnectionsOperations(BaseOperations):
     ) -> ConnectionResponse | ServerResponseError:
         """Create a connection."""
         try:
-            self.response = self.client.post("connections", 
json=connection.model_dump())
+            self.response = self.client.post("connections", 
json=_date_safe_dict_from_pydantic(connection))
             return 
ConnectionResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -399,7 +417,7 @@ class ConnectionsOperations(BaseOperations):
     def bulk(self, connections: BulkBodyConnectionBody) -> BulkResponse | 
ServerResponseError:
         """CRUD multiple connections."""
         try:
-            self.response = self.client.patch("connections", 
json=connections.model_dump())
+            self.response = self.client.patch("connections", 
json=_date_safe_dict_from_pydantic(connections))
             return BulkResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -427,7 +445,7 @@ class ConnectionsOperations(BaseOperations):
         """Update a connection."""
         try:
             self.response = self.client.patch(
-                f"connections/{connection.connection_id}", 
json=connection.model_dump()
+                f"connections/{connection.connection_id}", 
json=_date_safe_dict_from_pydantic(connection)
             )
             return 
ConnectionResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
@@ -439,7 +457,9 @@ class ConnectionsOperations(BaseOperations):
     ) -> ConnectionTestResponse | ServerResponseError:
         """Test a connection."""
         try:
-            self.response = self.client.post("connections/test", 
json=connection.model_dump())
+            self.response = self.client.post(
+                "connections/test", 
json=_date_safe_dict_from_pydantic(connection)
+            )
             return 
ConnectionTestResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -557,7 +577,9 @@ class DagRunOperations(BaseOperations):
         if trigger_dag_run.conf is None:
             trigger_dag_run.conf = {}
         try:
-            self.response = self.client.post(f"dags/{dag_id}/dagRuns", 
json=trigger_dag_run.model_dump())
+            self.response = self.client.post(
+                f"dags/{dag_id}/dagRuns", 
json=_date_safe_dict_from_pydantic(trigger_dag_run)
+            )
             return DAGRunResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -592,7 +614,7 @@ class PoolsOperations(BaseOperations):
     def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError:
         """Create a pool."""
         try:
-            self.response = self.client.post("pools", json=pool.model_dump())
+            self.response = self.client.post("pools", 
json=_date_safe_dict_from_pydantic(pool))
             return PoolResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -600,7 +622,7 @@ class PoolsOperations(BaseOperations):
     def bulk(self, pools: BulkBodyPoolBody) -> BulkResponse | 
ServerResponseError:
         """CRUD multiple pools."""
         try:
-            self.response = self.client.patch("pools", json=pools.model_dump())
+            self.response = self.client.patch("pools", 
json=_date_safe_dict_from_pydantic(pools))
             return BulkResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -616,7 +638,9 @@ class PoolsOperations(BaseOperations):
     def update(self, pool_body: PoolPatchBody) -> PoolResponse | 
ServerResponseError:
         """Update a pool."""
         try:
-            self.response = self.client.patch(f"pools/{pool_body.pool}", 
json=pool_body.model_dump())
+            self.response = self.client.patch(
+                f"pools/{pool_body.pool}", 
json=_date_safe_dict_from_pydantic(pool_body)
+            )
             return PoolResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -648,7 +672,7 @@ class VariablesOperations(BaseOperations):
     def create(self, variable: VariableBody) -> VariableResponse | 
ServerResponseError:
         """Create a variable."""
         try:
-            self.response = self.client.post("variables", 
json=variable.model_dump())
+            self.response = self.client.post("variables", 
json=_date_safe_dict_from_pydantic(variable))
             return VariableResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -656,7 +680,7 @@ class VariablesOperations(BaseOperations):
     def bulk(self, variables: BulkBodyVariableBody) -> BulkResponse | 
ServerResponseError:
         """CRUD multiple variables."""
         try:
-            self.response = self.client.patch("variables", 
json=variables.model_dump())
+            self.response = self.client.patch("variables", 
json=_date_safe_dict_from_pydantic(variables))
             return BulkResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
@@ -672,7 +696,9 @@ class VariablesOperations(BaseOperations):
     def update(self, variable: VariableBody) -> VariableResponse | 
ServerResponseError:
         """Update a variable."""
         try:
-            self.response = self.client.patch(f"variables/{variable.key}", 
json=variable.model_dump())
+            self.response = self.client.patch(
+                f"variables/{variable.key}", 
json=_date_safe_dict_from_pydantic(variable)
+            )
             return VariableResponse.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py 
b/airflow-ctl/src/airflowctl/ctl/cli_config.py
index 085fe5abec4..a4212ceef15 100644
--- a/airflow-ctl/src/airflowctl/ctl/cli_config.py
+++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py
@@ -22,7 +22,6 @@ from __future__ import annotations
 
 import argparse
 import ast
-import datetime
 import getpass
 import inspect
 import os
@@ -37,6 +36,7 @@ import httpx
 import rich
 
 import airflowctl.api.datamodels.generated as generated_datamodels
+from airflowctl._shared.timezones.timezone import parse as parsedate
 from airflowctl.api.client import NEW_API_CLIENT, Client, ClientKind, 
provide_api_client
 from airflowctl.api.operations import BaseOperations, ServerResponseError
 from airflowctl.ctl.console_formatting import AirflowConsole
@@ -445,7 +445,7 @@ class CommandFactory:
         return type_name in primitive_types
 
     @staticmethod
-    def _python_type_from_string(type_name: str) -> type:
+    def _python_type_from_string(type_name: str | type) -> type | Callable:
         """
         Return the corresponding Python *type* for a primitive type name 
string.
 
@@ -455,7 +455,9 @@ class CommandFactory:
         leading to type errors or unexpected behaviour when invoking the REST
         API.
         """
-        mapping: dict[str, type] = {
+        if "|" in str(type_name):
+            type_name = [t.strip() for t in str(type_name).split("|") if 
t.strip() != "None"].pop()
+        mapping: dict[str, type | Callable] = {
             "int": int,
             "float": float,
             "bool": bool,
@@ -465,16 +467,18 @@ class CommandFactory:
             "dict": dict,
             "tuple": tuple,
             "set": set,
-            "datetime.datetime": datetime.datetime,
+            "datetime.datetime": parsedate,
         }
         # Default to ``str`` to preserve previous behaviour for any 
unrecognised
         # type names while still allowing the CLI to function.
+        if isinstance(type_name, type):
+            type_name = type_name.__name__
         return mapping.get(type_name, str)
 
     @staticmethod
     def _create_arg(
         arg_flags: tuple,
-        arg_type: type,
+        arg_type: type | Callable,
         arg_help: str,
         arg_action: argparse.BooleanOptionalAction | None,
         arg_dest: str | None = None,
@@ -507,7 +511,7 @@ class CommandFactory:
                 commands.append(
                     self._create_arg(
                         arg_flags=("--" + 
self._sanitize_arg_parameter_key(field),),
-                        arg_type=field_type.annotation,
+                        
arg_type=self._python_type_from_string(field_type.annotation),
                         arg_action=argparse.BooleanOptionalAction if 
field_type.annotation is bool else None,  # type: ignore
                         arg_help=f"{field} for {parameter_key} operation",
                         arg_default=False if field_type.annotation is bool 
else None,
@@ -522,7 +526,7 @@ class CommandFactory:
                 commands.append(
                     self._create_arg(
                         arg_flags=("--" + 
self._sanitize_arg_parameter_key(field),),
-                        arg_type=annotation,
+                        arg_type=self._python_type_from_string(annotation),
                         arg_action=argparse.BooleanOptionalAction if 
annotation is bool else None,  # type: ignore
                         arg_help=f"{field} for {parameter_key} operation",
                         arg_default=False if annotation is bool else None,
@@ -537,12 +541,11 @@ class CommandFactory:
             for parameter in operation.get("parameters"):
                 for parameter_key, parameter_type in parameter.items():
                     if self._is_primitive_type(type_name=parameter_type):
-                        python_type = 
self._python_type_from_string(parameter_type)
                         is_bool = parameter_type == "bool"
                         args.append(
                             self._create_arg(
                                 arg_flags=("--" + 
self._sanitize_arg_parameter_key(parameter_key),),
-                                arg_type=None if is_bool else python_type,
+                                
arg_type=self._python_type_from_string(parameter_type),
                                 arg_action=argparse.BooleanOptionalAction if 
is_bool else None,
                                 arg_help=f"{parameter_key} for 
{operation.get('name')} operation in {operation.get('parent').name}",
                                 arg_default=False if is_bool else None,
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py 
b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
index 1e4f5728a25..f5694d5b4bc 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
@@ -17,14 +17,11 @@
 
 from __future__ import annotations
 
-import datetime
 from argparse import BooleanOptionalAction
 from textwrap import dedent
-from typing import Any
 
 import pytest
 
-from airflowctl.api.datamodels.generated import ReprocessBehavior
 from airflowctl.ctl.cli_config import ActionCommand, CommandFactory, 
GroupCommand, merge_commands
 
 
@@ -59,7 +56,7 @@ def test_args_create():
                 "help": "from_date for backfill operation",
                 "action": None,
                 "default": None,
-                "type": datetime.datetime,
+                "type": str,
                 "dest": None,
             },
         ),
@@ -69,7 +66,7 @@ def test_args_create():
                 "help": "to_date for backfill operation",
                 "action": None,
                 "default": None,
-                "type": datetime.datetime,
+                "type": str,
                 "dest": None,
             },
         ),
@@ -89,7 +86,7 @@ def test_args_create():
                 "help": "dag_run_conf for backfill operation",
                 "action": None,
                 "default": None,
-                "type": dict[str, Any],
+                "type": dict,
                 "dest": None,
             },
         ),
@@ -99,7 +96,7 @@ def test_args_create():
                 "help": "reprocess_behavior for backfill operation",
                 "action": None,
                 "default": None,
-                "type": ReprocessBehavior,
+                "type": str,
                 "dest": None,
             },
         ),
@@ -228,7 +225,7 @@ class TestCommandFactory:
                 class BackfillsOperations(BaseOperations):
                     def create(self, backfill: BackfillPostBody) -> 
BackfillResponse | ServerResponseError:
                         try:
-                            self.response = self.client.post("backfills", 
data=backfill.model_dump())
+                            self.response = self.client.post("backfills", 
json=_date_safe_dict_from_pydantic(backfill))
                             return 
BackfillResponse.model_validate_json(self.response.content)
                         except ServerResponseError as e:
                             raise e

Reply via email to