Piotr Pekala created AIRFLOW-4786:
-------------------------------------

             Summary: Task execution fails when Celery is used.
                 Key: AIRFLOW-4786
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4786
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery
    Affects Versions: 1.10.3
            Reporter: Piotr Pekala


I'm using Airflow 1.10.3 with LocalExecutor and everything works properly.
I want to switch to CeleryExecutor, but tasks are failing (or are not executed
at all).

I've tried this in two separate clusters (2 machines each: Airflow + worker),
both with a similar configuration (Redis as the broker and MySQL as the
backend). A similar exception appears in both clusters (below). In the first
one, tasks have status null and the exception appears in the scheduler; in the
second, tasks are started by the worker but fail with the same exception (on
the worker side):

{code:java}
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 498, in run
    _run(args, dag, ti)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 397, in _run
    run_job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 202, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2598, in _execute
    pool=self.pool):
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1557, in _check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 927, in commit
    self.transaction.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 467, in commit
    self._prepare_impl()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2209, in flush
    self._flush(objects)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2329, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2293, in _flush
    flush_context.execute()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
    rec.execute(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
    uow
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 177, in save_obj
    mapper, table, update)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 760, in _emit_update_statements
    (table.description, len(records), rows))
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
[2019-06-12 16:41:31,167: ERROR/ForkPoolWorker-4] execute_command encountered a CalledProcessError
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 60, in execute_command
    close_fds=True, env=env)
  File "/usr/lib/python2.7/subprocess.py", line 186, in check_call
    raise CalledProcessError(retcode, cmd)
CalledProcessError: Command 'airflow run hello_world dummy_task 2019-06-12T15:05:00+00:00 --local -sd /data/airflow/dags/hello_world.py' returned non-zero exit status 1
[2019-06-12 16:41:31,168: ERROR/ForkPoolWorker-4] None
[2019-06-12 16:41:31,172: ERROR/ForkPoolWorker-4] Task airflow.executors.celery_executor.execute_command[dc2d3451-1d86-4097-8ded-6fd1aacd1de1] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 375, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 632, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 65, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed{code}
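
The root error in the first traceback is SQLAlchemy's row-count check after a flush: the ORM issued an UPDATE on `task_instance` filtered by the row's primary key (in Airflow 1.10: task_id, dag_id, execution_date), and the database reported that no row matched. The following is a minimal stdlib sketch of that condition only, assuming a simplified SQLite table; the schema, the `make_db`/`update_state` helpers and the values are hypothetical and are not Airflow's code. It illustrates what the message means and does not identify the cause in this report.

```python
import sqlite3


def make_db():
    """Create a hypothetical, heavily simplified task_instance table with one row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT,"
        " PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    conn.execute(
        "INSERT INTO task_instance VALUES (?, ?, ?, NULL)",
        ("dummy_task", "hello_world", "2019-06-12 15:05:00"),
    )
    return conn


def update_state(conn, task_id, dag_id, execution_date, state):
    """Update one row by primary key and mimic SQLAlchemy's row-count check."""
    cur = conn.execute(
        "UPDATE task_instance SET state = ? "
        "WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
        (state, task_id, dag_id, execution_date),
    )
    if cur.rowcount != 1:
        # Same wording as the StaleDataError message in the log above
        raise RuntimeError(
            "UPDATE statement on table 'task_instance' expected to update "
            f"1 row(s); {cur.rowcount} were matched."
        )
    return cur.rowcount


conn = make_db()
print(update_state(conn, "dummy_task", "hello_world", "2019-06-12 15:05:00", "queued"))
# Simulate another process removing the row between the read and the write
conn.execute("DELETE FROM task_instance")
try:
    update_state(conn, "dummy_task", "hello_world", "2019-06-12 15:05:00", "running")
except RuntimeError as exc:
    print(exc)
```

Any situation in which the key values held by the process no longer match a stored row (row removed, re-keyed, or the key represented differently) produces the same "0 were matched" outcome.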

I've tested several configuration variations with different Celery and
Airflow versions. The only version I got working was Airflow 1.8.2.

There is either a bug in Airflow 1.10+ or some missing or new configuration
option that I'm not aware of.
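
To rule out a configuration mismatch, one option is to compare the Celery-related settings on the scheduler machine and on each worker, which should point at the same metadata database and the same broker. A small stdlib sketch, assuming the Airflow 1.10 option names `[core] executor`, `[core] sql_alchemy_conn`, `[celery] broker_url` and `[celery] result_backend`; the helper names, hosts and credentials below are hypothetical placeholders, not the reporter's values.

```python
import configparser

# Options worth comparing between the scheduler host and each worker
# (Airflow 1.10 section/option names).
KEYS = [
    ("core", "executor"),
    ("core", "sql_alchemy_conn"),
    ("celery", "broker_url"),
    ("celery", "result_backend"),
]


def celery_settings(cfg_text):
    """Return {"section.option": value} for KEYS; missing entries map to None."""
    # interpolation=None so that '%' in connection URLs is taken literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {f"{s}.{o}": parser.get(s, o, fallback=None) for s, o in KEYS}


def diff_settings(a, b):
    """Return {key: (value_a, value_b)} for every key whose values differ."""
    return {k: (a[k], b.get(k)) for k in a if a[k] != b.get(k)}


# Hypothetical airflow.cfg excerpts; hosts and credentials are placeholders.
SCHEDULER_CFG = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:airflow@mysql-host:3306/airflow

[celery]
broker_url = redis://redis-host:6379/0
result_backend = db+mysql://airflow:airflow@mysql-host:3306/airflow
"""
# Worker copy that (hypothetically) points its metadata DB at a different host
WORKER_CFG = SCHEDULER_CFG.replace("mysql-host:3306", "localhost:3306", 1)

print(diff_settings(celery_settings(SCHEDULER_CFG), celery_settings(WORKER_CFG)))
```

Feed it the text of each machine's airflow.cfg; an empty result means these four options agree. Environment variables such as `AIRFLOW__CORE__SQL_ALCHEMY_CONN` override the file, so they are worth checking on each machine as well.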

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
