Oliver Ricken created AIRFLOW-7109:

             Summary: HiveServer2Hook and pyHive do not handle timeout correctly
                 Key: AIRFLOW-7109
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7109
             Project: Apache Airflow
          Issue Type: Bug
          Components: hooks, utils
    Affects Versions: 1.10.6
            Reporter: Oliver Ricken
         Attachments: dag.py, dev_hive_hook.py, dev_hive_operator.py, 
modified_HiveServer2Hook_task.log, taskinstance.py, timeout.py, 

Dear experts,

using the 1.10.6 vanilla "HiveServer2Hook" from airflow/hooks/hive_hooks in a 
custom operator, we observe improper handling of timeouts configured at 
task level via "execution_timeout".

The picture is always the same:
 # The hook is instantiated, the connection opened and the query submitted.
 # The task timeout is reached, but the query completes nonetheless 
(confirmed by looking into YARN, where the job is flagged as "finished", i.e. 
successful).
 # Although the query has succeeded (Hive-wise), no result is obtained 
and processed; instead an obscure codec error is raised, making the task fail. 
The codec error appears to stem from the AirflowTimeoutException not being 
propagated properly.
The persistent codec error (including its stack trace) is given below.
We investigated the issue further by slightly modifying the vanilla version of 
the HiveServer2Hook and adding some debugging output to various modules:
 * We made the "pyhive.hive.connect" object a class variable (instead of having 
it exist only in the scope of the "_get_results" method, as in the vanilla 
hook).
 * We added a "kill()" method that invokes "connection.close()" on said 
class variable, and we call this method upon execution of "on_kill()" in our 
custom operator.
 * We added debugging (i.e. logging) output to the "_run_raw_task" method of 
airflow/models/taskinstance.py and to the "__enter__()" and "__exit__()" 
methods of the "airflow/utils/timeout.py" class.

Our findings, based on the attached log files (and the modified 
core-module files as well as the modified hook, the custom operator and the 
DAG), are the following:
 * In order for the "on_kill()" method of any operator to be invoked upon 
task timeout, the "timeout.py" class used to execute the "task_copy" (from 
"_run_raw_task" in "taskinstance.py") needs to exit with an 
"AirflowTimeoutException" (and nothing else).
 * The AirflowTimeoutException is produced, but not at the point in runtime at 
which the timeout is actually reached; it surfaces only at the end of the 
pyHive execution of the query.
 * Upward propagation of the AirflowTimeoutException then fails with the 
obscure codec error mentioned earlier, and thus no "on_kill()" and 
"kill()" methods are invoked. Nota bene: at this point killing would no longer 
be necessary anyway, as the task has already run to completion.
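
For context, the mechanism in "airflow/utils/timeout.py" is a SIGALRM-based context manager, which the sketch below simplifies (the real class takes a message and raises AirflowTaskTimeout; TimeoutError stands in here). Because the handler's exception only surfaces when control returns to the Python interpreter, a blocking call inside a C extension that retries its read after the signal, as the Thrift transport appears to do, can delay the exception until the call completes; this would match the late AirflowTaskTimeout we observe.

```python
import signal
import time


class timeout:
    """Simplified sketch of the SIGALRM-based context manager in
    airflow/utils/timeout.py."""

    def __init__(self, seconds):
        self.seconds = seconds

    def handle_timeout(self, signum, frame):
        # Airflow raises AirflowTaskTimeout here.
        raise TimeoutError("Timeout after %ds" % self.seconds)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, *exc_info):
        signal.alarm(0)  # cancel any pending alarm


try:
    with timeout(1):
        time.sleep(5)  # stands in for the blocking pyHive call
except TimeoutError as exc:
    print("caught:", exc)
```

Note that time.sleep() is interrupted because the handler raises; a C-level read loop that swallows EINTR would not be.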

In our opinion, this leads to two aspects to be studied, understood and fixed, 
potentially in pyHive:
 * The execution of the pyHive query needs to be killed upon timeout.
 * The propagation of the exception needs to be performed properly.
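
One conceivable shape for the first fix, sketched here with hypothetical names and a stand-in connection object, is a watchdog that closes the connection at the deadline, so the blocking Thrift read fails immediately instead of silently running the query to completion:

```python
import threading
import time


class FakeConn:
    """Stand-in for pyhive.hive.Connection (illustrative only)."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_deadline(conn, query_fn, seconds):
    # Hypothetical sketch, not pyHive API: arm a watchdog that closes the
    # connection at the deadline, then run the (blocking) query function.
    watchdog = threading.Timer(seconds, conn.close)
    watchdog.start()
    try:
        return query_fn()
    finally:
        watchdog.cancel()  # no-op if the watchdog already fired


fast = FakeConn()
run_with_deadline(fast, lambda: time.sleep(0.05), seconds=1.0)
print(fast.closed)  # → False: the query finished before the deadline

slow = FakeConn()
run_with_deadline(slow, lambda: time.sleep(0.5), seconds=0.1)
print(slow.closed)  # → True: the watchdog closed the connection
```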

NB: closing the pyhive.hive.connection object upon invocation of "on_kill()" 
does work properly if the task is killed manually, e.g. by setting its state 
to "failed". In this case, "on_kill()" is invoked directly instead of waiting 
for the AirflowTimeoutException to pop up.

NB: things work perfectly with the "HiveCliHook", which, under the hood, runs 
beeline in a Python subprocess that is terminated (killed after 60'') upon 
invocation of "on_kill()" in e.g. the vanilla HiveOperator. The log attached 
as "vanilla_HiverOperator_task.log" shows the behaviour of the vanilla 
HiveOperator performing the same query. In principle, similarly flawless 
behaviour is expected when using the HiveServer2Hook to submit the query.


Your input on this matter and any suggestions are highly appreciated. The 
"HiveServer2Hook" is, in our opinion, an irreplaceable tool when in need of 
query results for postprocessing.




{{[2020-03-20 14:45:42,982] \{taskinstance.py:1060} ERROR - 'utf-8' codec can't 
decode byte 0xdb in position 0: invalid continuation byte
Traceback (most recent call last):
  File "/home/airflow/dags/src/vipa_import/plugins/hooks/dev_hive_hook.py", 
line 33, in _get_results
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/pyhive/hive.py", 
line 364, in execute
    response = self._connection.client.ExecuteStatement(req)
 line 280, in ExecuteStatement
    return self.recv_ExecuteStatement()
 line 292, in recv_ExecuteStatement
    (fname, mtype, rseqid) = iprot.readMessageBegin()
 line 134, in readMessageBegin
    sz = self.readI32()
 line 217, in readI32
    buff = self.trans.readAll(4)
 line 60, in readAll
    chunk = self.read(sz - have)
 line 166, in read
 line 170, in _read_frame
    header = self._trans.readAll(4)
 line 60, in readAll
    chunk = self.read(sz - have)
 line 117, in read
    buff = self.handle.recv(sz)
 line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 17089}}
