IKholopov opened a new issue, #36802:
URL: https://github.com/apache/airflow/issues/36802

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==10.12.0
   
   ### Apache Airflow version
   
   2.6.3
   
   ### Operating System
   
   Ubuntu 20.04.6
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   N/A
   
   ### What happened
   
   In a DAG with ~500 GKEStartPodOperator tasks (running pods on another cluster, hosted on GKE) we discovered that operator execution hangs while polling logs in ~0.2% of the task instances. Based on the logs, execution halts in a call inside the kubernetes client (`read_namespaced_pod_log`, to be exact).
   
   Only after the DAG run timeout (hours later), when SIGTERM is dispatched to the `task run` process, does execution resume; it then retries fetching the logs and pod status, but those have already been garbage collected.
   
   This looks exactly like https://github.com/kubernetes-client/python/issues/1234#issuecomment-695801558. After running the same deployment in deferrable mode, one task also ended up locked in a similar way, this time in a different call (pod creation):
   ```
   Traceback (most recent call last):
     File 
"/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py",
 line 310, in run_pod_async
       resp = self._client.create_namespaced_pod(
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py",
 line 7356, in create_namespaced_pod
       return self.create_namespaced_pod_with_http_info(namespace, body, 
**kwargs)  # noqa: E501
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py",
 line 7455, in create_namespaced_pod_with_http_info
       return self.api_client.call_api(
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", 
line 348, in call_api
       return self.__call_api(resource_path, method,
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", 
line 180, in __call_api
       response_data = self.request(
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", 
line 391, in request
       return self.rest_client.POST(url,
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 
275, in POST
       return self.request("POST", url,
     File 
"/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 
168, in request
       r = self.pool_manager.request(
     File "/opt/python3.8/lib/python3.8/site-packages/urllib3/request.py", line 
81, in request
       return self.request_encode_body(
     File "/opt/python3.8/lib/python3.8/site-packages/urllib3/request.py", line 
173, in request_encode_body
       return self.urlopen(method, url, **extra_kw)
     File "/opt/python3.8/lib/python3.8/site-packages/urllib3/poolmanager.py", 
line 376, in urlopen
       response = conn.urlopen(method, u.request_uri, **kw)
     File 
"/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 
715, in urlopen
       httplib_response = self._make_request(
     File 
"/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 
467, in _make_request
       six.raise_from(e, None)
     File "<string>", line 3, in raise_from
     File 
"/opt/python3.8/lib/python3.8/site-packages/urllib3/connectionpool.py", line 
462, in _make_request
       httplib_response = conn.getresponse()
     File "/opt/python3.8/lib/python3.8/http/client.py", line 1348, in 
getresponse
       response.begin()
     File "/opt/python3.8/lib/python3.8/http/client.py", line 316, in begin
       version, status, reason = self._read_status()
     File "/opt/python3.8/lib/python3.8/http/client.py", line 277, in 
_read_status
       line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
     File "/opt/python3.8/lib/python3.8/socket.py", line 669, in readinto
       return self._sock.recv_into(b)
     File "/opt/python3.8/lib/python3.8/ssl.py", line 1241, in recv_into
       return self.read(nbytes, buffer)
     File "/opt/python3.8/lib/python3.8/ssl.py", line 1099, in read
       return self._sslobj.read(len, buffer)
     File 
"/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", 
line 1521, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   ```
   
   I believe this is specific to GKEStartPodOperator: KubernetesHook has a mechanism that configures TCP keep-alive by default (https://github.com/apache/airflow/blob/1d5d5022b8fc92f23f9fdc3b61269e5c7acfaf39/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L216), but GKEPodHook does not (https://github.com/apache/airflow/blob/1d5d5022b8fc92f23f9fdc3b61269e5c7acfaf39/airflow/providers/google/cloud/hooks/kubernetes_engine.py#L390). A sketch of that mechanism is below.
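
   For reference, the keep-alive mechanism in the CNCF hook boils down to patching urllib3's default socket options before the API client is built. This is a simplified sketch from memory; the real implementation takes the idle/interval/count values from Airflow config, and the numbers here are only illustrative:

   ```python
   import socket

   from urllib3.connection import HTTPConnection, HTTPSConnection


   def _enable_tcp_keepalive() -> None:
       """Add TCP keep-alive options to urllib3's defaults so idle connections to the
       Kubernetes API server send periodic probes instead of hanging forever when the
       peer silently disappears."""
       socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
       # Not every platform exposes these constants (e.g. macOS), hence the guards.
       if hasattr(socket, "TCP_KEEPIDLE"):
           socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120))
       if hasattr(socket, "TCP_KEEPINTVL"):
           socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))
       if hasattr(socket, "TCP_KEEPCNT"):
           socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))
       HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + socket_options
       HTTPSConnection.default_socket_options = HTTPSConnection.default_socket_options + socket_options
   ```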
   
   ### What you think should happen instead
   
   GKEPodHook should reuse the same socket configuration that KubernetesHook uses and configure TCP keep-alive by default.
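
   A minimal sketch of what this could look like, assuming the client is built in a `get_conn`-style method and that the `_enable_tcp_keepalive` helper linked above can be imported from the CNCF hook module (or duplicated); `_get_config` here is just a stand-in for however GKEPodHook currently assembles its client configuration:

   ```python
   from kubernetes import client

   # Reuse the helper that KubernetesHook already relies on (or an equivalent copy of it).
   from airflow.providers.cncf.kubernetes.hooks.kubernetes import _enable_tcp_keepalive


   class GKEPodHook:  # in reality this keeps its current base class and __init__
       def get_conn(self) -> client.ApiClient:
           # Enable keep-alive before the ApiClient creates its urllib3 connection pool,
           # so sockets to the GKE control plane send probes instead of hanging forever.
           _enable_tcp_keepalive()
           configuration = self._get_config()  # stand-in for the hook's existing config assembly
           return client.ApiClient(configuration)
   ```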
   
   ### How to reproduce
   
   Run ~500 tasks on GKE with spot VMs. There is no reliable repro, but the problem has been documented before and was fixed for the CNCF Kubernetes provider: https://github.com/apache/airflow/pull/11406.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

