I found the error in /var/log/upstart/airflow-worker.log:

[2016-06-24 21:13:44,616] {__init__.py:36} INFO - Using executor
CeleryExecutor
[2016-06-24 21:13:44,990] {driver.py:120} INFO - Generating grammar tables
from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-24 21:13:45,091] {driver.py:120} INFO - Generating grammar tables
from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
Traceback (most recent call last):
  File "/home/airflow/venv/bin/airflow", line 15, in <module>
    args.func(args)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py",
line 211, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2659, in first
    ret = list(self[0:1])
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2457, in __getitem__
    return list(res)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 86, in instances
    util.raise_from_cause(err)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 71, in instances
    rows = [proc(row) for row in fetch]
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 428, in _instance
    loaded_instance, populate_existing, populators)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
line 486, in _populate_full
    dict_[key] = getter(row)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py",
line 1253, in process
    return loads(value)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 260, in loads
    return load(file)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 250, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py",
line 406, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named vertica_operator
[2016-06-24 21:13:46,434: ERROR/Worker-8] Command 'airflow run
etl_gsn_daily_kpi_email t_validate_gsncom_events_direct_payment_recieved
2016-01-02T12:00:00 --pickle 20 --local ' returned non-zero exit status 1
[2016-06-24 21:13:46,456: ERROR/MainProcess] Task
airflow.executors.celery_executor.execute_command[09607f3b-90e1-4840-a9ca-5c7ed979c6d9]
raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File
"/home/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py",
line 45, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed
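
The traceback shows the worker failing while it unpickles the DagPickle row:
dill records each operator class by its module and class name, and the
unpickler re-imports that module by name on the worker, so the module has to
be importable there.  A minimal sketch of that mechanism using plain pickle
(the Demo class is made up; this is not Airflow or dill code):

import pickle

class Demo(object):
    pass

payload = pickle.dumps(Demo())

# Unpickling does roughly:
#   __import__(Demo.__module__)                  # module must be importable
#   getattr(sys.modules[Demo.__module__], Demo.__name__)
# If the recorded __module__ is a bare name like 'vertica_operator' that is
# not on the worker's sys.path, __import__ raises the ImportError seen above.
obj = pickle.loads(payload)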

This led me to these JIRA issues and this PR about explicit imports:
https://issues.apache.org/jira/browse/AIRFLOW-200
https://issues.apache.org/jira/browse/AIRFLOW-31
https://github.com/apache/incubator-airflow/pull/1586

So I changed my imports to be explicit and backfill jobs work now.

Imports like this resulted in failures in the Celery task:
from airflow.contrib.operators import VerticaOperator, VerticaHook

This style works:
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.contrib.operators.vertica_operator import VerticaOperator
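
For reference, a hypothetical minimal DAG using the working import style (the
dag id, task id, connection id, and SQL below are made up for illustration):

from datetime import datetime

from airflow import DAG
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.contrib.operators.vertica_operator import VerticaOperator

dag = DAG('example_vertica_dag', start_date=datetime(2016, 1, 1, 12))

t_example = VerticaOperator(
    task_id='t_example_query',
    vertica_conn_id='vertica_default',
    sql='SELECT 1',
    dag=dag,
)
# Imported this way, VerticaOperator.__module__ is the fully qualified
# 'airflow.contrib.operators.vertica_operator', which the worker can import
# when it unpickles the DAG.  (VerticaHook is imported the same way if you
# need it directly.)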


I think my problem is already covered in those tickets, but if you think it
is separate and deserves an issue, I'll file one.

thanks,
Dennis

On Thu, Jun 23, 2016 at 6:17 PM Dennis O'Brien <den...@dennisobrien.net>
wrote:

> I just cleared the history for this DAG (using the CLI 'clear' command) and
> then attempted a backfill.  I'm consistently getting a deadlock error, and I
> suspect this is really Celery failing to execute the task.
>
> $ airflow backfill --ignore_first_depends_on_past --start_date
> 2016-01-01T12:00:00 --end_date 2016-01-02T12:00:00 etl_gsn_daily_kpi_email
> ...
> BackfillJob is deadlocked.Some of the deadlocked tasks were unable to run
> because of "depends_on_past" relationships. Try running the backfill with
> the option "ignore_first_depends_on_past=True" or passing "-I" at the
> command line. These tasks were unable to run:
> ...
>
> The DAG has depends_on_past: True, wait_for_downstream: True, start_date:
> datetime(2016, 1, 1, 12).
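>
> A minimal sketch of how those settings are declared in the DAG file (the
> default_args pattern; everything except the dag id and those three settings
> is assumed for illustration):
>
> from datetime import datetime
> from airflow import DAG
>
> default_args = {
>     'depends_on_past': True,
>     'wait_for_downstream': True,
> }
>
> dag = DAG('etl_gsn_daily_kpi_email',
>           default_args=default_args,
>           start_date=datetime(2016, 1, 1, 12))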
>
> For each task in the DAG I see an output message that it was added to the
> queue, that Celery is queuing it, and that it failed.  For example, for one
> of the tasks, there are these three messages:
>
> [2016-06-24 00:56:14,613] {base_executor.py:36} INFO - Adding to queue:
> airflow run etl_gsn_daily_kpi_email t_validate_gsncom_events_dau
> 2016-01-01T12:00:00 --pickle 18 --local
> ...
> [2016-06-24 00:56:19,471] {celery_executor.py:64} INFO - [celery] queuing
> ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau',
> datetime.datetime(2016, 1, 1, 12, 0)) through celery, queue=default
> ...
> [2016-06-24 00:56:29,480] {jobs.py:924} ERROR - Task instance
> ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau',
> datetime.datetime(2016, 1, 1, 12, 0)) failed
>
>
> It seems like I'm in a state where the DAG succeeds when run through the
> scheduler, but fails when run from the CLI or web interface.
>
> Any ideas for debugging this will be much appreciated.
>
> thanks,
> Dennis
>
>
>
> On Wed, Jun 15, 2016 at 1:44 PM Dennis O'Brien <den...@dennisobrien.net>
> wrote:
>
>> Hi,
>>
>> Occasionally I try to force the final task in a DAG to run by clicking on
>> the task in the web UI and clicking 'Run' (with 'Force' and 'Ignore
>> Dependencies' selected).  The UI shows 'Sent <TaskInstance: ...> to the
>> message queue, it should start any moment now'.  Going back to that task
>> and selecting 'Log', I see something like this:
>>
>> [2016-06-15 20:11:44,172] {models.py:154} INFO - Filling up the DagBag from 
>> /home/airflow/workspace/verticadw/airflow/dags/etl_gsn_daily_kpi_email.py
>> [2016-06-15 20:11:44,881] {base_hook.py:53} INFO - Using connection to: 
>> localhost
>>
>>
>> No sign of an error but no sign of success.  But if I look at the Celery
>> Flower dashboard, I can see this resulted in a failure with result:
>> AirflowException('Celery command failed',)
>>
>> If I 'Clear' the task with 'Downstream' selected, the task succeeds.  And
>> the task succeeds when it runs in the scheduled DAG.
>>
>> Example args for failed tasks:
>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>> 2016-06-14T10:00:00 --force --local -sd
>> DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>> 2016-06-14T10:00:00 -i --force --local -sd
>> DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>
>> Example args for successful tasks:
>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>> 2016-06-14T10:00:00 --local -sd DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>
>> and the log:
>>
>> [2016-06-15 19:44:46,147] {models.py:1219} INFO - Executing 
>> <Task(PythonOperator): t_final_send_email> on 2016-06-14 10:00:00
>> [2016-06-15 19:44:51,979] {python_operator.py:67} INFO - Done. Returned 
>> value was: [Airflow] GSN Games Metrics 2016-06-14 sent to 
>> ('dobr...@gsngames.com',)
>>
>>
>> I looked in the logs (airflow-flower.log, airflow-scheduler.log,
>> airflow-webserver.log, and airflow-worker.log) but didn't find any clues.
>>
>> I'm running Airflow v1.7.1.3, using the CeleryExecutor backed by Redis.
>>
>> Celery portion of airflow.cfg:
>> [celery]
>> celery_app_name = airflow.executors.celery_executor
>> celeryd_concurrency = 16
>> worker_log_server_port = 8793
>> broker_url = redis://localhost
>> celery_result_backend = redis://localhost
>> flower_port = 5555
>> default_queue = default
>>
>>
>> Any advice on debugging this?
>>
>> thanks,
>> Dennis
>>
>>
