[
https://issues.apache.org/jira/browse/AIRFLOW-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17209458#comment-17209458
]
Giovanni Paolo Gibilisco commented on AIRFLOW-2385:
---------------------------------------------------
Happened again on 1.10.9.
Timeout reached, correctly logged but the task is still running. Shouldn't the
expected behavior be to fail the task and proceed to the next one? In my
scenario, I don't need to perform special cleanup handling while handling the
on_kill call. Do we still need to specify it even if we don't need any custom
cleanup?
Relevnt DAG line:
{code:java}
task_test = ExecutorOperator(retries=0,
execution_timeout=timedelta(minutes=45),
on_failure_callback=on_failure_callback )
{code}
We are using a custom operator from Base which does *not* re-define on_kill
Last portion of logs:
{code:java}
[2020-10-06 22:09:03,182] {{log_stream.py:10}} INFO - summary = 17312 in
00:31:21 = 9.2/s Avg: 2683 Min: 48 Max: 413821 Err: 11 (0.06%)
[2020-10-06 22:22:37,240] {{logging_mixin.py:112}} INFO - [2020-10-06
22:22:37,240] {{timeout.py:42}} ERROR - Process timed out, PID: 18195
{code}
> Airflow task is not stopped when execution timeout gets triggered
> -----------------------------------------------------------------
>
> Key: AIRFLOW-2385
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2385
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG
> Affects Versions: 1.9.0
> Reporter: Yohei Shimomae
> Priority: Major
>
> I have my own custom operator extends BaseOperator as follows. I tried to
> kill a task if the task runs for more than 30 minutes. timeout seems to be
> triggered according to a log but the task still continued.
> Am I missing something? I checked the official document but do not know what
> is wrong.[https://airflow.apache.org/code.html#baseoperator]
> My operator is like as follows.
> {code:java}
> class MyOperator(BaseOperator):
> @apply_defaults
> def __init__(
> self,
> some_parameters_here,
> *args,
> **kwargs):
> super(MyOperator, self).__init__(*args, **kwargs)
> # some initialization here
> def execute(self, context):
> # some code here
> {code}
>
> {{}}My task is like as follows.
> {code:java}
> t = MyOperator(
> task_id='task',
> dag=scheduled_dag,
> execution_timeout=timedelta(minutes=30)
> {code}
>
> I found this error but the task continued.
> {code:java}
> [2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage
> 6:==================================================(1380 + -160) /
> 1224][2018-04- 12 03:30:28,353] {timeout.py:36} ERROR - Process timed out
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)