[ https://issues.apache.org/jira/browse/AIRFLOW-2965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16609889#comment-16609889 ]

ASF GitHub Bot commented on AIRFLOW-2965:
-----------------------------------------

r39132 closed pull request #3834: [AIRFLOW-2965] CLI tool to show the next execution datetime
URL: https://github.com/apache/incubator-airflow/pull/3834
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e22427cf40..c38116e6c0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -551,6 +551,31 @@ def dag_state(args):
     print(dr[0].state if len(dr) > 0 else None)
 
 
+@cli_utils.action_logging
+def next_execution(args):
+    """
+    Returns the next execution datetime of a DAG at the command line.
+    >>> airflow next_execution tutorial
+    2018-08-31 10:38:00
+    """
+    dag = get_dag(args)
+
+    if dag.is_paused:
+        print("[INFO] Please be reminded this DAG is PAUSED now.")
+
+    if dag.latest_execution_date:
+        next_execution_dttm = dag.following_schedule(dag.latest_execution_date)
+
+        if next_execution_dttm is None:
+            print("[WARN] No following schedule can be found. " +
+                  "This DAG may have schedule interval '@once' or `None`.")
+
+        print(next_execution_dttm)
+    else:
+        print("[WARN] Only applicable when there is execution record found for 
the DAG.")
+        print(None)
+
+
 @cli_utils.action_logging
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
@@ -1986,6 +2011,11 @@ class CLIFactory(object):
             'func': sync_perm,
             'help': "Update existing role's permissions.",
             'args': tuple(),
+        },
+        {
+            'func': next_execution,
+            'help': "Get the next execution datetime of a DAG.",
+            'args': ('dag_id', 'subdir')
         }
     )
     subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 616b9a0f16..93ec0576e6 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -20,9 +20,12 @@
 
 import unittest
 
+from datetime import datetime, timedelta, time
 from mock import patch, Mock, MagicMock
 from time import sleep
 import psutil
+import pytz
+import subprocess
 from argparse import Namespace
 from airflow import settings
 from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
@@ -165,3 +168,80 @@ def test_local_run(self):
             ti.refresh_from_db()
             state = ti.current_state()
             self.assertEqual(state, State.SUCCESS)
+
+    def test_next_execution(self):
+        # A scaffolding function
+        def reset_dr_db(dag_id):
+            session = Session()
+            dr = session.query(models.DagRun).filter_by(dag_id=dag_id)
+            dr.delete()
+            session.commit()
+            session.close()
+
+        EXAMPLE_DAGS_FOLDER = os.path.join(
+            os.path.dirname(
+                os.path.dirname(
+                    os.path.dirname(os.path.realpath(__file__))
+                )
+            ),
+            "airflow/example_dags"
+        )
+
+        dagbag = models.DagBag(dag_folder=EXAMPLE_DAGS_FOLDER,
+                               include_examples=False)
+        dag_ids = ['example_bash_operator',  # schedule_interval is '0 0 * * *'
+                   'latest_only',  # schedule_interval is timedelta(hours=4)
+                   'example_python_operator',  # schedule_interval=None
+                   'example_xcom']  # schedule_interval="@once"
+
+        # The details below are determined by the schedule_interval of the example DAGs
+        now = timezone.utcnow()
+        next_execution_time_for_dag1 = pytz.utc.localize(
+            datetime.combine(
+                now.date() + timedelta(days=1),
+                time(0)
+            )
+        )
+        next_execution_time_for_dag2 = now + timedelta(hours=4)
+        expected_output = [str(next_execution_time_for_dag1),
+                           str(next_execution_time_for_dag2),
+                           "None",
+                           "None"]
+
+        for i in range(len(dag_ids)):
+            dag_id = dag_ids[i]
+
+            # Clear dag runs so there is no execution history for each DAG
+            reset_dr_db(dag_id)
+
+            p = subprocess.Popen(["airflow", "next_execution", dag_id,
+                                  "--subdir", EXAMPLE_DAGS_FOLDER],
+                                 stdout=subprocess.PIPE)
+            p.wait()
+            stdout = []
+            for line in p.stdout:
+                stdout.append(str(line.decode("utf-8").rstrip()))
+
+            # `next_execution` function is inapplicable if no execution record found
+            # It prints `None` in such cases
+            self.assertEqual(stdout[-1], "None")
+
+            dag = dagbag.dags[dag_id]
+            # Create a DagRun for each DAG, to prepare for next step
+            dag.create_dagrun(
+                run_id='manual__' + now.isoformat(),
+                execution_date=now,
+                start_date=now,
+                state=State.FAILED
+            )
+
+            p = subprocess.Popen(["airflow", "next_execution", dag_id,
+                                  "--subdir", EXAMPLE_DAGS_FOLDER],
+                                 stdout=subprocess.PIPE)
+            p.wait()
+            stdout = []
+            for line in p.stdout:
+                stdout.append(str(line.decode("utf-8").rstrip()))
+            self.assertEqual(stdout[-1], expected_output[i])
+
+            reset_dr_db(dag_id)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add CLI command to find the next dag run.
> -----------------------------------------
>
>                 Key: AIRFLOW-2965
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2965
>             Project: Apache Airflow
>          Issue Type: Task
>    Affects Versions: 1.10.0
>            Reporter: jack
>            Assignee: Xiaodong DENG
>            Priority: Minor
>
> I have a dag with the following properties:
> {code:java}
> dag = DAG(
>     dag_id='mydag',
>     default_args=args,
>     schedule_interval='0 1 * * *',
>     max_active_runs=1,
>     catchup=False){code}
>  
>  
> This runs great.
> Last run is: 2018-08-26 01:00  (start date is 2018-08-27 01:00)
>  
> Now it's 2018-08-27 17:55 and I decided to change my dag to:
>  
> {code:java}
> dag = DAG(
>     dag_id='mydag',
>     default_args=args,
>     schedule_interval='0 23 * * *',
>     max_active_runs=1,
>     catchup=False){code}
>  
> Now, I have no idea when the next dag run will be.
> Will it be today at 23:00? I can't be sure when the cycle completes, and I'm 
> not even sure that this change will do what I intend.
> I'm sure you guys are experts and can answer this question, but most of us 
> wouldn't know.
>  
> The scheduler already knows when the dag is due to run. All I'm asking is to 
> expose that knowledge through a CLI command: I give it the dag_id and it tells 
> me the next date/hour at which my dag will run.
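
To make the question above concrete: Airflow derives the next fire time for a cron schedule_interval via the croniter library. A rough, illustrative sketch using the dates from the issue (the actual trigger moment also depends on catchup and on Airflow only running an interval after it has ended):

    from datetime import datetime
    from croniter import croniter

    last_run = datetime(2018, 8, 26, 1, 0)    # last execution_date mentioned above
    new_schedule = '0 23 * * *'               # the updated schedule_interval

    # first schedule point after the last run under the new cron expression
    print(croniter(new_schedule, last_run).get_next(datetime))
    # 2018-08-26 23:00:00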



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
