This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 60fc407911 add output format arg for `cli.dags.trigger` (#29224)
60fc407911 is described below
commit 60fc40791121b19fe379e4216529b2138162b443
Author: Hussein Awala <[email protected]>
AuthorDate: Sun Feb 19 16:15:56 2023 +0100
add output format arg for `cli.dags.trigger` (#29224)
* add output format arg for cli.dags.trigger
* add unit tests for local client
* add unitest for dag trigger output
* Update tests/api/client/test_local_client.py
Co-authored-by: Ephraim Anierobi <[email protected]>
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: eladkal <[email protected]>
---
airflow/api/client/local_client.py | 21 ++++++++++++++++--
airflow/cli/cli_parser.py | 11 +++++++++-
airflow/cli/commands/dag_command.py | 4 ++++
tests/api/client/test_local_client.py | 39 ++++++++++++++++++++++++++++++++++
tests/cli/commands/test_dag_command.py | 25 ++++++++++++++++++++++
5 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/airflow/api/client/local_client.py
b/airflow/api/client/local_client.py
index 2c8f471b39..afdcd0abd6 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -28,7 +28,9 @@ from airflow.models.pool import Pool
class Client(api_client.Client):
"""Local API client implementation."""
- def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None,
replace_microseconds=True):
+ def trigger_dag(
+ self, dag_id, run_id=None, conf=None, execution_date=None,
replace_microseconds=True
+ ) -> dict | None:
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
run_id=run_id,
@@ -36,7 +38,22 @@ class Client(api_client.Client):
execution_date=execution_date,
replace_microseconds=replace_microseconds,
)
- return f"Created {dag_run}"
+ if dag_run:
+ return {
+ "conf": dag_run.conf,
+ "dag_id": dag_run.dag_id,
+ "dag_run_id": dag_run.run_id,
+ "data_interval_end": dag_run.data_interval_start,
+ "data_interval_start": dag_run.data_interval_end,
+ "end_date": dag_run.end_date,
+ "external_trigger": dag_run.external_trigger,
+ "last_scheduling_decision": dag_run.last_scheduling_decision,
+ "logical_date": dag_run.logical_date,
+ "run_type": dag_run.run_type,
+ "start_date": dag_run.start_date,
+ "state": dag_run.state,
+ }
+ return dag_run
def delete_dag(self, dag_id):
count = delete_dag.delete_dag(dag_id)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 72026ade5c..993be83c0a 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1180,7 +1180,16 @@ DAGS_COMMANDS = (
name="trigger",
help="Trigger a DAG run",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
- args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE,
ARG_VERBOSE, ARG_REPLACE_MICRO),
+ args=(
+ ARG_DAG_ID,
+ ARG_SUBDIR,
+ ARG_RUN_ID,
+ ARG_CONF,
+ ARG_EXEC_DATE,
+ ARG_VERBOSE,
+ ARG_REPLACE_MICRO,
+ ARG_OUTPUT,
+ ),
),
ActionCommand(
name="delete",
diff --git a/airflow/cli/commands/dag_command.py
b/airflow/cli/commands/dag_command.py
index 2df0a162c4..4a9a1f2c12 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -150,6 +150,10 @@ def dag_trigger(args):
replace_microseconds=args.replace_microseconds,
)
print(message)
+ AirflowConsole().print_as(
+ data=[message] if message is not None else [],
+ output=args.output,
+ )
except OSError as err:
raise AirflowException(err)
diff --git a/tests/api/client/test_local_client.py
b/tests/api/client/test_local_client.py
index ff2ea3b048..5527f00dea 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -128,6 +128,45 @@ class TestLocalClient:
)
mock.reset_mock()
+ # test output
+ queued_at = pendulum.now()
+ started_at = pendulum.now()
+ mock.return_value = DagRun(
+ dag_id=test_dag_id,
+ run_id=run_id,
+ queued_at=queued_at,
+ execution_date=EXECDATE,
+ start_date=started_at,
+ external_trigger=True,
+ state=DagRunState.QUEUED,
+ conf={},
+ run_type=DagRunType.MANUAL,
+ data_interval=(EXECDATE, EXECDATE +
pendulum.duration(hours=1)),
+ )
+ expected_dag_run = {
+ "conf": {},
+ "dag_id": test_dag_id,
+ "dag_run_id": run_id,
+ "data_interval_end": EXECDATE,
+ "data_interval_start": EXECDATE + pendulum.duration(hours=1),
+ "end_date": None,
+ "external_trigger": True,
+ "last_scheduling_decision": None,
+ "logical_date": EXECDATE,
+ "run_type": DagRunType.MANUAL,
+ "start_date": started_at,
+ "state": DagRunState.QUEUED,
+ }
+ dag_run = self.client.trigger_dag(dag_id=test_dag_id)
+ assert expected_dag_run == dag_run
+ mock.reset_mock()
+
+ # test output when no DagRun is created
+ mock.return_value = None
+ dag_run = self.client.trigger_dag(dag_id=test_dag_id)
+ assert not dag_run
+ mock.reset_mock()
+
def test_delete_dag(self):
key = "my_dag_id"
diff --git a/tests/cli/commands/test_dag_command.py
b/tests/cli/commands/test_dag_command.py
index 419b9bdccc..e667b3b1cd 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import contextlib
import io
+import json
import os
import tempfile
from datetime import datetime, timedelta
@@ -608,6 +609,30 @@ class TestCliDags:
),
)
+ def test_trigger_dag_output_as_json(self):
+ args = self.parser.parse_args(
+ [
+ "dags",
+ "trigger",
+ "example_bash_operator",
+ "--run-id",
+ "trigger_dag_xxx",
+ "--conf",
+ '{"conf1": "val1", "conf2": "val2"}',
+ "--output=json",
+ ]
+ )
+ with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
+ dag_command.dag_trigger(args)
+ # get the last line from the logs ignoring all logging lines
+ out = temp_stdout.getvalue().strip().split("\n")[-1]
+ parsed_out = json.loads(out)
+
+ assert 1 == len(parsed_out)
+ assert "example_bash_operator" == parsed_out[0]["dag_id"]
+ assert "trigger_dag_xxx" == parsed_out[0]["dag_run_id"]
+ assert {"conf1": "val1", "conf2": "val2"} == parsed_out[0]["conf"]
+
def test_delete_dag(self):
DM = DagModel
key = "my_dag_id"