Oliver Ricken created AIRFLOW-7109:
--------------------------------------
Summary: HiveServer2Hook and pyHive do not handle timeout correctly
Key: AIRFLOW-7109
URL: https://issues.apache.org/jira/browse/AIRFLOW-7109
Project: Apache Airflow
Issue Type: Bug
Components: hooks, utils
Affects Versions: 1.10.6
Reporter: Oliver Ricken
Attachments: dag.py, dev_hive_hook.py, dev_hive_operator.py,
modified_HiveServer2Hook_task.log, taskinstance.py, timeout.py,
vanilla_HiverOperator_task.log
Dear experts,
using the 1.10.6 vanilla "HiveServer2Hook" from airflow/hooks/hive_hooks in a
custom operator, we observe improper handling of timeouts configured on
task level via "execution_timeout".
The picture is always the same:
# The hook is instantiated, the connection opened and the query submitted.
# The task timeout is reached, but the query completes nonetheless
(confirmed in YARN, where the job is flagged as "finished", i.e. successful).
# Although the query has succeeded (Hive-wise), no result is obtained
and processed; instead, an obscure codec error is raised, making the task
fail. The codec error stems from the actual AirflowTimeoutException not being
properly propagated.
The persistent codec error (including its stack trace) is given below.
We investigated the issue further by slightly modifying the vanilla version of
the HiveServer2Hook and adding debugging output to various modules.
* We made the "pyhive.hive.connect" object a class variable (instead of having
it exist only in the scope of the "_get_results" method, as in the vanilla
hook).
* We added a "kill()" method that invokes "connection.close()" on said
class variable, and we call this method from "on_kill()" in our
custom operator.
* We added debugging (i.e. logging) output to the "_run_raw_task" method in
airflow/models/taskinstance.py and to the "__enter__()" and "__exit__()"
methods of the timeout class in airflow/utils/timeout.py.
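For illustration, the first two modifications can be condensed into the following minimal sketch (the class and parameter names here are ours, not the vanilla hook's; the connection factory stands in for "pyhive.hive.connect"):

```python
class KillableHiveHook:
    """Sketch of the modification described above: keep the pyhive
    connection on the instance so that an operator's on_kill() can
    close it from outside the query's call stack."""

    def __init__(self, connection_factory):
        # connection_factory stands in for pyhive.hive.connect(...)
        self._connection_factory = connection_factory
        self.connection = None

    def _get_results(self, hql):
        # Vanilla hook: the connection is a local variable, unreachable
        # from other methods. Modified hook: store it on the instance.
        self.connection = self._connection_factory()
        cursor = self.connection.cursor()
        cursor.execute(hql)
        return cursor.fetchall()

    def kill(self):
        # Invoked from the custom operator's on_kill(): closing the
        # connection tears down the socket underneath any in-flight
        # Thrift call.
        if self.connection is not None:
            self.connection.close()
            self.connection = None
```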
Our findings, based on the attached log files (and the modified
core module files as well as the modified hook, the custom operator and the
DAG), are the following:
* In order to invoke the "on_kill()" method of any operator upon timeout of
the task, the timeout class used to execute the "task_copy" (in
"taskinstance.py", "_run_raw_task") needs to exit with an
"AirflowTimeoutException" (and nothing else!).
* The AirflowTimeoutException is produced, but not at the point in time at
which the timeout is actually reached; it appears only at the end of the
pyHive execution of the query.
* Upward propagation of the AirflowTimeoutException then fails with the
obscure codec error mentioned earlier, so neither "on_kill()" nor
"kill()" is invoked. Nota bene: at this point killing would no longer be
necessary, as the task has already completed anyway.
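For context, airflow/utils/timeout.py is, to our understanding, a SIGALRM-based context manager roughly like the simplified sketch below (names are simplified, not the exact Airflow ones). The crucial point is that the exception is raised wherever the interpreter happens to be when the alarm fires, which in our case is deep inside thrift's socket read, where it then surfaces as the codec error:

```python
import signal


class TaskTimeoutDemo(Exception):
    """Stand-in for airflow.exceptions.AirflowTaskTimeout."""


class timeout:
    """Simplified sketch of Airflow's timeout context manager:
    arm SIGALRM on __enter__, raise from the handler, disarm on
    __exit__."""

    def __init__(self, seconds):
        self.seconds = seconds

    def handle_timeout(self, signum, frame):
        # Raised in whatever frame is executing when the alarm fires.
        raise TaskTimeoutDemo("Timeout")

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_value, tb):
        signal.alarm(0)
```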
In our opinion, this leaves two aspects to be studied, understood and fixed,
potentially in pyHive:
* The execution of the pyHive query needs to be killed upon timeout.
* The propagation of the exception needs to be performed properly.
NB: closing the pyhive.hive connection object upon invocation of "on_kill()"
does work properly if the task is killed manually, e.g. by setting it to
"failed". In that case, "on_kill()" is invoked directly instead of waiting for
the AirflowTimeoutException to pop up.
NB: things work perfectly with the "HiveCLIHook", which, under the hood, runs
beeline from a Python subprocess call that is terminated (killed
after 60'') upon invocation of "on_kill()" in e.g. the vanilla HiveOperator.
The log attached as "vanilla_HiverOperator_task.log" shows the behaviour of
the vanilla HiveOperator performing the same query. In principle, similarly
flawless behaviour is expected when using the HiveServer2Hook to submit the
query ...
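The subprocess-based kill pattern of the HiveCLIHook can be sketched as follows (function names and the grace period are illustrative, not the hook's actual API):

```python
import subprocess


def run_query_via_subprocess(cmd):
    # HiveCLIHook-style pattern: the query runs in a child process
    # (beeline, in the real hook), so killing the process actually
    # stops the work on the cluster side.
    return subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)


def on_kill(proc, grace_seconds=60):
    # SIGTERM first; escalate to SIGKILL after the grace period
    # (the "killed after 60''" mentioned above).
    proc.terminate()
    try:
        proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
```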
Your input on this matter and any suggestions are highly appreciated. The
"HiveServer2Hook" is, in our opinion, an irreplaceable tool whenever query
results are needed for post-processing.
Cheers
{{[2020-03-20 14:45:42,982] \{taskinstance.py:1060} ERROR - 'utf-8' codec can't
decode byte 0xdb in position 0: invalid continuation byte
Traceback (most recent call last):
File "/home/airflow/dags/src/vipa_import/plugins/hooks/dev_hive_hook.py",
line 33, in _get_results
cur.execute(statement)
File "/home/airflow/airflow_venv/lib/python3.6/site-packages/pyhive/hive.py",
line 364, in execute
response = self._connection.client.ExecuteStatement(req)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py",
line 280, in ExecuteStatement
return self.recv_ExecuteStatement()
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py",
line 292, in recv_ExecuteStatement
(fname, mtype, rseqid) = iprot.readMessageBegin()
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py",
line 134, in readMessageBegin
sz = self.readI32()
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py",
line 217, in readI32
buff = self.trans.readAll(4)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py",
line 60, in readAll
chunk = self.read(sz - have)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py",
line 166, in read
self._read_frame()
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py",
line 170, in _read_frame
header = self._trans.readAll(4)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py",
line 60, in readAll
chunk = self.read(sz - have)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TSocket.py",
line 117, in read
buff = self.handle.recv(sz)
File
"/home/airflow/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py",
line 43, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 17089}}