RikHeijdens opened a new issue #12785:
URL: https://github.com/apache/airflow/issues/12785
**Apache Airflow version**: 2.0.0b3
**Kubernetes version (if you are using kubernetes)** (use `kubectl
version`): N/A
**Environment**:
- **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
- **Kernel** (e.g. `uname -a`): Linux 6ae65b86e112 5.4.0-52-generic
#57-Ubuntu SMP Thu Oct 15 10:57:00 UTC 2020 x86_64 GNU/Linux
- **Others**: Python 3.8
**What happened**:
At JW Player we add additional macros to Airflow through a plugin. The
definition of this plugin looks like the following (simplified):
```python
from airflow.plugins_manager import AirflowPlugin

from utils_plugin.macros.convert_image_tag import convert_image_tag


class JwUtilsPlugin(AirflowPlugin):
    name = 'jw_utils'
    macros = [convert_image_tag]
```
`convert_image_tag` is a function that takes a string (a docker tag) as an
argument and resolves it, by querying the docker registry, to a SHA-256 digest
that uniquely identifies an image. In other words, it takes a string and
returns a string.
In Airflow 1.10.x we can successfully use this macro in our DAGs to resolve
image tags to SHA-256 hashes, e.g. the following DAG will run an Alpine Image
using a DockerOperator:
```python
from datetime import datetime, timedelta

from airflow import DAG

try:
    from airflow.providers.docker.operators.docker import DockerOperator
except ModuleNotFoundError:
    from airflow.operators.docker_operator import DockerOperator

now = datetime.now()

with DAG('test_dag',
         schedule_interval='*/15 * * * *',
         default_args={
             'owner': 'airflow',
             'start_date': datetime.utcnow() - timedelta(hours=1),
             'task_concurrency': 1,
             'execution_timeout': timedelta(minutes=5)
         },
         max_active_runs=1) as dag:

    task_sleep = DockerOperator(
        task_id='task_sleep',
        image="{{ macros.jw_utils.convert_image_tag('alpine') }}",
        command=['sleep', '10']
    )
```
This is in contrast to Airflow 2.0: if we attempt to use our custom macro
there, Airflow errors out with the following error when it attempts to render
the task template:
```
[2020-12-03 12:54:43,666] {{taskinstance.py:1402}} ERROR - 'module object' has no attribute 'jw_utils'
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1087, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1224, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1690, in render_templates
    self.task.render_template_fields(context)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 857, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 870, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 907, in render_template
    return jinja_env.from_string(content).render(**context)
  File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 1090, in render
    self.environment.handle_exception()
  File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 832, in handle_exception
    reraise(*rewrite_traceback_stack(source=source))
  File "/usr/local/lib/python3.8/site-packages/jinja2/_compat.py", line 28, in reraise
    raise value.with_traceback(tb)
  File "<template>", line 1, in top-level template code
  File "/usr/local/lib/python3.8/site-packages/jinja2/environment.py", line 471, in getattr
    return getattr(obj, attribute)
jinja2.exceptions.UndefinedError: 'module object' has no attribute 'jw_utils'
```
**What you expected to happen**:
I expected the DAG definition above to work in Airflow 2.0 just as it does in
Airflow 1.10.x.
**How to reproduce it**:
This bug can be reproduced by creating a plugin that adds a macro, and then
attempting to use that macro in a DAG.
**Anything else we need to know**:
In order to better understand the issue, I did a bit of digging. The plugin
that we extend Airflow's functionality with has its own suite of pytest
testcases. Since we are in the process of preparing for a transition to Airflow
2.0 we are now running the unit tests for this plugin against both Airflow
1.10.x and Airflow 2.0.0b3.
After reviewing how plugins were being loaded in Airflow, I've added the
following testcase to mimic how plugins were being loaded and how
[`get_template_context()`](https://github.com/apache/airflow/blob/2.0.0b3/airflow/models/taskinstance.py#L1481)
in Airflow 2.0 ensures that plugins have been imported:
```python
from importlib import import_module


def test_macro_namespacing(is_airflow_1):
    """
    Tests whether macros can be loaded from Airflow's namespace after
    loading plugins.
    """
    from airflow import macros

    if not is_airflow_1:
        # In Airflow 2.x, we need to make sure we invoke
        # integrate_macros_plugins(), otherwise the namespace will not be
        # created properly.
        from airflow.plugins_manager import integrate_macros_plugins
        integrate_macros_plugins()

    from utils_plugin.plugin import JwUtilsPlugin

    # After Airflow has loaded the plugins, the macros should be available
    # as airflow.macros.jw_utils.
    macros_module = import_module(f"airflow.macros.{JwUtilsPlugin.name}")

    for macro in JwUtilsPlugin.macros:
        # Verify that macros have been registered correctly.
        assert hasattr(macros_module, macro.__name__)

    # However, in order for the module to actually be usable in templates,
    # it must also exist on airflow.macros.
    assert hasattr(macros, 'jw_utils')
```
This test case passes when run on Airflow 1.10, but surprisingly enough it
fails on Airflow 2.x. Specifically, it fails on the `assert hasattr(macros,
'jw_utils')` statement. This statement tests whether the macros that we create
through the `JwUtilsPlugin` have been properly added to `airflow.macros`.
I thought it was strange for the test case to fail on this assertion, given
that the `import_module()` call succeeded in Airflow 2.0. After this
observation I started comparing the logic for registering macros in Airflow
1.10.x to the Airflow 2.0.0 implementation.
While doing this I observed that the plugin loading mechanism in Airflow
1.10.x works because Airflow [automatically
discovers](https://github.com/apache/airflow/blob/1.10.13/airflow/__init__.py#L104)
all plugins through the `plugins_manager` module. When this happens it
automatically [initializes plugin-macro
modules](https://github.com/apache/airflow/blob/1.10.13/airflow/plugins_manager.py#L306)
in the `airflow.macros` namespace. Notably, after the plugin's module has been
initialized it will also automatically be registered on the `airflow.macros`
module [by updating the
dictionary](https://github.com/apache/airflow/blob/1.10.13/airflow/macros/__init__.py#L93)
returned by `globals()`.
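The effect of that `globals().update(...)` call can be illustrated in isolation. Updating a module's `__dict__` is equivalent to setting attributes on it, which is what makes the plugin module reachable via attribute lookup (the module names below are stand-ins, not real Airflow modules):

```python
import sys
import types

# Stand-ins for airflow.macros and the plugin's macro module.
macros = types.ModuleType("macros_110")
sys.modules["macros_110"] = macros
plugin_module = types.ModuleType("macros_110.jw_utils")

# In 1.10.x, airflow/macros/__init__.py effectively runs
# globals().update({plugin.name: plugin_module}) from inside the module,
# which is the same as updating the module's __dict__ directly:
macros.__dict__.update({"jw_utils": plugin_module})

# The name is now reachable through ordinary attribute lookup, which is
# exactly what Jinja's getattr-based rendering relies on.
assert hasattr(macros, "jw_utils")
```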
This is in contrast to Airflow 2.0, where plugins are no longer loaded
automatically. Instead they are being loaded lazily, i.e. they will be loaded
on-demand whenever a function needs them. In order to load macros (or ensure
that macros have been loaded), modules need to import the
[`integrate_macros_plugins`](https://github.com/apache/airflow/blob/2.0.0b3/airflow/plugins_manager.py#L395)
function from `airflow.plugins_manager`.
When Airflow prepares a template context prior to running a task, it properly
imports this function and invokes it in
[taskinstance.py](https://github.com/apache/airflow/blob/2.0.0b3/airflow/models/taskinstance.py#L1483).
However, in contrast to the old 1.10.x implementation, this function does not
update the symbol table of `airflow.macros`. The result is that the macros
from the plugin _will in fact_ be imported, but because the symbol table of
`airflow.macros` itself is never updated, the macros added by the plugins
cannot be used in the template rendering context.
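This behaviour can be reproduced without Airflow: registering a module directly in `sys.modules` makes `import_module()` succeed, yet never binds the submodule as an attribute of its parent package, because the attribute-binding step of a normal import is skipped for modules that are already present in `sys.modules` (module names below are stand-ins):

```python
import sys
import types
from importlib import import_module

# Stand-ins for airflow.macros and the plugin macro module, created the way
# the Airflow 2.0 plugin manager creates them: placed straight into
# sys.modules, bypassing the normal import machinery.
parent = types.ModuleType("macros_20")
sys.modules["macros_20"] = parent
child = types.ModuleType("macros_20.jw_utils")
sys.modules["macros_20.jw_utils"] = child

# import_module() finds the entry in sys.modules and returns it...
assert import_module("macros_20.jw_utils") is child

# ...but the parent module never received a 'jw_utils' attribute, which is
# exactly the lookup that Jinja's getattr-based rendering trips over.
assert not hasattr(parent, "jw_utils")
```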
I believe this issue could be solved by ensuring that
`integrate_macros_plugins` sets a reference to the `airflow.macros.jw_utils`
module as the `jw_utils` attribute on the `airflow.macros` module. Once that
is done, I believe macros provided through plugins will be functional again.
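A sketch of that proposed fix, again using stand-in module names rather than the actual `integrate_macros_plugins` code, shows that a single extra attribute binding is enough:

```python
import sys
import types

# Stand-ins for airflow.macros and the plugin macro module, registered in
# sys.modules the way the 2.0 plugin manager registers them today.
macros = types.ModuleType("macros_fix")
sys.modules["macros_fix"] = macros
jw_utils = types.ModuleType("macros_fix.jw_utils")
sys.modules["macros_fix.jw_utils"] = jw_utils

# Proposed fix: additionally bind the submodule on the parent module, so
# that attribute lookup (macros.jw_utils in a template) succeeds.
setattr(macros, "jw_utils", jw_utils)

assert hasattr(macros, "jw_utils")
```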