This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2690c79a5d141e869437e51ea98249812837a963
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Sep 6 10:40:17 2021 +0200

    Improve the description of how to handle dynamic task generation (#17963)
    
    The Top-Level best practices were a little misleading. They
    suggested that no code should be written at the top-level DAG other
    than just creating operators, but the story is a little more nuanced.
    
    A better explanation is given, along with examples of how to deal
    with the situation when you need to generate your data based on
    some meta-data. From the Slack discussion it seems that the best
    ways to handle this are not obvious at all, so two alternatives
    are presented: generating a meta-data file, and generating
    importable Python code containing the meta-data.
    
    During that change, I also noticed that config sections and
    config variables were not sorted, which made it very difficult to
    search for them in the index. All the config variables are now
    sorted, so the references to the right sections/variables make much
    more sense.
    
    (cherry picked from commit 1be3ef635fab635f741b775c52e0da7fe0871567)
---
 airflow/configuration.py               |   4 +-
 docs/apache-airflow/best-practices.rst | 150 ++++++++++++++++++++++++++++-----
 docs/conf.py                           |   6 ++
 3 files changed, 139 insertions(+), 21 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 452f127..16151dc 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -32,7 +32,7 @@ from collections import OrderedDict
 # Ignored Mypy on configparser because it thinks the configparser module has no _UNSET attribute
 from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError  # type: ignore
 from json.decoder import JSONDecodeError
-from typing import Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union
 
 from airflow.exceptions import AirflowConfigException
 from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
@@ -92,7 +92,7 @@ def _default_config_file_path(file_name: str):
     return os.path.join(templates_dir, file_name)
 
 
-def default_config_yaml() -> List[dict]:
+def default_config_yaml() -> List[Dict[str, Any]]:
     """
     Read Airflow configs from YAML file
 
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 7d06192..6b88776 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -55,7 +55,7 @@ Some of the ways you can avoid producing a different result -
   Someone may update the input data between re-runs, which results in different outputs.
   A better way is to read the input data from a specific partition. You can use ``execution_date`` as a partition.
   You should follow this partitioning method while writing data in S3/HDFS, as well.
-* The python datetime ``now()`` function gives the current datetime object.
+* The Python datetime ``now()`` function gives the current datetime object.
   This function should never be used inside a task, especially to do the critical computation, as it leads to different outcomes on each run.
   It's fine to use it, for example, to generate a temporary log.
 
@@ -88,20 +88,121 @@ and the downstream tasks can pull the path from XCom and use it to read the data
 The tasks should also not store any authentication parameters such as passwords or token inside them.
 Where at all possible, use :doc:`Connections </concepts/connections>` to store data securely in Airflow backend and retrieve them using a unique connection id.
 
+Top level Python Code and Dynamic DAGs
+--------------------------------------
 
-Variables
----------
+You should avoid writing the top level code which is not necessary to create Operators
+and build DAG relations between them. This is because of the design decision for the scheduler of Airflow
+and the impact the top-level code parsing speed on both performance and scalability of Airflow.
 
-You should avoid usage of Variables outside an operator's ``execute()`` method 
or Jinja templates if possible,
-as Variables create a connection to metadata DB of Airflow to fetch the value, 
which can slow down parsing and
-place extra load on the DB.
+Airflow scheduler executes the code outside the Operator's ``execute`` methods with the minimum interval of
+:ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` seconds. This is done in order
+to allow dynamic scheduling of the DAGs - where scheduling and dependencies might change over time and
+impact the next schedule of the DAG. Airflow scheduler tries to continuously make sure that what you have
+in DAGs is correctly reflected in scheduled tasks.
 
-Airflow parses all the DAGs in the background at a specific period.
-The default period is set using the ``processor_poll_interval`` config, which 
is 1 second by default.
-During parsing, Airflow creates a new connection to the metadata DB for each 
DAG.
-This can result in a lot of open connections.
+Specifically you should not run any database access, heavy computations and networking operations.
+
+This limitation is especially important in case of dynamic DAG configuration, which can be configured
+essentially in one of those ways:
+
+* via `environment variables <https://wiki.archlinux.org/title/environment_variables>`_ (not to be mistaken
+  with the :doc:`Airflow Variables </concepts/variables>`)
+* via externally provided, generated Python code, containing meta-data in the DAG folder
+* via externally provided, generated configuration meta-data file in the DAG folder
+
+All cases are described in the following chapters.
+
+Dynamic DAGs with environment variables
+.......................................
+
+If you want to use variables to configure your code, you should always use
+`environment variables <https://wiki.archlinux.org/title/environment_variables>`_ in your
+top-level code rather than :doc:`Airflow Variables </concepts/variables>`. Using Airflow Variables
+at top-level code creates a connection to metadata DB of Airflow to fetch the value, which can slow
+down parsing and place extra load on the DB. See the `Airflow Variables <_best_practices/airflow_variables>`_
+on how to make best use of Airflow Variables in your DAGs using Jinja templates .
+
+For example you could set ``DEPLOYMENT`` variable differently for your production and development
+environments. The variable ``DEPLOYMENT`` could be set to ``PROD`` in your production environment and to
+``DEV`` in your development environment. Then you could build your dag differently in production and
+development environment, depending on the value of the environment variable.
+
+.. code-block:: python
+
+    deployment = os.environ.get("DEPLOYMENT", "PROD")
+    if deployment == "PROD":
+        task = Operator(param="prod-param")
+    elif deployment == "DEV":
+        task = Operator(param="dev-param")
+
+
+Generating Python code with embedded meta-data
+..............................................
+
+You can externally generate Python code containing the meta-data as importable constants.
+Such constant can then be imported directly by your DAG and used to construct the object and build
+the dependencies. This makes it easy to import such code from multiple DAGs without the need to find,
+load and parse the meta-data stored in the constant - this is done automatically by Python interpreter
+when it processes the "import" statement. This sounds strange at first, but it is surprisingly easy
+to generate such code and make sure this is a valid Python code that you can import from your DAGs.
+
+For example assume you dynamically generate (in your DAG folder), the ``my_company_utils/common.py`` file:
+
+.. code-block:: python
+
+    # This file is generated automatically !
+    ALL_TASKS = ["task1", "task2", "task3"]
+
+Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like that:
+
+.. code-block:: python
+
+    from my_company_utils.common import ALL_TASKS
+
+    with DAG(dag_id="my_dag", schedule_interval=None, start_date=days_ago(2)) as dag:
+        for task in ALL_TASKS:
+            # create your operators and relations here
+            pass
+
+Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
+and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file, so that the whole folder is
+ignored by the scheduler when it looks for DAGs.
+
+
+Dynamic DAGs with external configuration from a structured data file
+....................................................................
+
+If you need to use a more complex meta-data to prepare your DAG structure and you would prefer to keep the
+data in a structured non-python format, you should export the data to the DAG folder in a file and push
+it to the DAG folder, rather than try to pull the data by the DAG's top-level code - for the reasons
+explained in the parent `Top level Python code <_top-level-python-code>`_.
+
+The meta-data should be exported and stored together with the DAGs in a convenient file format (JSON, YAML
+formats are good candidates) in DAG folder. Ideally, the meta-data should be published in the same
+package/folder as the module of the DAG file you load it from, because then you can find location of
+the meta-data file in your DAG easily. The location of the file to read can be found using the
+``__file__`` attribute of the module containing the DAG:
+
+.. code-block:: python
+
+    my_dir = os.path.dirname(os.path.abspath(__file__))
+    configuration_file_path = os.path.join(my_dir, "config.yaml")
+    with open(configuration_file_path) as yaml_file:
+        configuration = yaml.safe_load(yaml_file)
+    # Configuration dict is available here
+
+
+.. _best_practices/airflow_variables:
+
+Airflow Variables
+-----------------
+
+As mentioned in the previous chapter, `Top level Python code <_top-level-python-code>`_. you should avoid
+using Airflow Variables at top level Python code of DAGs. You can use the Airflow Variables freely inside the
+``execute()`` methods of the operators, but you can also pass the Airflow Variables to the existing operators
+via Jinja template, which will delay reading the value until the task execution.
 
-The best way of using variables is via a Jinja template, which will delay reading the value until the task execution.
 The template syntax to do this is:
 
 .. code-block::
@@ -117,17 +218,28 @@ or if you need to deserialize a json object from the variable :
 For security purpose, you're recommended to use the :ref:`Secrets Backend<secrets_backend_configuration>`
 for any variable that contains sensitive data.
 
-An alternative option is to use environment variables in the top-level Python code or use environment variables to
-create and manage Airflow variables. This will avoid new connections to Airflow metadata DB every time
-Airflow parses the Python file. For more information, see: :ref:`managing_variables`.
+Triggering DAGs after changes
+-----------------------------
+
+Avoid triggering DAGs immediately after changing them or any other accompanying files that you change in the
+DAG folder.
 
-Top level Python Code
----------------------
+You should give the system sufficient time to process the changed files. This takes several steps.
+First the files have to be distributed to scheduler - usually via distributed filesystem or Git-Sync, then
+scheduler has to parse the Python files and store them in the database. Depending on your configuration,
+speed of your distributed filesystem, number of files, number of DAGs, number of changes in the files,
+sizes of the files, number of schedulers, speed of CPUS, this can take from seconds to minutes, in extreme
+cases many minutes. You should wait for your DAG to appear in the UI to be able to trigger it.
 
-In general, you should not write any code outside of defining Airflow constructs like Operators. The code outside the
-tasks runs every time Airflow parses an eligible python file, which happens at the minimum frequency of
-:ref:`min_file_process_interval<config:scheduler__min_file_process_interval>` seconds.
+In case you see long delays between updating it and the time it is ready to be triggered, you can look
+at the following configuration parameters and fine tune them according your needs (see details of
+each parameter by following the links):
 
+* :ref:`config:scheduler__processor_poll_interval`
+* :ref:`config:scheduler__min_file_process_interval`
+* :ref:`config:scheduler__dag_dir_list_interval`
+* :ref:`config:scheduler__parsing_processes`
+* :ref:`config:scheduler__file_parsing_sort_mode`
 
 Testing a DAG
 ^^^^^^^^^^^^^
diff --git a/docs/conf.py b/docs/conf.py
index 975ed16..0197e42 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -352,6 +352,12 @@ if PACKAGE_NAME == 'apache-airflow':
             for key in keys_to_format:
                 if option[key] and "{{" in option[key]:
                     option[key] = option[key].replace("{{", "{").replace("}}", "}")
+    # Sort options, config and deprecated options for JINJA variables to display
+    for config in configs:
+        config["options"] = sorted(config["options"], key=lambda o: o["name"])
+    configs = sorted(configs, key=lambda l: l["name"])
+    for section in deprecated_options:
+        deprecated_options[section] = {k: v for k, v in sorted(deprecated_options[section].items())}
 
     jinja_contexts = {
         'config_ctx': {"configs": configs, "deprecated_options": deprecated_options},
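
Note on the "Generating Python code with embedded meta-data" section in the
diff above: the new docs say such a module is easy to generate but do not show
a generator. What follows is only a minimal sketch of one possible approach,
not part of this commit. The function names (render_constants_module,
write_constants_module) are hypothetical; the ALL_TASKS constant and the header
comment are taken from the docs example. The sketch relies on repr() of a list
of strings being a valid Python literal, checks the result with compile(), and
writes through a temporary file so a reader never sees a half-written module.

```python
import os
import tempfile


def render_constants_module(task_names):
    """Return Python source that defines ALL_TASKS as a list literal."""
    # repr() of a list of str is itself a valid Python list literal.
    lines = [
        "# This file is generated automatically !",
        f"ALL_TASKS = {list(task_names)!r}",
        "",
    ]
    return "\n".join(lines)


def write_constants_module(path, task_names):
    """Validate the generated source, then write it to ``path`` atomically."""
    source = render_constants_module(task_names)
    # compile() raises SyntaxError if the generated text is not valid Python.
    compile(source, path, "exec")
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # Write to a temporary file in the same directory, then rename it over
    # the target so the file is replaced in one step.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as tmp_file:
        tmp_file.write(source)
    os.replace(tmp_path, path)
```

The job that exports your meta-data could call, for example,
write_constants_module("my_company_utils/common.py", ["task1", "task2", "task3"])
inside the DAG folder, outside of any DAG file, so the scheduler only ever
performs a plain import.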

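Note on the docs/conf.py hunk above: the added lines sort the options inside
each config section, then the sections themselves, then the deprecated options
per section. The snippet below replays the same three steps on made-up sample
data, purely as an illustration. The sample section names, option names and
tuple values are assumptions, not the real Airflow configuration, and
dict(sorted(...)) is used as an equivalent of the dict comprehension in the
commit. Keeping the sorted order in a plain dict relies on insertion-ordered
dicts (Python 3.7+).

```python
# Made-up sample data shaped like the structures sorted in docs/conf.py.
configs = [
    {"name": "scheduler", "options": [{"name": "zeta"}, {"name": "alpha"}]},
    {"name": "core", "options": [{"name": "beta"}]},
]
deprecated_options = {
    "core": {"new_b": ("core", "old_b"), "new_a": ("core", "old_a")},
}

# Sort the options within each section by option name.
for config in configs:
    config["options"] = sorted(config["options"], key=lambda o: o["name"])

# Sort the sections by section name.
configs = sorted(configs, key=lambda c: c["name"])

# Rebuild each deprecated-options dict with its keys in sorted order.
for section in deprecated_options:
    deprecated_options[section] = dict(sorted(deprecated_options[section].items()))

print([c["name"] for c in configs])
print(list(deprecated_options["core"]))
```
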