This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b6013c0b8f Added exclude_microseconds to cli (#27640)
b6013c0b8f is described below

commit b6013c0b8f1064c523af2d905c3f32ff1cbec421
Author: MrParosk <[email protected]>
AuthorDate: Sat Nov 26 01:07:11 2022 +0100

    Added exclude_microseconds to cli (#27640)
    
    * Added exclude_microseconds to cli
    
    * Changed parsing type
    
    * Renamed arg
---
 airflow/api/client/api_client.py       |  3 ++-
 airflow/api/client/json_client.py      |  3 ++-
 airflow/api/client/local_client.py     |  8 ++++++--
 airflow/cli/cli_parser.py              |  9 ++++++++-
 airflow/cli/commands/dag_command.py    |  6 +++++-
 tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++++++
 6 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index 1334771b8d..8d1c26065b 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -30,13 +30,14 @@ class Client:
         if auth:
             self._session.auth = auth
 
-    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
         """Create a dag run for the specified dag.
 
         :param dag_id:
         :param run_id:
         :param conf:
         :param execution_date:
+        :param replace_microseconds:
         :return:
         """
         raise NotImplementedError()
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index c6995ceb2a..c225e08537 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -43,7 +43,7 @@ class Client(api_client.Client):
 
         return resp.json()
 
-    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
         endpoint = f"/api/experimental/dags/{dag_id}/dag_runs"
         url = urljoin(self._api_base_url, endpoint)
         data = self._request(
@@ -53,6 +53,7 @@ class Client(api_client.Client):
                 "run_id": run_id,
                 "conf": conf,
                 "execution_date": execution_date,
+                "replace_microseconds": replace_microseconds,
             },
         )
         return data["message"]
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 3b6e61241e..2c8f471b39 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -28,9 +28,13 @@ from airflow.models.pool import Pool
 class Client(api_client.Client):
     """Local API client implementation."""
 
-    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
         dag_run = trigger_dag.trigger_dag(
-            dag_id=dag_id, run_id=run_id, conf=conf, execution_date=execution_date
+            dag_id=dag_id,
+            run_id=run_id,
+            conf=conf,
+            execution_date=execution_date,
+            replace_microseconds=replace_microseconds,
         )
         return f"Created {dag_run}"
 
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index a6ec776bd2..0fc2f10101 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -433,6 +433,13 @@ ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", ac
 ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
 ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
 ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
+ARG_REPLACE_MICRO = Arg(
+    ("--no-replace-microseconds",),
+    help="whether microseconds should be zeroed",
+    dest="replace_microseconds",
+    action="store_false",
+    default=True,
+)
 
 # db
 ARG_DB_TABLES = Arg(
@@ -1082,7 +1089,7 @@ DAGS_COMMANDS = (
         name="trigger",
         help="Trigger a DAG run",
         func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
-        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE),
+        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO),
     ),
     ActionCommand(
         name="delete",
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 0465c474ed..2df0a162c4 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -143,7 +143,11 @@ def dag_trigger(args):
     api_client = get_current_api_client()
     try:
         message = api_client.trigger_dag(
-            dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, execution_date=args.exec_date
+            dag_id=args.dag_id,
+            run_id=args.run_id,
+            conf=args.conf,
+            execution_date=args.exec_date,
+            replace_microseconds=args.replace_microseconds,
         )
         print(message)
     except OSError as err:
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 025a7bbacd..419b9bdccc 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -570,6 +570,28 @@ class TestCliDags:
         assert dagrun.data_interval_start.isoformat(timespec="seconds") == "2021-06-03T00:00:00+00:00"
         assert dagrun.data_interval_end.isoformat(timespec="seconds") == "2021-06-04T00:00:00+00:00"
 
+    def test_trigger_dag_with_microseconds(self):
+        dag_command.dag_trigger(
+            self.parser.parse_args(
+                [
+                    "dags",
+                    "trigger",
+                    "example_bash_operator",
+                    "--run-id=test_trigger_dag_with_micro",
+                    "--exec-date=2021-06-04T09:00:00.000001+08:00",
+                    "--no-replace-microseconds",
+                ],
+            )
+        )
+
+        with create_session() as session:
+            dagrun = session.query(DagRun).filter(DagRun.run_id == "test_trigger_dag_with_micro").one()
+
+        assert dagrun, "DagRun not created"
+        assert dagrun.run_type == DagRunType.MANUAL
+        assert dagrun.external_trigger
+        assert dagrun.execution_date.isoformat(timespec="microseconds") == "2021-06-04T01:00:00.000001+00:00"
+
     def test_trigger_dag_invalid_conf(self):
         with pytest.raises(ValueError):
             dag_command.dag_trigger(

Reply via email to