jscheffl opened a new issue #15023:
URL: https://github.com/apache/airflow/issues/15023
**Apache Airflow version**: 2.0.1
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): Tried both pip install and k8s image
**Environment**: Dev workstation and K8s execution - both behave the same
- **OS** (e.g. from /etc/os-release): Ubuntu 20.04 LTS
- **Others**: Python 3.6
**What happened**:
We currently use Airflow 1.10.14 in production and have a couple of DAGs that process a batch call. The batch is (currently) implemented such that the jobs are provided via dag_run.conf as an array of dicts, e.g. `[ { "job": "1" }, { "job": "2" } ]`.
While trying to upgrade to Airflow 2.0.1 we see that such calls can still be submitted, but all further actions fail:
- Querying the status via the REST API is not possible; it returns an HTTP 500.
- The DAG starts, but all tasks fail.
- Logs cannot be displayed (in fact, none are produced on the file system).
- Error logging is a bit complex: the Celery worker provides neither meaningful console logs nor log files; running a scheduler with the SequentialExecutor reveals at least one meaningful stack trace, shown below.
- Probably some other internal logic is failing as well.
- Note that the dag_run.conf is visible as submitted (so it is received correctly) in the Browse --> DAG Runs menu.
For comparison, the same DAG works when passing dag_run.conf = `{ "batch": [ { "job": "1" }, { "job": "2" } ] }` as well as `{}`.
Example (simple) DAG to reproduce:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

dag = DAG(
    'test1',
    description='My first DAG',
    default_args={
        'owner': 'jscheffl',
        'email': ['***@***.de'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=days_ago(2),
)

hello_world = BashOperator(
    task_id='hello_world',
    bash_command='echo hello world',
    dag=dag,
)
```
Stack trace from SequentialExecutor:

```
Traceback (most recent call last):
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 225, in task_run
    ti.init_run_context(raw=args.raw)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1987, in init_run_context
    self._set_context(self)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
    set_context(self.log, context)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 174, in set_context
    handler.set_context(value)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 56, in set_context
    local_loc = self._init_file(ti)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 245, in _init_file
    relative_path = self._render_filename(ti, ti.try_number)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 77, in _render_filename
    jinja_context = ti.get_template_context()
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1606, in get_template_context
    self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1743, in overwrite_params_with_dag_run_conf
    params.update(dag_run.conf)
ValueError: dictionary update sequence element #0 has length 4; 2 is required
```
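The root cause is visible in the last frame: `params` is a plain dict, and `dict.update()` on a sequence expects each element to be a (key, value) pair. A minimal reproduction outside Airflow, using the `[ "test" ]` conf from the repro steps (the 4-character string "test" is what produces "length 4" in the error message):

```python
# params mimics Airflow's plain params dict; conf mimics a dag_run.conf
# submitted as a bare JSON array instead of a JSON object.
params = {}
conf = ["test"]

try:
    # dict.update(sequence) expects (key, value) pairs; the 4-character
    # string "test" is not one, hence "length 4; 2 is required".
    params.update(conf)
except ValueError as err:
    print(err)
```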
**What you expected to happen**:
- EITHER the submission of arrays as dag_run.conf is supported, as in 1.10.14,
- OR the submission is validated and rejected if array values are not supported by Airflow (whereas it seems to have worked, at least in 1.10).
**How to reproduce it**: See the DAG code above; reproduce the error e.g. by triggering the DAG with `[ "test" ]` as dag_run.conf.
**Anything else we need to know**: I assume not :-)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]