Hi again,

I spoke too soon when I said the problem was solved. I think the more general problem I am having is with the Celery executor. I still see about 90% of the tasks sent to Celery fail. I see the failures in Flower, though the Airflow UI doesn't show any of this.
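For context on why Flower and the Airflow UI disagree: the worker traceback quoted below points at execute_command in airflow/executors/celery_executor.py. As far as I can tell it does roughly the following (a paraphrase, not the verbatim 1.7.x source), so every attempt whose `airflow run` subprocess exits non-zero gets recorded as a FAILURE in Flower, even when a later retry of the same task instance succeeds and the square turns green in the UI:

import subprocess

from celery import Celery
from airflow.exceptions import AirflowException  # import path may differ slightly in 1.7.x

app = Celery('airflow.executors.celery_executor')  # configured from the [celery] section of airflow.cfg

@app.task
def execute_command(command):
    # `command` is the full "airflow run <dag> <task> <date> --pickle N --local"
    # string the scheduler queued; the worker just shells out and checks the exit code.
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError:
        # Any non-zero exit (here, the ImportError in the traceback below) surfaces
        # in Flower as a failed Celery task.
        raise AirflowException('Celery command failed')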
I ran `airflow resetdb` just to be sure, added my db connection via the admin UI, then activated the DAG in the UI. Looking at the Tree View I see some DAG runs starting to populate, and the squares are turning green with success. Flower tells a different story, with most tasks having a failure state. There are some tasks that failed several times -- AirflowException('Celery command failed',) -- then eventually succeeded. I will report back when the scheduler is done.

thanks again,
Dennis

On Fri, Jun 24, 2016 at 3:06 PM Dennis O'Brien <den...@dennisobrien.net> wrote:

> I found the error in /var/log/upstart/airflow-worker.log
>
> [2016-06-24 21:13:44,616] {__init__.py:36} INFO - Using executor CeleryExecutor
> [2016-06-24 21:13:44,990] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2016-06-24 21:13:45,091] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> Traceback (most recent call last):
>   File "/home/airflow/venv/bin/airflow", line 15, in <module>
>     args.func(args)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 211, in run
>     DagPickle).filter(DagPickle.id == args.pickle).first()
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>     ret = list(self[0:1])
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>     return list(res)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>     util.raise_from_cause(err)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 71, in instances
>     rows = [proc(row) for row in fetch]
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 428, in _instance
>     loaded_instance, populate_existing, populators)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
>     dict_[key] = getter(row)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>     return loads(value)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 260, in loads
>     return load(file)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 250, in load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line 406, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named vertica_operator
> [2016-06-24 21:13:46,434: ERROR/Worker-8] Command 'airflow run etl_gsn_daily_kpi_email t_validate_gsncom_events_direct_payment_recieved 2016-01-02T12:00:00 --pickle 20 --local ' returned non-zero exit status 1
> [2016-06-24 21:13:46,456: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[09607f3b-90e1-4840-a9ca-5c7ed979c6d9] raised unexpected:
> AirflowException('Celery command failed',)
> Traceback (most recent call last):
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
>     R = retval = fun(*args, **kwargs)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
>     return self.run(*args, **kwargs)
>   File "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 45, in execute_command
>     raise AirflowException('Celery command failed')
> AirflowException: Celery command failed
>
> This led me to these issues and a PR about explicit imports:
> https://issues.apache.org/jira/browse/AIRFLOW-200
> https://issues.apache.org/jira/browse/AIRFLOW-31
> https://github.com/apache/incubator-airflow/pull/1586
>
> So I changed my imports to be explicit and backfill jobs work now.
>
> Imports like this resulted in failures in the Celery task:
> from airflow.contrib.operators import VerticaOperator, VerticaHook
>
> This style works:
> from airflow.contrib.hooks.vertica_hook import VerticaHook
> from airflow.contrib.operators.vertica_operator import VerticaOperator
>
> I think my problem is already covered in those tickets, but if you think it is separate and deserves an issue, I'll file one.
>
> thanks,
> Dennis
>
> On Thu, Jun 23, 2016 at 6:17 PM Dennis O'Brien <den...@dennisobrien.net> wrote:
>
>> I just cleared the history for this DAG (using the CLI `clear` command) then attempted a backfill. I'm consistently getting a deadlock error, and I suspect this is really from Celery failing to execute the task.
>>
>> $ airflow backfill --ignore_first_depends_on_past --start_date 2016-01-01T12:00:00 --end_date 2016-01-02T12:00:00 etl_gsn_daily_kpi_email
>> ...
>> BackfillJob is deadlocked.Some of the deadlocked tasks were unable to run because of "depends_on_past" relationships. Try running the backfill with the option "ignore_first_depends_on_past=True" or passing "-I" at the command line. These tasks were unable to run:
>> ...
>>
>> The DAG has depends_on_past: True, wait_for_downstream: True, start_date: datetime(2016, 1, 1, 12).
>>
>> For each task in the DAG I see an output message that it was added to the queue, that Celery is queuing it, and that it failed. For example, for one of the tasks, there are these three messages:
>>
>> [2016-06-24 00:56:14,613] {base_executor.py:36} INFO - Adding to queue: airflow run etl_gsn_daily_kpi_email t_validate_gsncom_events_dau 2016-01-01T12:00:00 --pickle 18 --local
>> ...
>> [2016-06-24 00:56:19,471] {celery_executor.py:64} INFO - [celery] queuing ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau', datetime.datetime(2016, 1, 1, 12, 0)) through celery, queue=default
>> ...
>> [2016-06-24 00:56:29,480] {jobs.py:924} ERROR - Task instance ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau', datetime.datetime(2016, 1, 1, 12, 0)) failed
>>
>> It seems like I'm in a state where the DAG run through the scheduler succeeds, but fails when run from the CLI or web interface.
>>
>> Any ideas for debugging this will be much appreciated.
>>
>> thanks,
>> Dennis
>>
>> On Wed, Jun 15, 2016 at 1:44 PM Dennis O'Brien <den...@dennisobrien.net> wrote:
>>
>>> Hi,
>>>
>>> I'm occasionally trying to force the final task in a DAG by clicking on the task in the web UI and clicking 'Run' (with 'Force' and 'Ignore Dependencies' selected).
>>> The UI shows 'Sent <TaskInstance: ...> to the message queue, it should start any moment now'. Going back to that task and selecting 'Log' I see something like this:
>>>
>>> [2016-06-15 20:11:44,172] {models.py:154} INFO - Filling up the DagBag from /home/airflow/workspace/verticadw/airflow/dags/etl_gsn_daily_kpi_email.py
>>> [2016-06-15 20:11:44,881] {base_hook.py:53} INFO - Using connection to: localhost
>>>
>>> No sign of an error but no sign of success. But if I look at the Celery Flower dashboard, I can see this resulted in a failure with result: AirflowException('Celery command failed',)
>>>
>>> If I 'Clear'|'Downstream' the task, the task succeeds. And the task run in the scheduled DAG succeeds.
>>>
>>> Example args for failed tasks:
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email 2016-06-14T10:00:00 --force --local -sd DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email 2016-06-14T10:00:00 -i --force --local -sd DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>>
>>> Example args for successful tasks:
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email 2016-06-14T10:00:00 --local -sd DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>>
>>> and the log:
>>>
>>> [2016-06-15 19:44:46,147] {models.py:1219} INFO - Executing <Task(PythonOperator): t_final_send_email> on 2016-06-14 10:00:00
>>> [2016-06-15 19:44:51,979] {python_operator.py:67} INFO - Done. Returned value was: [Airflow] GSN Games Metrics 2016-06-14 sent to ('dobr...@gsngames.com',)
>>>
>>> I looked in the logs (airflow-flower.log, airflow-scheduler.log, airflow-webserver.log, and airflow-worker.log) but didn't find any clues.
>>>
>>> I'm running airflow v1.7.1.3, using CeleryExecutor backed by redis.
>>>
>>> Celery portion of airflow.cfg:
>>> [celery]
>>> celery_app_name = airflow.executors.celery_executor
>>> celeryd_concurrency = 16
>>> worker_log_server_port = 8793
>>> broker_url = redis://localhost
>>> celery_result_backend = redis://localhost
>>> flower_port = 5555
>>> default_queue = default
>>>
>>> Any advice on debugging this?
>>>
>>> thanks,
>>> Dennis
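P.S. For anyone who hits the same ImportError with pickled DAGs, this is the general shape of the DAG file that works for me now, using the explicit per-module imports from my Jun 24 message above. It is a trimmed-down sketch rather than my actual DAG: the dag_id, schedule, SQL, and connection id are placeholders, and the VerticaOperator arguments are from memory, so check the operator's signature in your version before copying.

from datetime import datetime

from airflow import DAG
from airflow.contrib.hooks.vertica_hook import VerticaHook              # explicit module paths pickle cleanly
from airflow.contrib.operators.vertica_operator import VerticaOperator

# The package-level style below is what broke unpickling on the workers:
# from airflow.contrib.operators import VerticaOperator, VerticaHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'start_date': datetime(2016, 1, 1, 12),
}

dag = DAG('example_vertica_dag', default_args=default_args, schedule_interval='@daily')

t_validate = VerticaOperator(
    task_id='t_validate_events',
    sql='SELECT count(*) FROM events',    # placeholder query
    vertica_conn_id='vertica_default',    # placeholder connection id
    dag=dag,
)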