[
https://issues.apache.org/jira/browse/AIRFLOW-7109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Oliver Ricken updated AIRFLOW-7109:
-----------------------------------
Description:
Dear experts,
using the 1.10.6 vanilla "HiveServer2Hook" from airflow/hooks/hive_hooks in a
custom-operator, we observe the improper handling of timeouts configured on
task-level using "execution_timeout".
The picture is always the same:
# The hook is instantiated, the connection opened and the query submitted
# The task-timeout is reached but the query is completed nonetheless
(confirmed by looking into YARN, job is flagged as "finished", i.e. successful)
# Although the query has been successful (hive-wise), no result is obtained
and processed; instead an obscure codec-error is raised, making the task fail.
The codec-error stems from the actual AirflowTaskTimeout exception not being
properly propagated.
The persistent codec-error (including its stacktrace) is given below.
We investigated the issue further by slightly modifying the vanilla version of
the HiveServer2Hook and adding some debugging output to various modules.
* We made the "pyhive.hive.connect"-object a class-variable (instead of having
it exist only in the scope of the _get_results-method, as in the vanilla
version)
* We added a "kill()"-method that invokes "connection.close()" on said
class-variable and call this method upon execution of "on_kill()" in our
custom-operator.
* We added debugging (i.e. logging) output to the "_run_raw_task"-method in
"airflow/models/taskinstance.py" and to the "__enter__()"- and
"__exit__()"-methods of the timeout class in "airflow/utils/timeout.py".
Our findings, based on the attached log-files (and the modified
core-module-files as well as the modified hook, the custom-operator and the
DAG), are the following:
* In order to invoke the "on_kill()"-method of any operator upon timeout of
the task, the timeout class from "timeout.py" that is used to execute the
"task_copy" (in "taskinstance.py", "_run_raw_task") needs to exit with an
"AirflowTaskTimeout" (and nothing else!)
* The AirflowTaskTimeout is raised, not at the point in runtime at which the
timeout is actually reached, but only at the end of the pyHive-execution of
the query.
* Upward propagation of the AirflowTaskTimeout then fails with the obscure
codec-error mentioned earlier and thus neither the "on_kill()"- nor the
"kill()"-method is invoked. Nota bene: at that point this would no longer be
necessary, as the task does not need actual killing anymore since the query
has completed anyhow.
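The mechanism referred to in these findings can be illustrated with a minimal
sketch of a signal-based timeout context manager and a caller that invokes a
cleanup callback only when the timeout exception reaches it. This is an
illustration under assumptions, not the Airflow source: TaskTimeout stands in
for airflow.exceptions.AirflowTaskTimeout, run_with_cleanup is a hypothetical
name, and signal.setitimer is used here to allow sub-second values. It
requires a POSIX system and must run in the main thread.

```python
import signal
import time


class TaskTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowTaskTimeout."""


class timeout:
    """Raise TaskTimeout in the main thread after `seconds` have elapsed."""

    def __init__(self, seconds, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message
        self._previous = None

    def handle_timeout(self, signum, frame):
        # Called by the interpreter when SIGALRM is delivered.
        raise TaskTimeout(self.error_message)

    def __enter__(self):
        self._previous = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Cancel the timer and restore whatever handler was installed before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        previous = self._previous if self._previous is not None else signal.SIG_DFL
        signal.signal(signal.SIGALRM, previous)
        return False  # never swallow exceptions


def run_with_cleanup(work, on_kill, seconds):
    """Run work(); call on_kill() only if TaskTimeout propagates out of it."""
    try:
        with timeout(seconds):
            return work()
    except TaskTimeout:
        on_kill()
        raise
```

If any other exception type leaves the with-block, as with the codec-error in
the report, the except-clause does not match and on_kill() is never called.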
In our opinion this leaves two aspects to be studied, understood and fixed,
though the fix may belong in pyHive:
* The execution of the pyHive query needs to be killed upon timeout
* The propagation of the exception needs to be performed properly
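One conceivable way to make the query itself cancellable is to submit it
asynchronously and poll its state against a deadline. The sketch below is
hedged accordingly: it assumes that the cursor accepts execute(...,
async_=True) and offers poll() and cancel(), which should be checked against
the installed pyHive version (older releases spell the keyword differently).
The names run_with_deadline and running_states are hypothetical; the caller
passes in whichever operation states count as "still running".

```python
import time


def run_with_deadline(cursor, statement, deadline_s, running_states,
                      poll_interval_s=1.0):
    """Submit a query asynchronously and cancel it if it exceeds deadline_s."""
    # Assumption: with async_=True the call returns before the query finishes.
    cursor.execute(statement, async_=True)
    deadline = time.monotonic() + deadline_s
    while cursor.poll().operationState in running_states:
        if time.monotonic() >= deadline:
            # Ask the server to stop the query before reporting the timeout.
            cursor.cancel()
            raise TimeoutError(
                f"query exceeded {deadline_s} s and was cancelled")
        time.sleep(poll_interval_s)
    # A real implementation should also check for an error state here.
    return cursor.fetchall()
```

Because control returns to Python between polls, a timeout has a well-defined
point at which it can be acted upon, instead of arriving inside a blocking
socket read.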
NB: closing the pyhive.hive.connection-object upon invocation of "on_kill()"
does work properly if the task is killed manually, e.g. by setting it to
"failed". In this case, "on_kill()" is invoked directly instead of waiting for
the AirflowTaskTimeout to pop up.
NB: things work perfectly using the "HiveCliHook" which, under the hood, uses
beeline executed from a python subprocess-call which is terminated (killed
after 60 seconds) upon invocation of "on_kill" in e.g. the vanilla
HiveOperator. The log attached as "vanilla_HiverOperator_task.log" shows the
behaviour of the vanilla HiveOperator performing the same query. In principle,
similarly flawless behaviour is expected when using the HiveServer2Hook to
submit the query ...
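The terminate-then-kill pattern described for the subprocess-based hook can be
sketched as follows. This is a generic illustration using only the standard
library, not the HiveCliHook source; stop_process and grace_s are hypothetical
names, and the 60 second default merely mirrors the figure quoted above.

```python
import subprocess


def stop_process(proc, grace_s=60):
    """Terminate a child process, escalating to kill after grace_s seconds."""
    if proc.poll() is not None:
        # The process has already exited; nothing to stop.
        return proc.returncode
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        return proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        proc.kill()  # forceful stop (SIGKILL on POSIX)
        return proc.wait()
```

Since the query runs in a separate process, stopping it does not depend on an
exception being raised at the right moment inside a blocking client call.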
Your input on this matter and any suggestions are highly appreciated. The
"HiveServer2Hook" is, in our opinion, an irreplaceable tool when in need of
query-results for postprocessing.
Cheers
Oliver
{{[2020-03-20 14:45:42,982] {taskinstance.py:1060} ERROR - 'utf-8' codec can't decode byte 0xdb in position 0: invalid continuation byte
Traceback (most recent call last):
  File "/home/airflow/dags/src/vipa_import/plugins/hooks/dev_hive_hook.py", line 33, in _get_results
    cur.execute(statement)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/pyhive/hive.py", line 364, in execute
    response = self._connection.client.ExecuteStatement(req)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 280, in ExecuteStatement
    return self.recv_ExecuteStatement()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/TCLIService/TCLIService.py", line 292, in recv_ExecuteStatement
    (fname, mtype, rseqid) = iprot.readMessageBegin()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 134, in readMessageBegin
    sz = self.readI32()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/protocol/TBinaryProtocol.py", line 217, in readI32
    buff = self.trans.readAll(4)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
    chunk = self.read(sz - have)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 166, in read
    self._read_frame()
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 170, in _read_frame
    header = self._trans.readAll(4)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 60, in readAll
    chunk = self.read(sz - have)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/thrift/transport/TSocket.py", line 117, in read
    buff = self.handle.recv(sz)
  File "/home/airflow/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 17089}}
Environment: CentOS 7, python 3.6.7, Airflow 1.10.6, HDP 3.1.4
> HiveServer2Hook and pyHive do not handle timeout correctly
> ----------------------------------------------------------
>
> Key: AIRFLOW-7109
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7109
> Project: Apache Airflow
> Issue Type: Bug
> Components: hooks, utils
> Affects Versions: 1.10.6
> Environment: CentOS 7, python 3.6.7, Airflow 1.10.6, HDP 3.1.4
> Reporter: Oliver Ricken
> Priority: Major
> Attachments: dag.py, dev_hive_hook.py, dev_hive_operator.py,
> modified_HiveServer2Hook_task.log, taskinstance.py, timeout.py,
> vanilla_HiverOperator_task.log
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)