Hi again

I spoke too soon when I said the problem was solved.  I think the more
general problem I am having is with the Celery executor.  I still see about
90% of the tasks sent to Celery fail.  The failures show up in Flower,
though the Airflow UI doesn't reflect any of them.

I ran `airflow resetdb` just to be sure, added my DB connection via the
admin UI, then activated the DAG in the UI.  Looking at the Tree View, I see
some DAG runs starting to populate, and the squares are turning green with
success.  Flower tells a different story, with most tasks in a failed
state.  Some tasks failed several times with
AirflowException('Celery command failed',) and then eventually succeeded.

I will report back when the scheduler is done.

thanks again,
Dennis


On Fri, Jun 24, 2016 at 3:06 PM Dennis O'Brien <den...@dennisobrien.net>
wrote:

> I found the error in /var/log/upstart/airflow-worker.log
>
> [2016-06-24 21:13:44,616] {__init__.py:36} INFO - Using executor
> CeleryExecutor
> [2016-06-24 21:13:44,990] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/Grammar.txt
> [2016-06-24 21:13:45,091] {driver.py:120} INFO - Generating grammar tables
> from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> Traceback (most recent call last):
>   File "/home/airflow/venv/bin/airflow", line 15, in <module>
>     args.func(args)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py",
> line 211, in run
>     DagPickle).filter(DagPickle.id == args.pickle).first()
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
> line 2659, in first
>     ret = list(self[0:1])
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
> line 2457, in __getitem__
>     return list(res)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
> line 86, in instances
>     util.raise_from_cause(err)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
> line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
> line 71, in instances
>     rows = [proc(row) for row in fetch]
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
> line 428, in _instance
>     loaded_instance, populate_existing, populators)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py",
> line 486, in _populate_full
>     dict_[key] = getter(row)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py",
> line 1253, in process
>     return loads(value)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line
> 260, in loads
>     return load(file)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line
> 250, in load
>     obj = pik.load()
>   File "/usr/lib/python2.7/pickle.py", line 858, in load
>     dispatch[key](self)
>   File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
>     klass = self.find_class(module, name)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/dill/dill.py", line
> 406, in find_class
>     return StockUnpickler.find_class(self, module, name)
>   File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
>     __import__(module)
> ImportError: No module named vertica_operator
> [2016-06-24 21:13:46,434: ERROR/Worker-8] Command 'airflow run
> etl_gsn_daily_kpi_email t_validate_gsncom_events_direct_payment_recieved
> 2016-01-02T12:00:00 --pickle 20 --local ' returned non-zero exit status 1
> [2016-06-24 21:13:46,456: ERROR/MainProcess] Task
> airflow.executors.celery_executor.execute_command[09607f3b-90e1-4840-a9ca-5c7ed979c6d9]
> raised unexpected: AirflowException('Celery command failed',)
> Traceback (most recent call last):
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
> line 240, in trace_task
>     R = retval = fun(*args, **kwargs)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/celery/app/trace.py",
> line 438, in __protected_call__
>     return self.run(*args, **kwargs)
>   File
> "/home/airflow/venv/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py",
> line 45, in execute_command
>     raise AirflowException('Celery command failed')
> AirflowException: Celery command failed
>
> This led me to these issues and this PR about explicit imports:
> https://issues.apache.org/jira/browse/AIRFLOW-200
> https://issues.apache.org/jira/browse/AIRFLOW-31
> https://github.com/apache/incubator-airflow/pull/1586
>
> So I changed my imports to be explicit, and backfill jobs now work.
>
> Imports like this resulted in failures in the Celery task:
> from airflow.contrib.operators import VerticaOperator, VerticaHook
>
> This style works:
> from airflow.contrib.hooks.vertica_hook import VerticaHook
> from airflow.contrib.operators.vertica_operator import VerticaOperator
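>
> For context, here is a minimal sketch of a DAG using the explicit import
> style.  The dag_id, schedule, SQL, and connection id here are placeholders
> for illustration, not my real pipeline:
>
> from datetime import datetime
>
> from airflow import DAG
> from airflow.contrib.operators.vertica_operator import VerticaOperator
>
> # Placeholder DAG to illustrate the working (explicit) import style.
> dag = DAG('vertica_import_example',
>           start_date=datetime(2016, 1, 1),
>           schedule_interval='@daily')
>
> t_example = VerticaOperator(
>     task_id='t_example_query',
>     vertica_conn_id='vertica_default',  # placeholder connection id
>     sql='SELECT 1',
>     dag=dag)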
>
>
> I think my problem is already covered in those tickets, but if you think
> it is separate and deserves an issue, I'll file one.
>
> thanks,
> Dennis
>
>
>
>
> On Thu, Jun 23, 2016 at 6:17 PM Dennis O'Brien <den...@dennisobrien.net>
> wrote:
>
>> I just cleared the history for this DAG (using the CLI `clear` command),
>> then attempted a backfill.  I consistently get a deadlock error, and I
>> suspect it really comes from Celery failing to execute the tasks.
>>
>> $ airflow backfill --ignore_first_depends_on_past --start_date
>> 2016-01-01T12:00:00 --end_date 2016-01-02T12:00:00 etl_gsn_daily_kpi_email
>> ...
>> BackfillJob is deadlocked.Some of the deadlocked tasks were unable to run
>> because of "depends_on_past" relationships. Try running the backfill with
>> the option "ignore_first_depends_on_past=True" or passing "-I" at the
>> command line. These tasks were unable to run:
>> ...
>>
>> The DAG has depends_on_past: True, wait_for_downstream: True, start_date:
>> datetime(2016, 1, 1, 12).
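>>
>> For reference, a rough sketch of how those settings are defined via
>> default_args (the schedule_interval here is a placeholder, and the other
>> arguments are omitted):
>>
>> from datetime import datetime
>>
>> from airflow import DAG
>>
>> default_args = {
>>     'depends_on_past': True,
>>     'wait_for_downstream': True,
>>     'start_date': datetime(2016, 1, 1, 12),
>> }
>>
>> # Placeholder schedule; the real DAG's interval may differ.
>> dag = DAG('etl_gsn_daily_kpi_email',
>>           default_args=default_args,
>>           schedule_interval='@daily')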
>>
>> For each task in the DAG I see output messages saying that it was added to
>> the queue, that Celery queued it, and that it failed.  For example, one of
>> the tasks has these three messages:
>>
>> [2016-06-24 00:56:14,613] {base_executor.py:36} INFO - Adding to queue:
>> airflow run etl_gsn_daily_kpi_email t_validate_gsncom_events_dau
>> 2016-01-01T12:00:00 --pickle 18 --local
>> ...
>> [2016-06-24 00:56:19,471] {celery_executor.py:64} INFO - [celery] queuing
>> ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau',
>> datetime.datetime(2016, 1, 1, 12, 0)) through celery, queue=default
>> ...
>> [2016-06-24 00:56:29,480] {jobs.py:924} ERROR - Task instance
>> ('etl_gsn_daily_kpi_email', 't_validate_gsnmobile_events_dau',
>> datetime.datetime(2016, 1, 1, 12, 0)) failed
>>
>>
>> It seems like I'm in a state where DAG runs triggered by the scheduler
>> succeed, but tasks fail when run from the CLI or the web interface.
>>
>> Any ideas for debugging this will be much appreciated.
>>
>> thanks,
>> Dennis
>>
>>
>>
>> On Wed, Jun 15, 2016 at 1:44 PM Dennis O'Brien <den...@dennisobrien.net>
>> wrote:
>>
>>> Hi,
>>>
>>> I occasionally try to force-run the final task in a DAG by clicking on
>>> the task in the web UI and clicking 'Run' (with 'Force' and 'Ignore
>>> Dependencies' selected).  The UI shows 'Sent <TaskInstance: ...> to the
>>> message queue, it should start any moment now'.  Going back to that task
>>> and selecting 'Log', I see something like this:
>>>
>>> [2016-06-15 20:11:44,172] {models.py:154} INFO - Filling up the DagBag from 
>>> /home/airflow/workspace/verticadw/airflow/dags/etl_gsn_daily_kpi_email.py
>>> [2016-06-15 20:11:44,881] {base_hook.py:53} INFO - Using connection to: 
>>> localhost
>>>
>>>
>>> No sign of an error, but no sign of success either.  If I look at the
>>> Celery Flower dashboard, though, I can see it resulted in a failure with
>>> the result: AirflowException('Celery command failed',)
>>>
>>> If I 'Clear' the task with 'Downstream' selected, it succeeds.  The task
>>> also succeeds when run in the scheduled DAG.
>>>
>>> Example args for failed tasks:
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>>> 2016-06-14T10:00:00 --force --local -sd
>>> DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>>> 2016-06-14T10:00:00 -i --force --local -sd
>>> DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>>
>>> Example args for successful tasks:
>>> [u'airflow run etl_gsn_daily_kpi_email t_final_send_email
>>> 2016-06-14T10:00:00 --local -sd DAGS_FOLDER/etl_gsn_daily_kpi_email.py ']
>>>
>>> and the log:
>>>
>>> [2016-06-15 19:44:46,147] {models.py:1219} INFO - Executing 
>>> <Task(PythonOperator): t_final_send_email> on 2016-06-14 10:00:00
>>> [2016-06-15 19:44:51,979] {python_operator.py:67} INFO - Done. Returned 
>>> value was: [Airflow] GSN Games Metrics 2016-06-14 sent to 
>>> ('dobr...@gsngames.com',)
>>>
>>>
>>> I looked in the logs (airflow-flower.log, airflow-scheduler.log,
>>> airflow-webserver.log, and airflow-worker.log) but didn't find any clues.
>>>
>>> I'm running Airflow v1.7.1.3 with the CeleryExecutor backed by Redis.
>>>
>>> Celery portion of airflow.cfg:
>>> [celery]
>>> celery_app_name = airflow.executors.celery_executor
>>> celeryd_concurrency = 16
>>> worker_log_server_port = 8793
>>> broker_url = redis://localhost
>>> celery_result_backend = redis://localhost
>>> flower_port = 5555
>>> default_queue = default
>>>
>>>
>>> Any advice on debugging this?
>>>
>>> thanks,
>>> Dennis
>>>
>>>
