jscheffl opened a new issue #15023:
URL: https://github.com/apache/airflow/issues/15023


   **Apache Airflow version**: 2.0.1
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): Tried both a pip install and the k8s image
   
   **Environment**: Dev workstation and K8s execution - both behave the same
   
   - **OS** (e.g. from /etc/os-release): Ubuntu 20.04 LTS
   - **Others**: Python 3.6
   
   **What happened**:
   
   We currently use Airflow 1.10.14 in production and have a couple of DAGs that process a batch call. The batch is (currently) implemented such that the jobs are passed via dag_run.conf as an array of dicts, e.g. `[ { "job": "1" }, { "job": "2" } ]`.
   Trying to upgrade to Airflow 2.0.1, we see that such calls can still be submitted, but all further actions fail:
   - It is not possible to query the status via the REST API; it returns an HTTP 500.
   - The DAG starts, but all tasks fail.
   - Logs cannot be displayed (actually none are produced on the file system).
   - Error logging is a bit complex: the Celery worker neither provides meaningful logs on the console nor produces log files. Running the scheduler with the SequentialExecutor reveals at least one meaningful stack trace, see below.
   - (Probably a couple of other pieces of internal logic are failing as well.)
   - Note that the dag_run.conf is shown as submitted (so it is correctly received) in the Browse --> DAG Runs menu.
   
   As a cross-check, the same DAG works fine when passing `{ "batch": [ { "job": "1" }, { "job": "2" } ] }` as well as `{}` as dag_run.conf.
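   The behavior can be reproduced in plain Python, independent of Airflow: `dict.update` merges another dict cleanly, but interprets a list as a sequence of key/value pairs, which a one-key dict is not. A minimal sketch:

```python
params = {"existing": "param"}

# A dict-shaped conf merges cleanly - this is the shape that works:
params.update({"batch": [{"job": "1"}, {"job": "2"}]})
assert params["batch"] == [{"job": "1"}, {"job": "2"}]

# An array-shaped conf is treated as a sequence of (key, value) pairs;
# each element is a 1-key dict, not a 2-tuple, so update() raises:
try:
    params.update([{"job": "1"}, {"job": "2"}])
except ValueError as exc:
    print(exc)  # dictionary update sequence element #0 has length 1; 2 is required
```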
   
   Example (simple) DAG to reproduce:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

dag = DAG(
    'test1',
    description='My first DAG',
    default_args={
        'owner': 'jscheffl',
        'email': ['***@***.de'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=days_ago(2)
)

hello_world = BashOperator(
    task_id='hello_world',
    bash_command='echo hello world',
    dag=dag,
)
```
   
   Stack trace from SequentialExecutor:

```
Traceback (most recent call last):
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 225, in task_run
    ti.init_run_context(raw=args.raw)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1987, in init_run_context
    self._set_context(self)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
    set_context(self.log, context)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 174, in set_context
    handler.set_context(value)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 56, in set_context
    local_loc = self._init_file(ti)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 245, in _init_file
    relative_path = self._render_filename(ti, ti.try_number)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 77, in _render_filename
    jinja_context = ti.get_template_context()
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1606, in get_template_context
    self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1743, in overwrite_params_with_dag_run_conf
    params.update(dag_run.conf)
ValueError: dictionary update sequence element #0 has length 4; 2 is required
```
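   The final frame also explains the "length 4" in the message: with a conf of `[ "test" ]`, `params.update` iterates the list and tries to unpack the 4-character string `"test"` as a (key, value) pair. A standalone reproduction of just that last line:

```python
# Mirrors the failing call in airflow/models/taskinstance.py:
#     params.update(dag_run.conf)
params = {}
dag_run_conf = ["test"]  # the array-shaped conf used to trigger the DAG

try:
    params.update(dag_run_conf)
except ValueError as exc:
    # "test" has 4 characters, so it cannot be unpacked as a key/value pair
    print(exc)  # dictionary update sequence element #0 has length 4; 2 is required
```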
   
   **What you expected to happen**:
   - EITHER the submission of arrays as dag_run.conf is supported, as in 1.10.14,
   - OR submission is rejected with a validation error if array values are not supported by Airflow (they at least appeared to work in 1.10).
   
   **How to reproduce it**: See the DAG code above; reproduce the error e.g. by triggering it with `[ "test" ]` as dag_run.conf.
   
   **Anything else we need to know**: I assume not :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

