george-zubrienko opened a new issue #14422:
URL: https://github.com/apache/airflow/issues/14422


   **Apache Airflow version**: 2.0.1
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.16.x
   
   **Environment**: KubernetesExecutor with single scheduler pod
   
   **What happened**: On all previous versions we used (from 1.10.x to 2.0.0), evicting or deleting a running task pod triggered the `on_failure_callback` from `BaseOperator`. We rely on this heavily to detect evictions, carry work over, and clear tasks automatically.
   
   We recently updated our dev environment to 2.0.1. Since then, `on_failure_callback` only seems to fire when the pod completes naturally, i.e. when it is not evicted or deleted with `kubectl`.
   
   At the task log level, everything looks the same as before when the pod is removed with `kubectl delete pod ...`:
   
   ```
   Received SIGTERM. Terminating subprocesses
   Sending Signals.SIGTERM to GPID 16
   Received SIGTERM. Terminating subprocesses.
   Task received SIGTERM signal
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/li_operator.py", line 357, in execute
       self.operator_task_code(context)
     File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_jar_operator.py", line 62, in operator_task_code
       ssh_connection=_ssh_con
     File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 469, in watch_application
       existing_apps=_associated_applications.keys()
     File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 376, in get_associated_application_info
       logger=self.log
     File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_api/yarn_api_ssh_client.py", line 26, in send_request
       _response = requests.get(request)
     File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 76, in get
       return request('get', url, params=params, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
       return session.request(method=method, url=url, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
       resp = self.send(prep, **send_kwargs)
     File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
       r = adapter.send(request, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
       timeout=timeout
     File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
       chunked=chunked,
     File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
       conn.request(method, url, **httplib_request_kw)
     File "/usr/local/lib/python3.7/http/client.py", line 1277, in request
       self._send_request(method, url, body, headers, encode_chunked)
     File "/usr/local/lib/python3.7/http/client.py", line 1323, in _send_request
       self.endheaders(body, encode_chunked=encode_chunked)
     File "/usr/local/lib/python3.7/http/client.py", line 1272, in endheaders
       self._send_output(message_body, encode_chunked=encode_chunked)
     File "/usr/local/lib/python3.7/http/client.py", line 1032, in _send_output
       self.send(msg)
     File "/usr/local/lib/python3.7/http/client.py", line 972, in send
       self.connect()
     File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
       conn = self._new_conn()
     File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
       (self._dns_host, self.port), self.timeout, **extra_kw
     File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
       sock.connect(sa)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1241, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   Marking task as FAILED. dag_id=mock_dag_limr, task_id=SetupMockScaldingDWHJob, execution_date=20190910T000000, start_date=20210224T162811, end_date=20210224T163044
   Process psutil.Process(pid=16, status='terminated', exitcode=1, started='16:28:10') (16) terminated with exit code 1
   ```
   
   But `on_failure_callback` is not triggered. For simplicity, let's assume the callback does this:
   
   ```python
   import json
   import os
   from pathlib import Path
   from urllib import request

   from airflow.models import Variable


   def act_on_failure(context):
       send_slack_message(
           message=f"{context['task_instance_key_str']} fired failure callback",
           channel=get_stored_variable('slack_log_channel')
       )


   def get_stored_variable(variable_name, deserialize=False):
       try:
           return Variable.get(variable_name, deserialize_json=deserialize)
       except KeyError:
           # Under pytest, fall back to a local vars.json next to this file
           if os.getenv('PYTEST_CURRENT_TEST'):
               _root_dir = str(Path(__file__).parent)
               _vars_path = os.path.join(_root_dir, "vars.json")
               with open(_vars_path, 'r') as vars_file:
                   _vars_json = json.load(vars_file)
               if deserialize:
                   return _vars_json.get(variable_name, {})
               else:
                   return _vars_json.get(variable_name, "")
           else:
               raise


   def send_slack_message(message, channel):
       _web_hook_url = get_stored_variable('slack_web_hook')
       post = {
           "text": message,
           "channel": channel
       }

       try:
           json_data = json.dumps(post)
           req = request.Request(
               _web_hook_url,
               data=json_data.encode('ascii'),
               headers={'Content-Type': 'application/json'}
           )

           request.urlopen(req)
       except request.HTTPError as em:
           print('Failed to send slack message to the hook {hook}: {msg}, request: {req}'.format(
               hook=_web_hook_url,
               msg=str(em),
               req=str(post)
           ))
   ```
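For illustration only, here is a minimal sketch of how a failure callback might tell a SIGTERM interruption (such as an eviction) apart from an ordinary failure. It assumes the callback context exposes the raised exception under an `exception` key. The match on the message text is a heuristic taken from the task log above. `is_sigterm_failure` and `classify_failure` are hypothetical helpers, not part of Airflow.

```python
def is_sigterm_failure(context):
    """Heuristically detect a failure caused by the task receiving SIGTERM.

    Assumption: the failure context exposes the raised exception under the
    "exception" key; the message text matches the task log in this issue.
    """
    exc = context.get("exception")
    return exc is not None and "Task received SIGTERM signal" in str(exc)


def classify_failure(context):
    """Hypothetical helper: build an alert message describing the failure."""
    key = context["task_instance_key_str"]
    if is_sigterm_failure(context):
        # e.g. an eviction or `kubectl delete pod`: a candidate for carry-over/clear
        return f"{key} was interrupted by SIGTERM"
    return f"{key} failed"


if __name__ == "__main__":
    fake_context = {
        "task_instance_key_str": "mock_dag_limr__SetupMockSparkDwhJob__20190910",
        "exception": Exception("Task received SIGTERM signal"),
    }
    print(classify_failure(fake_context))
    # mock_dag_limr__SetupMockSparkDwhJob__20190910 was interrupted by SIGTERM
```

This only helps if the callback actually fires, which is the problem this issue describes.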
   
   Scheduler logs related to this event:
   
   ```
   21-02-24 16:33:04,968] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type MODIFIED
   [2021-02-24 16:33:04,968] {kubernetes_executor.py:202} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 Pending
   [2021-02-24 16:33:04,979] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type DELETED
   [2021-02-24 16:33:04,979] {kubernetes_executor.py:197} INFO - Event: Failed to start pod mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3, will reschedule
   [2021-02-24 16:33:05,406] {kubernetes_executor.py:354} INFO - Attempting to finish pod; pod_id: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3; state: up_for_reschedule; annotations: {'dag_id': 'mock_dag_limr', 'task_id': 'SetupMockSparkDwhJob', 'execution_date': '2019-09-10T00:00:00+00:00', 'try_number': '9'}
   [2021-02-24 16:33:05,419] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='mock_dag_limr', task_id='SetupMockSparkDwhJob', execution_date=datetime.datetime(2019, 9, 10, 0, 0, tzinfo=tzlocal()), try_number=9), 'up_for_reschedule', 'mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3', 'airflow', '173647183') to up_for_reschedule
   [2021-02-24 16:33:05,422] {scheduler_job.py:1206} INFO - Executor reports execution of mock_dag_limr.SetupMockSparkDwhJob execution_date=2019-09-10 00:00:00+00:00 exited with status up_for_reschedule for try_number 9
   ```
   
   However, the task stays in the `failed` state, not `up_for_reschedule` as the scheduler log reports.
   
   When the pod completes on its own (fails, or exits with 0), callbacks are triggered correctly.
   
   **What you expected to happen**: `on_failure_callback` is called regardless of how the pod exits, including SIGTERM-based interruptions such as pod eviction and pod deletion.
   
   **What do you think went wrong?**: Not sure, really. We believe this code is executed, since we get the full stack trace:
   
https://github.com/apache/airflow/blob/2.0.1/airflow/models/taskinstance.py#L1149
   
   But then it is unclear why the `finally` clause here does not run:
   
https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1422
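To make the `finally` question concrete, here is a minimal, stdlib-only sketch of the generic Python mechanism visible in the traceback: a SIGTERM handler that raises an exception. It is not Airflow's code. `TaskTerminated`, `signal_handler` and `run_task_with_callbacks` are hypothetical stand-ins. The sketch assumes a POSIX system and that it runs in the main thread, because only the main thread can install signal handlers. Within a single process, the raised exception unwinds through `except` and `finally` like any other error. The sketch does not model Airflow's supervising process or the Kubernetes termination grace period.

```python
import os
import signal
import time


class TaskTerminated(Exception):
    """Hypothetical stand-in for AirflowException("Task received SIGTERM signal")."""


def signal_handler(signum, frame):
    # Same pattern as in the traceback: turn SIGTERM into an ordinary exception
    raise TaskTerminated("Task received SIGTERM signal")


def run_task_with_callbacks(on_failure_callback):
    """Run a fake task that receives SIGTERM and return the ordered events."""
    events = []
    previous = signal.signal(signal.SIGTERM, signal_handler)
    try:
        try:
            # Simulate `kubectl delete pod`: this process receives SIGTERM
            os.kill(os.getpid(), signal.SIGTERM)
            # The Python-level handler runs between bytecodes; sleeping gives it a chance
            for _ in range(100):
                time.sleep(0.01)
            events.append("execute returned")
        except TaskTerminated as exc:
            events.append(f"failed: {exc}")
            on_failure_callback(events)
        finally:
            events.append("finally ran")
    finally:
        # Restore whichever SIGTERM handler was installed before
        signal.signal(signal.SIGTERM, previous)
    return events


if __name__ == "__main__":
    print(run_task_with_callbacks(lambda events: events.append("on_failure_callback")))
    # ['failed: Task received SIGTERM signal', 'on_failure_callback', 'finally ran']
```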
   
   **How to reproduce it**:
   
   With Airflow 2.0.1 running the KubernetesExecutor, run `kubectl delete ...` on any running task pod. The task's operator should define `on_failure_callback`. To check whether it is called, have it send data to any external logging system.
   
   **Anything else we need to know**:
   The problem is persistent and only occurs in version 2.0.1.
   

