RikHeijdens opened a new issue #12785:
URL: https://github.com/apache/airflow/issues/12785


   **Apache Airflow version**: 2.0.0b3
   **Kubernetes version (if you are using kubernetes)** (use `kubectl 
version`): N/A
   **Environment**:
   
   - **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
   - **Kernel** (e.g. `uname -a`): Linux 6ae65b86e112 5.4.0-52-generic 
#57-Ubuntu SMP Thu Oct 15 10:57:00 UTC 2020 x86_64 GNU/Linux
   - **Others**: Python 3.8
   
   **What happened**:
   
   At JW Player we add additional macros to Airflow through a plugin. The 
definition of this plugin looks like the following (simplified):
   
   ```python
   from airflow.plugins_manager import AirflowPlugin
   from utils_plugin.macros.convert_image_tag import convert_image_tag
   
   
   class JwUtilsPlugin(AirflowPlugin):
       name = 'jw_utils'
       macros = [convert_image_tag]
   ```
   
   `convert_image_tag` is a function that takes a string (a Docker tag) as argument and, by querying the Docker registry, resolves it to a SHA-256 digest that uniquely identifies an image. I.e. it is a function that takes a string as argument and returns a string.
   
   In Airflow 1.10.x we can successfully use this macro in our DAGs to resolve image tags to SHA-256 hashes; e.g. the following DAG runs an Alpine image using a `DockerOperator`:
   
   ```python
   from datetime import datetime, timedelta
   from airflow import DAG
   
   try:
       from airflow.providers.docker.operators.docker import DockerOperator
   except ModuleNotFoundError:
       from airflow.operators.docker_operator import DockerOperator
   
   now = datetime.now()
   
   with DAG('test_dag',
            schedule_interval='*/15 * * * *',
            default_args={
                'owner': 'airflow',
                'start_date': datetime.utcnow() - timedelta(hours=1),
                'task_concurrency': 1,
                'execution_timeout': timedelta(minutes=5)
            },
            max_active_runs=1) as dag:
       
       task_sleep = DockerOperator(
           task_id='task_sleep',
        image="{{ macros.jw_utils.convert_image_tag('alpine') }}",
           command=['sleep', '10']
       )
   ```
   
   This is in contrast to Airflow 2.0: if we attempt to use our custom macro here, then when Airflow attempts to render the task template it errors out with the following:
   
   ```
    [2020-12-03 12:54:43,666] {{taskinstance.py:1402}} ERROR - 'module object' has no attribute 'jw_utils'
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1087, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1224, in _prepare_and_execute_task_with_callbacks
        self.render_templates(context=context)
      File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1690, in render_templates
        self.task.render_template_fields(context)
      File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 857, in render_template_fields
        self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
      File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 870, in _do_render_template_fields
        rendered_content = self.render_template(content, context, jinja_env, seen_oids)
      File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 907, in render_template
        return jinja_env.from_string(content).render(**context)
      File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 1090, in render
        self.environment.handle_exception()
      File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
        reraise(*rewrite_traceback_stack(source=source))
      File "/usr/local/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
        raise value.with_traceback(tb)
      File "<template>", line 1, in top-level template code
      File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 471, in getattr
        return getattr(obj, attribute)
    jinja2.exceptions.UndefinedError: 'module object' has no attribute 'jw_utils'
   ```
   
   **What you expected to happen**:
   
   I would have expected the DAG definition above to work in Airflow 2.0, just as it does in Airflow 1.10.x.
   
   **How to reproduce it**:
   
   This bug can be reproduced by creating a plugin that adds a macro, and then 
attempting to use that macro in a DAG.
   
   **Anything else we need to know**:
   
   In order to better understand the issue, I did a bit of digging. The plugin with which we extend Airflow's functionality has its own suite of pytest test cases. Since we are in the process of preparing for a transition to Airflow 2.0, we are now running the unit tests for this plugin against both Airflow 1.10.x and Airflow 2.0.0b3.
   
   After reviewing how plugins are loaded in Airflow, I added the following test case to mimic how plugins are loaded and how [`get_template_context()`](https://github.com/apache/airflow/blob/2.0.0b3/airflow/models/taskinstance.py#L1481) in Airflow 2.0 ensures that plugins have been imported:
   
   ```python
from importlib import import_module


def test_macro_namespacing(is_airflow_1):
    """
    Tests whether macros can be loaded from Airflow's namespace after loading plugins.
    """
    from airflow import macros
    if not is_airflow_1:
        # In Airflow 2.x, we need to make sure we invoke integrate_macros_plugins(),
        # otherwise the namespace will not be created properly.
        from airflow.plugins_manager import integrate_macros_plugins
        integrate_macros_plugins()
       from utils_plugin.plugin import JwUtilsPlugin
   
    # After Airflow has loaded the plugins, the macros should be available
    # as airflow.macros.jw_utils.
    macros_module = import_module(f"airflow.macros.{JwUtilsPlugin.name}")
   
       for macro in JwUtilsPlugin.macros:
           # Verify that macros have been registered correctly.
           assert hasattr(macros_module, macro.__name__)
   
    # However, in order for the module to actually be allowed to be used in
    # templates, it must also exist on airflow.macros.
    assert hasattr(macros, 'jw_utils')
   ```
   
   This test case passes when run on Airflow 1.10, but surprisingly enough it fails on Airflow 2.x. Specifically, it fails on the `assert hasattr(macros, 'jw_utils')` statement in Airflow 2.0. This statement tests whether the macros that we create through the `JwUtilsPlugin` have been properly added to `airflow.macros`.
   
   I thought it was strange for the test case to fail on this assertion, given that the `import_module()` call succeeded in Airflow 2.0. After this observation I started comparing the logic for registering macros in Airflow 1.10.x to the Airflow 2.0.0 implementation.
   
   While doing this I observed that the plugin loading mechanism in Airflow 
1.10.x works because Airflow [automatically 
discovers](https://github.com/apache/airflow/blob/1.10.13/airflow/__init__.py#L104)
 all plugins through the `plugins_manager` module. When this happens it 
automatically [initializes plugin-macro 
modules](https://github.com/apache/airflow/blob/1.10.13/airflow/plugins_manager.py#L306)
 in the `airflow.macros` namespace. Notably, after the plugin's module has been 
initialized it will also automatically be registered on the `airflow.macros` 
module [by updating the 
dictionary](https://github.com/apache/airflow/blob/1.10.13/airflow/macros/__init__.py#L93)
 returned by `globals()`.
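That 1.10.x registration path can be mimicked with plain module objects from the standard library. The `fake_macros` names below are stand-ins for `airflow.macros`, not real Airflow modules:

```python
import sys
import types

# Minimal stdlib model of the 1.10.x registration path: create a module
# object for the plugin's macros, register it for import, and publish it by
# updating the parent module's own dict, which is what globals().update(...)
# amounts to inside airflow/macros/__init__.py.
macros = types.ModuleType("fake_macros")            # stands in for airflow.macros
plugin_module = types.ModuleType("fake_macros.jw_utils")
plugin_module.convert_image_tag = lambda tag: tag   # placeholder macro

sys.modules["fake_macros"] = macros
sys.modules["fake_macros.jw_utils"] = plugin_module

# The crucial 1.10.x step: merge the new name into the parent's symbol table.
macros.__dict__.update({"jw_utils": plugin_module})

assert hasattr(macros, "jw_utils")                  # template lookups now succeed
```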
   
   This is in contrast to Airflow 2.0, where plugins are no longer loaded automatically. Instead, they are loaded lazily, i.e. on demand whenever a function needs them. In order to load macros (or ensure that macros have been loaded), modules need to import and invoke the [`integrate_macros_plugins`](https://github.com/apache/airflow/blob/2.0.0b3/airflow/plugins_manager.py#L395) function from `airflow.plugins_manager`.
   
   When Airflow prepares a template context prior to running a task, it properly imports this function and invokes it in [taskinstance.py](https://github.com/apache/airflow/blob/2.0.0b3/airflow/models/taskinstance.py#L1483). However, in contrast to the old 1.10.x implementation, this function does not update the symbol table of `airflow.macros`. The result is that the macros from the plugin _will in fact_ be imported, but because the `airflow.macros` symbol table itself is not updated, the macros added by plugins cannot be used in the template rendering context.
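The symptom can be reproduced with the standard library alone, without Airflow (the `demo_macros` names below are fakes): a submodule placed directly in `sys.modules` is importable via `import_module`, yet the parent module gains no attribute unless one is set explicitly.

```python
import sys
import types
from importlib import import_module

parent = types.ModuleType("demo_macros")
child = types.ModuleType("demo_macros.jw_utils")
sys.modules["demo_macros"] = parent
sys.modules["demo_macros.jw_utils"] = child

# import_module succeeds because it finds the entry in sys.modules...
assert import_module("demo_macros.jw_utils") is child
# ...but a getattr-based lookup, which is what Jinja performs for
# {{ macros.jw_utils... }}, still fails on the parent module.
assert not hasattr(parent, "jw_utils")
```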
   
   I believe this issue could be solved by ensuring that `integrate_macros_plugins` sets a reference to the `airflow.macros.jw_utils` module as the `jw_utils` attribute on `airflow.macros`. Once that has been done, I believe macros provided through plugins will be functional again.
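A sketch of what that fix could look like, again using stand-in modules rather than Airflow's actual internals: after creating and registering the plugin's macros module, also bind it as an attribute of the parent module.

```python
import sys
import types

macros = types.ModuleType("patched_macros")  # stands in for airflow.macros
sys.modules["patched_macros"] = macros


def register_plugin_macros(name, functions):
    """Create a macros module for a plugin and expose it on the parent (sketch)."""
    module = types.ModuleType(f"patched_macros.{name}")
    for func in functions:
        setattr(module, func.__name__, func)
    sys.modules[module.__name__] = module
    # The missing step in 2.0.0b3: expose the module on the parent so that
    # {{ macros.<name>.<func>() }} resolves during template rendering.
    setattr(macros, name, module)
    return module


def convert_image_tag(tag):  # placeholder macro
    return tag


register_plugin_macros("jw_utils", [convert_image_tag])
assert hasattr(macros, "jw_utils")
```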
   

