[ 
https://issues.apache.org/jira/browse/AURORA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204905#comment-15204905
 ] 

Amol S Deshmukh commented on AURORA-1642:
-----------------------------------------

I considered the following two options and picked the latter in the review 
request at https://reviews.apache.org/r/45115/, for the reasons given below:

# Change the loop condition in {{PipedSubprocessExecutor.wait}} to:
{noformat}
    while rc is None:
{noformat}
This ensures that {{wait()}} returns as soon as the subprocess exits, 
regardless of whether the pipes are still open (for example, because a 
child-of-child process is still using them). However, because of the way the 
nested loop {{for fd in read_results:}} is implemented, some output intended 
for the stdout/stderr handlers would be lost if it does not all fit in a buffer 
of size {{READ_BUFFER_SIZE}}.
# Delegate {{PipedSubprocessExecutor.wait()}} to {{self._popen.wait()}}. This 
allows us to pass the stdout and stderr handles directly to 
{{subprocess.Popen(..)}}. However, this approach rules out the cases that need 
a {{TeeHandler}} or a {{RotatingFileHandler}}: the former needs to write the 
output to 2 file handles, while the latter changes the file handles when 
rotating the output files.
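
The pipe behaviour behind both options can be reproduced outside Thermos. The following is a minimal sketch, not the executor code: the function name {{demo_pipe_outlives_child}} and its parameter are hypothetical, and it assumes a POSIX system with {{sh}} and {{sleep}} available. It shows that {{Popen.wait()}} returns as soon as the direct child exits, while a backgrounded grandchild that inherited stdout keeps the pipe from reaching EOF.

```python
import os
import select
import subprocess
import time


def demo_pipe_outlives_child(grandchild_sleep=3):
    """Show that a child can exit while a grandchild keeps its stdout pipe open.

    Hypothetical helper for illustration only; not part of Thermos.
    """
    # The shell backgrounds a sleep (which inherits stdout), prints, then exits.
    proc = subprocess.Popen(
        ["sh", "-c", "sleep %d & echo hello" % grandchild_sleep],
        stdout=subprocess.PIPE,
    )
    start = time.time()
    rc = proc.wait()  # returns once the shell itself has exited
    waited = time.time() - start

    fd = proc.stdout.fileno()
    # Collect whatever the shell wrote before it exited.
    readable, _, _ = select.select([fd], [], [], 1.0)
    first = os.read(fd, 4096) if readable else b""

    # Poll again: neither data nor EOF is reported, because the sleeping
    # grandchild still holds the write end of the pipe.
    readable, _, _ = select.select([fd], [], [], 0.2)
    pipe_still_open = not readable

    proc.stdout.close()
    return rc, waited, first, pipe_still_open
```

A loop that only stops once every pipe reports EOF would keep running here until the grandchild exits, which matches the situation described in option 1.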

A side concern with the polling implementation in {{wait()}} is that a process 
generating copious amounts of output can stay blocked waiting on I/O. This is 
because the current implementation drains only {{READ_BUFFER_SIZE}} per polling 
iteration (at 1 iteration per second).
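
One possible way to address that concern is to read everything currently available on each poll instead of a single buffer's worth. This is a sketch under stated assumptions rather than the change in the review request: {{drain}} and its {{handler}} callback are hypothetical names, the {{READ_BUFFER_SIZE}} value is illustrative, and {{os.set_blocking}} requires Python 3.5 or later.

```python
import os

READ_BUFFER_SIZE = 4096  # illustrative value, not the executor's actual constant


def drain(fd, handler, chunk_size=READ_BUFFER_SIZE):
    """Pass all currently available data on fd to handler, chunk by chunk.

    Hypothetical helper. Returns True if EOF was reached (every writer has
    closed the pipe) and False if the pipe is still open but empty for now.
    """
    os.set_blocking(fd, False)  # reads raise BlockingIOError instead of waiting
    while True:
        try:
            chunk = os.read(fd, chunk_size)
        except BlockingIOError:
            return False  # nothing left to read right now
        if not chunk:
            return True  # empty read means EOF
        handler(chunk)
```

Calling such a helper once per polling iteration would empty the pipe each time, so the writer is less likely to stall on a full pipe between iterations.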

> Thermos runner finalization is broken
> -------------------------------------
>
>                 Key: AURORA-1642
>                 URL: https://issues.apache.org/jira/browse/AURORA-1642
>             Project: Aurora
>          Issue Type: Bug
>          Components: Executor
>            Reporter: Maxim Khutornenko
>            Assignee: Amol S Deshmukh
>
> We have noticed thermos runner finalization no longer works after this commit 
> [024bac9dcb8f37e4b31210e3a0a7aea2345a16ab|https://reviews.apache.org/r/40922/]
>  for tasks with blocking threads. 
> I was able to reproduce it in Vagrant by extending the sleep timeout of the 
> {{hello}} task and running {{aurora job killall}} immediately after launching 
> it:
> {noformat}
> while true; do
>       echo hello world
>       sleep 600
> done
> {noformat}
> The finalizer never has a chance to run, and after 1 minute the task is 
> forcefully aborted:
> {noformat}
> D0316 04:00:35.237905 19362 runner.py:951] Runner issued kill: force:False, 
> preemption_wait:1 mins
> D0316 04:00:35.238183 19362 runner.py:567] Flipping recovery mode off.
> D0316 04:00:35.238308 19362 ckpt.py:348] Flipping task state from ACTIVE to 
> ACTIVE
> D0316 04:00:35.238437 19362 runner.py:242] _on_task_transition: 
> TaskStatus(state=0, runner_uid=0, runner_pid=19362, 
> timestamp_ms=1458100835238)
> D0316 04:00:35.239079 19362 runner.py:180] Task on_active(TaskStatus(state=0, 
> runner_uid=0, runner_pid=19362, timestamp_ms=1458100835238))
> D0316 04:00:35.241660 19362 ckpt.py:348] Flipping task state from ACTIVE to 
> CLEANING
> D0316 04:00:35.241765 19362 runner.py:242] _on_task_transition: 
> TaskStatus(state=5, runner_uid=0, runner_pid=19362, 
> timestamp_ms=1458100835241)
> D0316 04:00:35.249836 19362 runner.py:188] Task 
> on_cleaning(TaskStatus(state=5, runner_uid=0, runner_pid=19362, 
> timestamp_ms=1458100835241))
> D0316 04:00:35.249953 19362 helper.py:217] 
> TaskRunnerHelper.terminate_process(hello)
> D0316 04:00:35.256520 19362 helper.py:220]    => SIGTERM pid 19368
> D0316 04:00:35.256705 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 59.9812531471
> D0316 04:00:35.262578 19362 runner.py:929] Run loop: Work to be done within 
> 1.0s
> D0316 04:00:36.263881 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:00:36.264199 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 58.9737620354
> D0316 04:00:36.264734 19362 runner.py:929] Run loop: Work to be done within 
> 1.0s
> ---<skipped>---
> D0316 04:01:31.397888 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:01:31.398144 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 3.83981513977
> D0316 04:01:31.398538 19362 runner.py:929] Run loop: Work to be done within 
> 1.0s
> D0316 04:01:32.400230 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:01:32.401125 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 2.8368370533
> D0316 04:01:32.401596 19362 runner.py:929] Run loop: Work to be done within 
> 1.0s
> D0316 04:01:33.404506 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:01:33.404815 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 1.83315014839
> D0316 04:01:33.405534 19362 runner.py:929] Run loop: Work to be done within 
> 1.0s
> D0316 04:01:34.406909 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:01:34.407223 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 0.830743074417
> D0316 04:01:34.407908 19362 runner.py:929] Run loop: Work to be done within 
> 0.8s
> D0316 04:01:35.415529 19362 runner.py:939] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:01:35.415683 19362 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 0
> D0316 04:01:35.415740 19362 runner.py:926] Run loop: No more work to be done 
> in state CLEANING
> D0316 04:01:35.415888 19362 runner.py:903] Forced terminal state: KILLED
> D0316 04:01:35.415936 19362 ckpt.py:348] Flipping task state from CLEANING to 
> KILLED
> D0316 04:01:35.415980 19362 runner.py:242] _on_task_transition: 
> TaskStatus(state=3, runner_uid=0, runner_pid=19362, 
> timestamp_ms=1458100895415)
> D0316 04:01:35.416937 19362 runner.py:201] Task on_killed(TaskStatus(state=3, 
> runner_uid=0, runner_pid=19362, timestamp_ms=1458100895415))
> D0316 04:01:35.417393 19362 runner.py:684] _set_process_status(hello <= 
> KILLED, seq=3[auto])
> D0316 04:01:35.417458 19362 ckpt.py:379] Running state machine for 
> process=hello/seq=3
> D0316 04:01:35.417460 19362 runner.py:238] _on_process_transition: 
> ProcessStatus(seq=3, process=u'hello', start_time=None, coordinator_pid=None, 
> pid=None, return_code=-1, state=4, stop_time=1458100895.417381, 
> fork_time=None)
> D0316 04:01:35.417853 19362 runner.py:156] Process on_killed 
> ProcessStatus(seq=3, process=u'hello', start_time=None, coordinator_pid=None, 
> pid=None, return_code=-1, state=4, stop_time=1458100895.417381, 
> fork_time=None)
> D0316 04:01:35.417921 19362 helper.py:226] 
> TaskRunnerHelper.kill_process(hello)
> D0316 04:01:35.418145 19362 helper.py:234]    => SIGKILL coordinator group 
> 19367
> D0316 04:01:35.418934 19362 helper.py:237]    => SIGKILL coordinator 19367
> D0316 04:01:35.419006 19362 muxer.py:94] unregistering hello
> D0316 04:01:35.419069 19362 runner.py:160] Process killed, marking it as a 
> loss.
> {noformat}
> After reverting the above commit, the task is killed almost instantaneously 
> with the finalizer exiting properly:
> {noformat}
> D0316 04:03:42.339101 20053 runner.py:939] Runner issued kill: force:False, 
> preemption_wait:1 mins
> D0316 04:03:42.339370 20053 runner.py:564] Flipping recovery mode off.
> D0316 04:03:42.339503 20053 ckpt.py:348] Flipping task state from ACTIVE to 
> ACTIVE
> D0316 04:03:42.339571 20053 runner.py:242] _on_task_transition: 
> TaskStatus(state=0, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101022339)
> D0316 04:03:42.340327 20053 runner.py:180] Task on_active(TaskStatus(state=0, 
> runner_uid=0, runner_pid=20053, timestamp_ms=1458101022339))
> D0316 04:03:42.343360 20053 ckpt.py:348] Flipping task state from ACTIVE to 
> CLEANING
> D0316 04:03:42.343463 20053 runner.py:242] _on_task_transition: 
> TaskStatus(state=5, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101022343)
> D0316 04:03:42.344290 20053 runner.py:188] Task 
> on_cleaning(TaskStatus(state=5, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101022343))
> D0316 04:03:42.344388 20053 helper.py:217] 
> TaskRunnerHelper.terminate_process(hello)
> D0316 04:03:42.349311 20053 helper.py:220]    => SIGTERM pid 20059
> D0316 04:03:42.349566 20058 process.py:132] [process:20058=hello]: child 
> state transition 
> [/var/lib/mesos/slaves/cf74b250-4134-4227-8f86-b48a67e15b89-S0/frameworks/cf74b250-4134-4227-8f86-b48a67e15b89-0000/executors/thermos-www-data-prod-hello-0-acb78a97-a551-4e16-ba7e-2c488717556d/runs/0310abe1-3a24-4308-9c5e-6df7714ca2f2/checkpoints/checkpoints/www-data-prod-hello-0-acb78a97-a551-4e16-ba7e-2c488717556d/coordinator.hello]
>  <= RunnerCkpt(task_status=None, process_status=ProcessStatus(seq=3, 
> process=u'hello', start_time=None, coordinator_pid=None, pid=None, 
> return_code=-15, state=4, stop_time=1458101022.34949, fork_time=None), 
> runner_header=None)
> D0316 04:03:42.349777 20053 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 59.9893760681
> D0316 04:03:42.350320 20058 process.py:132] [process:20058=hello]: 
> Coordinator exiting.
> D0316 04:03:42.360085 20053 runner.py:917] Run loop: Work to be done within 
> 1.0s
> D0316 04:03:42.360292 20053 recordio.py:137] 
> /var/lib/mesos/slaves/cf74b250-4134-4227-8f86-b48a67e15b89-S0/frameworks/cf74b250-4134-4227-8f86-b48a67e15b89-0000/executors/thermos-www-data-prod-hello-0-acb78a97-a551-4e16-ba7e-2c488717556d/runs/0310abe1-3a24-4308-9c5e-6df7714ca2f2/checkpoints/checkpoints/www-data-prod-hello-0-acb78a97-a551-4e16-ba7e-2c488717556d/coordinator.hello
>  has no data (current offset = 171)
> D0316 04:03:42.360383 20053 muxer.py:155] select() returning 1 updates:
> D0316 04:03:42.360440 20053 muxer.py:157]   = RunnerCkpt(task_status=None, 
> process_status=ProcessStatus(seq=3, process='hello', start_time=None, 
> coordinator_pid=None, pid=None, return_code=-15, state=4, 
> stop_time=1458101022.34949, fork_time=None), runner_header=None)
> D0316 04:03:42.360486 20053 ckpt.py:379] Running state machine for 
> process=hello/seq=3
> D0316 04:03:42.360543 20053 runner.py:238] _on_process_transition: 
> ProcessStatus(seq=3, process='hello', start_time=None, coordinator_pid=None, 
> pid=None, return_code=-15, state=4, stop_time=1458101022.34949, 
> fork_time=None)
> D0316 04:03:42.361098 20053 runner.py:156] Process on_killed 
> ProcessStatus(seq=3, process='hello', start_time=None, coordinator_pid=None, 
> pid=None, return_code=-15, state=4, stop_time=1458101022.34949, 
> fork_time=None)
> D0316 04:03:42.361164 20053 helper.py:226] 
> TaskRunnerHelper.kill_process(hello)
> D0316 04:03:42.361391 20053 helper.py:234]    => SIGKILL coordinator group 
> 20058
> D0316 04:03:42.361438 20053 helper.py:237]    => SIGKILL coordinator 20058
> D0316 04:03:42.361488 20053 muxer.py:94] unregistering hello
> D0316 04:03:42.361536 20053 runner.py:160] Process killed, marking it as a 
> loss.
> D0316 04:03:42.361555 20053 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 59.9775869846
> D0316 04:03:42.361555 20053 runner.py:917] Run loop: Work to be done within 
> 1.0s
> D0316 04:03:42.361555 20053 runner.py:925] Update collection only took 0.0s, 
> idling 1.0s
> D0316 04:03:43.361848 20053 runner.py:927] Run loop: No updates collected, 
> touching checkpoint.
> D0316 04:03:43.362040 20053 helper.py:358] Detected terminated process: 
> pid=20058, status=9, rusage=resource.struct_rusage(ru_utime=0.00684, 
> ru_stime=0.008775999999999999, ru_maxrss=16288, ru_ixrss=0, ru_idrss=0, 
> ru_isrss=0, ru_minflt=4105, ru_majflt=0, ru_nswap=0, ru_inblock=0, 
> ru_oublock=48, ru_msgsnd=0, ru_msgrcv=0, ru_nsignals=0, ru_nvcsw=9, 
> ru_nivcsw=14)
> D0316 04:03:43.362119 20053 runner.py:327] TaskRunnerStage[CLEANING]: 
> Finalization remaining: 58.9770309925
> D0316 04:03:43.362361 20053 runner.py:914] Run loop: No more work to be done 
> in state CLEANING
> D0316 04:03:43.362416 20053 ckpt.py:348] Flipping task state from CLEANING to 
> FINALIZING
> D0316 04:03:43.362462 20053 runner.py:242] _on_task_transition: 
> TaskStatus(state=6, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101023362)
> D0316 04:03:43.363035 20053 runner.py:193] Task 
> on_finalizing(TaskStatus(state=6, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101023362))
> D0316 04:03:43.363317 20053 helper.py:226] 
> TaskRunnerHelper.kill_process(hello)
> D0316 04:03:43.363497 20053 helper.py:234]    => SIGKILL coordinator group 
> 20058
> D0316 04:03:43.363550 20053 runner.py:801] Schedule pass:
> D0316 04:03:43.363619 20053 runner.py:804] running: 
> D0316 04:03:43.363662 20053 runner.py:805] finished: 
> D0316 04:03:43.363723 20053 runner.py:815] runnable: 
> D0316 04:03:43.363761 20053 runner.py:817] waiting: 
> D0316 04:03:43.363961 20053 runner.py:348] TaskRunnerStage[FINALIZING]: 
> Finalization remaining: 58.9751880169
> D0316 04:03:43.364037 20053 runner.py:753] running:0 runnable:0 waiting:0 
> complete:True
> D0316 04:03:43.364084 20053 runner.py:914] Run loop: No more work to be done 
> in state FINALIZING
> D0316 04:03:43.364084 20053 runner.py:891] Forced terminal state: KILLED
> D0316 04:03:43.364084 20053 ckpt.py:348] Flipping task state from FINALIZING 
> to KILLED
> D0316 04:03:43.364084 20053 runner.py:242] _on_task_transition: 
> TaskStatus(state=3, runner_uid=0, runner_pid=20053, 
> timestamp_ms=1458101023364)
> D0316 04:03:43.364553 20053 runner.py:201] Task on_killed(TaskStatus(state=3, 
> runner_uid=0, runner_pid=20053, timestamp_ms=1458101023364))
> D0316 04:03:43.364784 20053 helper.py:226] 
> TaskRunnerHelper.kill_process(hello)
> D0316 04:03:43.364965 20053 helper.py:234]    => SIGKILL coordinator group 
> 20058 
> {noformat}
> Root cause still TBD.



