This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b6013c0b8f Added exclude_microseconds to cli (#27640)
b6013c0b8f is described below
commit b6013c0b8f1064c523af2d905c3f32ff1cbec421
Author: MrParosk <[email protected]>
AuthorDate: Sat Nov 26 01:07:11 2022 +0100
Added exclude_microseconds to cli (#27640)
* Added exclude_microseconds to cli
* Changed parsing type
* Renamed arg
---
airflow/api/client/api_client.py | 3 ++-
airflow/api/client/json_client.py | 3 ++-
airflow/api/client/local_client.py | 8 ++++++--
airflow/cli/cli_parser.py | 9 ++++++++-
airflow/cli/commands/dag_command.py | 6 +++++-
tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++++++
6 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index 1334771b8d..8d1c26065b 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -30,13 +30,14 @@ class Client:
if auth:
self._session.auth = auth
- def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
"""Create a dag run for the specified dag.
:param dag_id:
:param run_id:
:param conf:
:param execution_date:
+ :param replace_microseconds:
:return:
"""
raise NotImplementedError()
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index c6995ceb2a..c225e08537 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -43,7 +43,7 @@ class Client(api_client.Client):
return resp.json()
- def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
endpoint = f"/api/experimental/dags/{dag_id}/dag_runs"
url = urljoin(self._api_base_url, endpoint)
data = self._request(
@@ -53,6 +53,7 @@ class Client(api_client.Client):
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
+ "replace_microseconds": replace_microseconds,
},
)
return data["message"]
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 3b6e61241e..2c8f471b39 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -28,9 +28,13 @@ from airflow.models.pool import Pool
class Client(api_client.Client):
"""Local API client implementation."""
- def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
dag_run = trigger_dag.trigger_dag(
- dag_id=dag_id, run_id=run_id, conf=conf, execution_date=execution_date
+ dag_id=dag_id,
+ run_id=run_id,
+ conf=conf,
+ execution_date=execution_date,
+ replace_microseconds=replace_microseconds,
)
return f"Created {dag_run}"
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index a6ec776bd2..0fc2f10101 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -433,6 +433,13 @@ ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", ac
ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
+ARG_REPLACE_MICRO = Arg(
+ ("--no-replace-microseconds",),
+ help="whether microseconds should be zeroed",
+ dest="replace_microseconds",
+ action="store_false",
+ default=True,
+)
# db
ARG_DB_TABLES = Arg(
@@ -1082,7 +1089,7 @@ DAGS_COMMANDS = (
name="trigger",
help="Trigger a DAG run",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
- args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE),
+ args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO),
),
ActionCommand(
name="delete",
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 0465c474ed..2df0a162c4 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -143,7 +143,11 @@ def dag_trigger(args):
api_client = get_current_api_client()
try:
message = api_client.trigger_dag(
- dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, execution_date=args.exec_date
+ dag_id=args.dag_id,
+ run_id=args.run_id,
+ conf=args.conf,
+ execution_date=args.exec_date,
+ replace_microseconds=args.replace_microseconds,
)
print(message)
except OSError as err:
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 025a7bbacd..419b9bdccc 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -570,6 +570,28 @@ class TestCliDags:
assert dagrun.data_interval_start.isoformat(timespec="seconds") == "2021-06-03T00:00:00+00:00"
assert dagrun.data_interval_end.isoformat(timespec="seconds") == "2021-06-04T00:00:00+00:00"
+ def test_trigger_dag_with_microseconds(self):
+ dag_command.dag_trigger(
+ self.parser.parse_args(
+ [
+ "dags",
+ "trigger",
+ "example_bash_operator",
+ "--run-id=test_trigger_dag_with_micro",
+ "--exec-date=2021-06-04T09:00:00.000001+08:00",
+ "--no-replace-microseconds",
+ ],
+ )
+ )
+
+ with create_session() as session:
+ dagrun = session.query(DagRun).filter(DagRun.run_id == "test_trigger_dag_with_micro").one()
+
+ assert dagrun, "DagRun not created"
+ assert dagrun.run_type == DagRunType.MANUAL
+ assert dagrun.external_trigger
+ assert dagrun.execution_date.isoformat(timespec="microseconds") == "2021-06-04T01:00:00.000001+00:00"
+
def test_trigger_dag_invalid_conf(self):
with pytest.raises(ValueError):
dag_command.dag_trigger(