koliankolin commented on issue #14975:
URL: https://github.com/apache/airflow/issues/14975#issuecomment-808853781


UPD: I found the cause of this problem.

As I mentioned before, the bug only reproduces when `pickle_id` is filled in the `dag` table. We use `DagModel` to set it.
   
I should start with the fact that the task_instance to be launched is created differently depending on how the task is started. With the DebugExecutor we import `TaskInstance` and create a plain instance of it. The CLI command (`airflow tasks run --pickle 1`) follows the same logic as the DebugExecutor. With the _scheduler_, however, the logic is different. In `scheduler_job.py` the task_instance objects are loaded in the method **airflow.jobs.scheduler_job.SchedulerJob._executable_task_instances_to_queued**, which contains this code:
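```python
from sqlalchemy import not_, or_
from sqlalchemy.orm import selectinload

from airflow.models import DagModel as DM, DagRun as DR, TaskInstance as TI
from airflow.utils.state import State
from airflow.utils.types import DagRunType

# Excerpt: `session` and `max_tis` come from the enclosing method.
query = (
    session.query(TI)
    .outerjoin(TI.dag_run)
    .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB))
    .join(TI.dag_model)
    .filter(not_(DM.is_paused))
    .filter(TI.state == State.SCHEDULED)
    .options(selectinload('dag_model'))
    .limit(max_tis)
)
```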
As you can see, the bug can only be reproduced through the scheduler, because only in this case is the task_instance enriched with its `DagModel` (via `selectinload('dag_model')`). In other words, `pickle_id` reaches the task_instance only when the _scheduler_ is used. OK, keep this fact in mind and let's move on :)
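To make the effect of that query concrete, here is a minimal stand-alone sketch using the standard-library `sqlite3` module. The two-table schema, the column subset and the sample rows are hypothetical simplifications (the outer join to `dag_run` is left out, and a plain JOIN stands in for `selectinload`). It only shows that each scheduled task instance of an unpaused DAG comes back together with that DAG's `pickle_id`.

```python
import sqlite3

# Hypothetical, heavily simplified schema: only the columns the scheduler query touches.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER, pickle_id INTEGER);
    CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, state TEXT);
    INSERT INTO dag VALUES ('dummy', 0, 11), ('paused_dag', 1, NULL);
    INSERT INTO task_instance VALUES
        ('dummy', 'init', 'scheduled'),
        ('dummy', 'done', 'success'),
        ('paused_dag', 'init', 'scheduled');
    """
)

MAX_TIS = 32  # hypothetical stand-in for max_tis

# Same filters as the ORM query: join to the DAG row, skip paused DAGs and
# keep SCHEDULED task instances only. The DAG's pickle_id rides along.
rows = conn.execute(
    """
    SELECT ti.dag_id, ti.task_id, d.pickle_id
    FROM task_instance AS ti
    JOIN dag AS d ON d.dag_id = ti.dag_id
    WHERE NOT d.is_paused AND ti.state = 'scheduled'
    LIMIT ?
    """,
    (MAX_TIS,),
).fetchall()

print(rows)  # [('dummy', 'init', 11)]
```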
   
Next, the method **airflow.models.taskinstance.TaskInstance.generate_command** is used to build the command list, and because of the fact above we get a list like this:
```
['airflow', 'tasks', 'run', 'dummy', 'init',
 '2021-03-26T11:30:20.470827+00:00', '--pickle', '11', '--local',
 '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/dag_generator.py']
```
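The important detail is that `--pickle` only appears when a pickle id is passed in. The function below is a hypothetical, simplified stand-in for `TaskInstance.generate_command`; its name and reduced parameter list are assumptions, not Airflow's real signature. It reproduces the list above when given a pickle id and leaves the flag out otherwise.

```python
from typing import List, Optional


def build_run_command(
    dag_id: str,
    task_id: str,
    execution_date: str,
    pickle_id: Optional[int] = None,
    local: bool = False,
    pool: Optional[str] = None,
    file_path: Optional[str] = None,
) -> List[str]:
    """Hypothetical, simplified stand-in for TaskInstance.generate_command."""
    cmd = ["airflow", "tasks", "run", dag_id, task_id, execution_date]
    if pickle_id is not None:
        # Only added when the caller knows the DAG's pickle id (the scheduler path).
        cmd.extend(["--pickle", str(pickle_id)])
    if local:
        cmd.append("--local")
    if pool:
        cmd.extend(["--pool", pool])
    if file_path:
        cmd.extend(["--subdir", file_path])
    return cmd


cmd = build_run_command(
    "dummy",
    "init",
    "2021-03-26T11:30:20.470827+00:00",
    pickle_id=11,
    local=True,
    pool="default_pool",
    file_path="/usr/local/airflow/dags/dag_generator.py",
)
print(cmd)
```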
After this command list is built, an instance of **airflow.task.task_runner.standard_task_runner.StandardTaskRunner** is created and, in our case, the method **airflow.task.task_runner.standard_task_runner.StandardTaskRunner._start_by_fork** is used to start the task_instance. This is where all the magic happens :)
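As its name suggests, `_start_by_fork` runs the task in a forked child process. A minimal POSIX-only sketch of that pattern (the `run_in_fork` helper is hypothetical, not Airflow code) shows what the parent gets back: only the child's exit status. Any exception raised in the child is invisible to the parent unless the child logs it itself.

```python
import os
from typing import Callable


def run_in_fork(func: Callable[[], None]) -> int:
    """Run func in a forked child and return the child's exit code (POSIX only)."""
    pid = os.fork()
    if pid == 0:
        # Child: map success/failure to 0/1, like the task runner does.
        return_code = 1
        try:
            func()
            return_code = 0
        except Exception:  # broad on purpose, like the runner
            return_code = 1
        finally:
            # Leave immediately, skipping the parent's cleanup handlers.
            os._exit(return_code)
    # Parent: wait for the child and decode its exit status.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)


def ok() -> None:
    pass


def boom() -> None:
    raise RuntimeError("details that the parent never sees")


print(run_in_fork(ok), run_in_fork(boom))  # 0 1
```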
   
In this method the command list is fed to the argument parser, and the resulting args are passed to this code:
```python
# Imports this excerpt relies on
import logging
import os

from setproctitle import setproctitle

from airflow.cli.cli_parser import get_parser
from airflow.sentry import Sentry

# Excerpt from the body of StandardTaskRunner._start_by_fork
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(self._command[1:])

self.log.info('Running: %s', self._command)
self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)

proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
if hasattr(args, "job_id"):
    proc_title += " {0.job_id}"
setproctitle(proc_title.format(args))

try:
    args.func(args, dag=self.dag)
    return_code = 0
except Exception:  # pylint: disable=broad-except
    return_code = 1
finally:
    # Explicitly flush any pending exception to Sentry if enabled
    Sentry.flush()
    logging.shutdown()
    os._exit(return_code)  # pylint: disable=protected-access
```
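The `args.func(args, dag=self.dag)` call relies on a common `argparse` idiom: each sub-command stores its handler on the parsed namespace via `set_defaults(func=...)`. Below is a minimal sketch of that idiom; the parser layout and the `task_run` stub are hypothetical and much smaller than Airflow's real CLI parser. It shows that parsing the scheduler's command yields both a `pickle` value and a handler that then also receives a `dag`.

```python
import argparse
from typing import Optional


def task_run(args: argparse.Namespace, dag: Optional[object] = None) -> str:
    """Hypothetical handler standing in for the `tasks run` command."""
    return f"run {args.dag_id}.{args.task_id} pickle={args.pickle} dag={dag}"


def get_parser() -> argparse.ArgumentParser:
    """Hypothetical, minimal parser with a `tasks run` sub-command."""
    parser = argparse.ArgumentParser(prog="airflow")
    groups = parser.add_subparsers(dest="group", required=True)
    tasks = groups.add_parser("tasks")
    commands = tasks.add_subparsers(dest="command", required=True)
    run = commands.add_parser("run")
    run.add_argument("dag_id")
    run.add_argument("task_id")
    run.add_argument("execution_date")
    run.add_argument("--pickle")
    run.add_argument("--local", action="store_true")
    run.add_argument("--pool")
    run.add_argument("--subdir")
    # The chosen sub-command stores its handler on the namespace as `func`.
    run.set_defaults(func=task_run)
    return parser


command = ['airflow', 'tasks', 'run', 'dummy', 'init',
           '2021-03-26T11:30:20.470827+00:00', '--pickle', '11', '--local',
           '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/dag_generator.py']
# [1:] drops "airflow", as in the runner.
args = get_parser().parse_args(command[1:])
result = args.func(args, dag="<DAG object>")
print(result)  # run dummy.init pickle=11 dag=<DAG object>
```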
The lines with the try/except block are very interesting. **Every kind of exception is caught, and nothing is written to the logs.** I don't think that is good practice, but I didn't despair: I bound the exception to `e` and added a logging line right before `return_code = 1` in the source code:
```python
except Exception as e:  # pylint: disable=broad-except
    self.log.error(e)
    return_code = 1
```
Then I triggered the DAG from the UI. A few seconds after the DAG failed, I saw this in the logs in the UI:
```
>2021-03-26 15:15:29,991|ERROR|standard_task_runner.py:88|You cannot use the --pickle option when using DAG.cli() method.
```
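A slightly more informative variant of that one-line change is `logging.Logger.exception`, which logs at ERROR level and appends the traceback. The sketch below is standalone; the logger name, the `run_task` helper and the in-memory handler are assumptions made for the demo, and the output is captured in a `StringIO` so it can be inspected.

```python
import io
import logging
from typing import Callable

# Hypothetical logger wired to an in-memory stream so the output can be inspected.
stream = io.StringIO()
log = logging.getLogger("task_runner_sketch")
log.addHandler(logging.StreamHandler(stream))
log.setLevel(logging.INFO)
log.propagate = False


def run_task(func: Callable[[], None]) -> int:
    """Run func, logging any exception with its traceback instead of swallowing it."""
    try:
        func()
        return_code = 0
    except Exception:  # still broad, but no longer silent
        # Logger.exception logs at ERROR level and appends the current traceback.
        log.exception("Task failed")
        return_code = 1
    return return_code


def failing_task() -> None:
    raise RuntimeError("You cannot use the --pickle option when using DAG.cli() method.")


code = run_task(failing_task)
print(code)  # 1
print(stream.getvalue())
```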
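Hmm... what a strange exception. After some digging I found the line of code that raises it:
```python
from airflow.exceptions import AirflowException

# Guard in the handler behind `airflow tasks run` (invoked above as args.func)
if dag and args.pickle:
    raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")
```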
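The guard is easy to reason about in isolation. The sketch below is hypothetical: `choose_dag_source` and the local `AirflowException` class are stand-ins so it runs without Airflow, and the branches after the guard are simplified assumptions about what the handler does next. It shows that the plain CLI call (`--pickle` but no DAG object) and the runner without a pickle id both pass, while the scheduler path, which supplies both, raises.

```python
from typing import Optional


class AirflowException(Exception):
    """Local stand-in so the sketch runs without Airflow installed."""


def choose_dag_source(pickle: Optional[str], dag: Optional[object] = None) -> str:
    """Hypothetical reduction of the handler's DAG-selection logic."""
    if dag and pickle:
        raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")
    if pickle:
        return f"load DAG from pickle {pickle}"
    if dag is None:
        return "parse DAG from file"
    return "use the DAG object passed in"


print(choose_dag_source(pickle="1"))                     # plain `airflow tasks run --pickle 1`
print(choose_dag_source(pickle=None, dag="dag object"))  # runner, DAG without pickle_id
try:
    choose_dag_source(pickle="11", dag="dag object")     # scheduler path
except AirflowException as exc:
    print("raised:", exc)
```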
   
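What does this mean? It means there is a **logic bug**: enriching the task_instance with `pickle_id` leads to this exception, and it happens in these lines of the method **airflow.task.task_runner.standard_task_runner.StandardTaskRunner._start_by_fork**:
```python
try:
    args.func(args, dag=self.dag)  # --pickle and a DAG instance meet here
    return_code = 0
except Exception:  # pylint: disable=broad-except
    return_code = 1
```
This is exactly where the `--pickle` flag and a DAG instance are used together: `args.pickle` comes from the `--pickle 11` that the scheduler put into the command, while `dag=self.dag` passes the already-loaded DAG, so the `if dag and args.pickle` check fires. That's all from me.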

