This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b4d50d3be1 Migrate Papermill example DAGs to new design #22456 (#24146)
b4d50d3be1 is described below
commit b4d50d3be1c9917182f231135b8312eb284f0f7f
Author: chethanuk-plutoflume <[email protected]>
AuthorDate: Sun Jun 5 16:33:43 2022 +0100
Migrate Papermill example DAGs to new design #22456 (#24146)
---
docs/apache-airflow-providers-papermill/index.rst | 2 +-
.../operators.rst | 9 ++++-
.../system/providers/papermill}/__init__.py | 0
.../providers/papermill}/example_papermill.py | 44 ++++------------------
.../papermill/example_papermill_verify.py | 30 ++++++---------
.../providers/papermill}/input_notebook.ipynb | 0
tests/www/api/experimental/test_endpoints.py | 2 +-
7 files changed, 28 insertions(+), 59 deletions(-)
diff --git a/docs/apache-airflow-providers-papermill/index.rst b/docs/apache-airflow-providers-papermill/index.rst
index 7effd4b35c..54fe848e70 100644
--- a/docs/apache-airflow-providers-papermill/index.rst
+++ b/docs/apache-airflow-providers-papermill/index.rst
@@ -38,7 +38,7 @@ Content
:maxdepth: 1
:caption: Resources
- Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/papermill/example_dags>
+ Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/papermill>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-papermill/>
Installing from sources <installing-providers-from-sources>
diff --git a/docs/apache-airflow-providers-papermill/operators.rst b/docs/apache-airflow-providers-papermill/operators.rst
index 274d83adb9..c760eecaf0 100644
--- a/docs/apache-airflow-providers-papermill/operators.rst
+++ b/docs/apache-airflow-providers-papermill/operators.rst
@@ -50,8 +50,15 @@ Example DAG
Use the
:class:`~airflow.providers.papermill.operators.papermill.PapermillOperator`
to execute a jupyter notebook:
-.. exampleinclude:: /../../airflow/providers/papermill/example_dags/example_papermill.py
+.. exampleinclude:: /../../tests/system/providers/papermill/example_papermill.py
:language: python
:dedent: 4
:start-after: [START howto_operator_papermill]
:end-before: [END howto_operator_papermill]
+
+Example DAG to Verify the message in the notebook:
+
+.. exampleinclude:: /../../tests/system/providers/papermill/example_papermill_verify.py
+ :language: python
+ :start-after: [START howto_verify_operator_papermill]
+ :end-before: [END howto_verify_operator_papermill]
diff --git a/airflow/providers/papermill/example_dags/__init__.py b/tests/system/providers/papermill/__init__.py
similarity index 100%
rename from airflow/providers/papermill/example_dags/__init__.py
rename to tests/system/providers/papermill/__init__.py
diff --git a/airflow/providers/papermill/example_dags/example_papermill.py b/tests/system/providers/papermill/example_papermill.py
similarity index 61%
copy from airflow/providers/papermill/example_dags/example_papermill.py
copy to tests/system/providers/papermill/example_papermill.py
index c49b771579..8f828e4ed1 100644
--- a/airflow/providers/papermill/example_dags/example_papermill.py
+++ b/tests/system/providers/papermill/example_papermill.py
@@ -23,25 +23,23 @@ templated.
import os
from datetime import datetime, timedelta
-import scrapbook as sb
-
from airflow import DAG
-from airflow.decorators import task
-from airflow.lineage import AUTO
from airflow.providers.papermill.operators.papermill import PapermillOperator
START_DATE = datetime(2021, 1, 1)
SCHEDULE_INTERVAL = '0 0 * * *'
DAGRUN_TIMEOUT = timedelta(minutes=60)
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_papermill_operator"
with DAG(
- dag_id='example_papermill_operator',
+ dag_id=DAG_ID,
schedule_interval=SCHEDULE_INTERVAL,
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
tags=['example'],
catchup=False,
-) as dag_1:
+) as dag:
# [START howto_operator_papermill]
run_this = PapermillOperator(
task_id="run_example_notebook",
@@ -51,35 +49,7 @@ with DAG(
)
# [END howto_operator_papermill]
+from tests.system.utils import get_test_run # noqa: E402
-@task
-def check_notebook(inlets, execution_date):
- """
- Verify the message in the notebook
- """
- notebook = sb.read_notebook(inlets[0].url)
- message = notebook.scraps['message']
- print(f"Message in notebook {message} for {execution_date}")
-
- if message.data != f"Ran from Airflow at {execution_date}!":
- return False
-
- return True
-
-
-with DAG(
- dag_id='example_papermill_operator_2',
- schedule_interval=SCHEDULE_INTERVAL,
- start_date=START_DATE,
- dagrun_timeout=DAGRUN_TIMEOUT,
- catchup=False,
-) as dag_2:
-
- run_this = PapermillOperator(
- task_id="run_example_notebook",
- input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
- output_nb="/tmp/out-{{ execution_date }}.ipynb",
- parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
- )
-
- run_this >> check_notebook(inlets=AUTO, execution_date="{{ execution_date }}")
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/papermill/example_dags/example_papermill.py b/tests/system/providers/papermill/example_papermill_verify.py
similarity index 79%
rename from airflow/providers/papermill/example_dags/example_papermill.py
rename to tests/system/providers/papermill/example_papermill_verify.py
index c49b771579..dd24fc51f6 100644
--- a/airflow/providers/papermill/example_dags/example_papermill.py
+++ b/tests/system/providers/papermill/example_papermill_verify.py
@@ -33,25 +33,11 @@ from airflow.providers.papermill.operators.papermill import PapermillOperator
START_DATE = datetime(2021, 1, 1)
SCHEDULE_INTERVAL = '0 0 * * *'
DAGRUN_TIMEOUT = timedelta(minutes=60)
-
-with DAG(
- dag_id='example_papermill_operator',
- schedule_interval=SCHEDULE_INTERVAL,
- start_date=START_DATE,
- dagrun_timeout=DAGRUN_TIMEOUT,
- tags=['example'],
- catchup=False,
-) as dag_1:
- # [START howto_operator_papermill]
- run_this = PapermillOperator(
- task_id="run_example_notebook",
- input_nb="/tmp/hello_world.ipynb",
- output_nb="/tmp/out-{{ execution_date }}.ipynb",
- parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
- )
- # [END howto_operator_papermill]
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_papermill_operator_verify"
+# [START howto_verify_operator_papermill]
@task
def check_notebook(inlets, execution_date):
"""
@@ -68,12 +54,12 @@ def check_notebook(inlets, execution_date):
with DAG(
- dag_id='example_papermill_operator_2',
+ dag_id='example_papermill_operator_verify',
schedule_interval=SCHEDULE_INTERVAL,
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
catchup=False,
-) as dag_2:
+) as dag:
run_this = PapermillOperator(
task_id="run_example_notebook",
@@ -83,3 +69,9 @@ with DAG(
)
run_this >> check_notebook(inlets=AUTO, execution_date="{{ execution_date }}")
+# [END howto_verify_operator_papermill]
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/papermill/example_dags/input_notebook.ipynb b/tests/system/providers/papermill/input_notebook.ipynb
similarity index 100%
rename from airflow/providers/papermill/example_dags/input_notebook.ipynb
rename to tests/system/providers/papermill/input_notebook.ipynb
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 9710b38c68..8d5516a409 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -312,7 +312,7 @@ class TestApiExperimental(TestBase):
class TestLineageApiExperimental(TestBase):
- PAPERMILL_EXAMPLE_DAGS = os.path.join(ROOT_FOLDER, "airflow", "providers", "papermill", "example_dags")
+ PAPERMILL_EXAMPLE_DAGS = os.path.join(ROOT_FOLDER, "tests", "system", "providers", "papermill")
@pytest.fixture(scope="class", autouse=True)
def _populate_db(self):