[
https://issues.apache.org/jira/browse/AIRFLOW-4786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
PP resolved AIRFLOW-4786.
-------------------------
Resolution: Fixed
> Task execution fails when Celery is used.
> -----------------------------------------
>
> Key: AIRFLOW-4786
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4786
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery
> Affects Versions: 1.10.3
> Reporter: PP
> Priority: Minor
>
> I'm using airflow 1.10.3 with the LocalExecutor and everything works
> properly. I want to switch to the CeleryExecutor, but tasks are failing (or
> are not executed at all).
> I've tried this on two separate clusters (2 machines each: airflow + worker),
> both with a similar configuration (redis as broker and mysql as backend). A
> similar exception appears in both clusters (below). In the first one, tasks
> have status null and the exception appears in the scheduler, while in the
> second cluster tasks are started by the worker but fail with the same
> exception (on the worker side):
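>
> For reference, a minimal sketch of the airflow.cfg settings such a setup
> would use (hostnames and credentials below are placeholders, not the actual
> values from my clusters):
> {code:none}
> [core]
> executor = CeleryExecutor
> sql_alchemy_conn = mysql://user:password@mysql-host/airflow
>
> [celery]
> broker_url = redis://redis-host:6379/0
> result_backend = db+mysql://user:password@mysql-host/airflow
> {code}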
>
> {code:python}
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 32, in <module>
>     args.func(args)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/utils/cli.py", line 74, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 498, in run
>     _run(args, dag, ti)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 397, in _run
>     run_job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 202, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2598, in _execute
>     pool=self.pool):
>   File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1557, in _check_and_change_state_before_execution
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 927, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 467, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2209, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2329, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2293, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 177, in save_obj
>     mapper, table, update)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 760, in _emit_update_statements
>     (table.description, len(records), rows))
> sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
> [2019-06-12 16:41:31,167: ERROR/ForkPoolWorker-4] execute_command encountered a CalledProcessError
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 60, in execute_command
>     close_fds=True, env=env)
>   File "/usr/lib/python2.7/subprocess.py", line 186, in check_call
>     raise CalledProcessError(retcode, cmd)
> CalledProcessError: Command 'airflow run hello_world dummy_task 2019-06-12T15:05:00+00:00 --local -sd /data/airflow/dags/hello_world.py' returned non-zero exit status 1
> [2019-06-12 16:41:31,168: ERROR/ForkPoolWorker-4] None
> [2019-06-12 16:41:31,172: ERROR/ForkPoolWorker-4] Task airflow.executors.celery_executor.execute_command[dc2d3451-1d86-4097-8ded-6fd1aacd1de1] raised unexpected: AirflowException('Celery command failed',)
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 375, in trace_task
>     R = retval = fun(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 632, in __protected_call__
>     return self.run(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 65, in execute_command
>     raise AirflowException('Celery command failed')
> AirflowException: Celery command failed
> {code}
>
> I've tested different configuration variations with different celery and
> airflow versions. I was only able to make it work on airflow 1.8.2.
> There is either a bug in airflow 1.10+ or some missing / new configuration
> option that I'm not aware of.
>
>
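For readers hitting the same StaleDataError: a minimal illustration (a simplified sketch using plain sqlite3, not Airflow code; the table and states are reduced to the minimum) of the failure mode the traceback above reports. Two writers race on the same task_instance row; the loser conditions its UPDATE on a state that has already changed, so the statement matches 0 rows, which SQLAlchemy's ORM surfaces as StaleDataError:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('dummy_task', 'queued')")

# Writer A (think: the scheduler) moves the row to 'running'.
conn.execute(
    "UPDATE task_instance SET state = 'running' "
    "WHERE task_id = 'dummy_task'"
)

# Writer B (think: the worker) still believes the state is 'queued'
# and conditions its UPDATE on that now-stale assumption.
cur = conn.execute(
    "UPDATE task_instance SET state = 'running' "
    "WHERE task_id = 'dummy_task' AND state = 'queued'"
)

# The UPDATE matched 0 rows; SQLAlchemy's ORM raises StaleDataError
# when it expected to update exactly 1 row and 0 were matched.
print(cur.rowcount)  # 0
```

This is why the error typically appears only with the CeleryExecutor: the scheduler and the worker process update the same task_instance row from different machines, so clock skew or duplicated task submission can make the two writers disagree about the row's current state.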
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)