[
https://issues.apache.org/jira/browse/AIRFLOW-5447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930761#comment-16930761
]
Daniel Imberman commented on AIRFLOW-5447:
------------------------------------------
OK, so I've broken down the currently running threads in the hope that this helps us out.
Thread 1: attempting to put a new task in the task_queue
{code:java}
Thread 0x7f0c13c7d700
File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
args.func(args)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py",
line 74, in wrapper
return f(*args, **kwargs)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py",
line 1013, in scheduler
job.run()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 213, in run
self._execute()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1350, in _execute
self._execute_helper()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1439, in _execute_helper
self.executor.heartbeat()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 132, in heartbeat
self.trigger_tasks(open_slots)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 156, in trigger_tasks
executor_config=simple_ti.executor_config)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 767, in execute_async
self.task_queue.put((key, command, kube_executor_config))
File "<string>", line 2, in put
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 819, in
_callmethod
kind, result = conn.recv()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 250, in
recv
buf = self._recv_bytes()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 407, in
_recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 379, in
_recv
chunk = read(handle, remaining)
File "<string>", line 1, in <module>
File "<string>", line 5, in <module>
{code}
Thread 2: re-reading plugin files
{code:java}
Thread 0x7f0c01c31700
File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
Thread 0x7f0bff430700
File
"/usr/local/lib/python3.7/multiprocessing/managers.py", line 201, in
handle_request
result = func(c, *args, **kwds)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 422, in
accept_connection
self.serve_client(c)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 234, in
serve_client
request = recv()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 251, in
recv
return _ForkingPickler.loads(buf.getbuffer())
File "<frozen importlib._bootstrap>", line 202, in _lock_unlock_module
File "<frozen importlib._bootstrap>", line 98, in acquire
File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 178, in
accepter
c = self.listener.accept()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 453, in
accept
c = self._listener.accept()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 598, in
accept
s, self._last_accepted = self._socket.accept()
File "/usr/local/lib/python3.7/socket.py", line 212, in accept
fd, addr = self._accept()
File "/usr/local/airflow/.local/bin/airflow", line 21, in <module>
from airflow import configuration
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/__init__.py",
line 94, in <module>
operators._integrate_plugins()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/__init__.py",
line 104, in _integrate_plugins
from airflow.plugins_manager import operators_modules
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
{code}
Thread 3: multiprocessing Manager server
{code:java}
Thread 0x7f0c13c7d700
File "<frozen importlib._bootstrap_external>", line 728,
in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/plugins_manager.py",
line 142, in <module>
m = imp.load_source(namespace, filepath)
File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/airflow/plugins/parameterized_dags/__init__.py", line 4, in
<module>
from airflow.www.app import csrf
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/www/app.py",
line 37, in <module>
from airflow.www.blueprints import routes
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/www/blueprints.py",
line 24, in <module>
from airflow import jobs
File "<frozen importlib._bootstrap>", line 1035, in _handle_fromlist
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/__init__.py",
line 21, in <module>
from airflow.jobs.base_job import BaseJob # noqa: F401
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 48, in <module>
class BaseJob(Base, LoggingMixin):
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 82, in BaseJob
executor=executors.get_default_executor(),
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/__init__.py",
line 48, in get_default_executor
DEFAULT_EXECUTOR = _get_executor(executor_name)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/__init__.py",
line 86, in _get_executor
return KubernetesExecutor()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 650, in __init__
self._manager = multiprocessing.Manager()
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 56, in
Manager
m.start()
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 563, in
start
self._process.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in
_Popen
return Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in
__init__
self._launch(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in
_launch
code = process_obj._bootstrap()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in
_bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 597, in
_run_server
server.serve_forever()
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 165, in
serve_forever
self.stop_event.wait(1)
File "/usr/local/lib/python3.7/threading.py", line 552, in wait
signaled = self._cond.wait(timeout)
File "/usr/local/lib/python3.7/threading.py", line 300, in wait
gotit = waiter.acquire(True, timeout)
File "<string>", line 1, in <module>
File "<string>", line 5, in <module>
{code}
Thread 4: a second (?) multiprocessing Manager server
{code:java}
Thread 0x7f0c01c31700
File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 201, in
handle_request
result = func(c, *args, **kwds)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 422, in
accept_connection
self.serve_client(c)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 234, in
serve_client
request = recv()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 250, in
recv
buf = self._recv_bytes()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 407, in
_recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 379, in
_recv
chunk = read(handle, remaining)
Thread 0x7f0bff430700
File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 178, in
accepter
c = self.listener.accept()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 453, in
accept
c = self._listener.accept()
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 598, in
accept
s, self._last_accepted = self._socket.accept()
File "/usr/local/lib/python3.7/socket.py", line 212, in accept
fd, addr = self._accept()
Thread 0x7f0c13c7d700
File
"/usr/local/airflow/.local/bin/airflow", line 32, in <module>
args.func(args)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py",
line 74, in wrapper
return f(*args, **kwargs)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py",
line 1013, in scheduler
job.run()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 213, in run
self._execute()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1350, in _execute
self._execute_helper()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1374, in _execute_helper
self.executor.start()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 756, in start
self.kube_client, self.worker_uuid
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 412, in __init__
self._manager = multiprocessing.Manager()
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 56, in
Manager
m.start()
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 563, in
start
self._process.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in
_Popen
return Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in
__init__
self._launch(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in
_launch
code = process_obj._bootstrap()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in
_bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 597, in
_run_server
server.serve_forever()
File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 165, in
serve_forever
self.stop_event.wait(1)
File "/usr/local/lib/python3.7/threading.py", line 552, in wait
signaled = self._cond.wait(timeout)
File "/usr/local/lib/python3.7/threading.py", line 300, in wait
gotit = waiter.acquire(True, timeout)
File "<string>", line 1, in <module>
File "<string>", line 5, in <module>
{code}
Thread 5: job watcher reading the Kubernetes watch stream
{code:java}
Thread 0x7f0c13c7d700
File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
args.func(args)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py",
line 74, in wrapper
return f(*args, **kwargs)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py",
line 1013, in scheduler
job.run()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 213, in run
self._execute()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1350, in _execute
self._execute_helper()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1374, in _execute_helper
self.executor.start()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 756, in start
self.kube_client, self.worker_uuid
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 415, in __init__
self.kube_watcher = self._make_kube_watcher()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 421, in _make_kube_watcher
watcher.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 223, in
_Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in
_Popen
return Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in
__init__
self._launch(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in
_launch
code = process_obj._bootstrap()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in
_bootstrap
self.run()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 325, in run
self.worker_uuid, self.kube_config)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
line 349, in _run
**kwargs):
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/watch/watch.py",
line 144, in stream
for line in iter_resp_lines(resp):
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/watch/watch.py",
line 48, in iter_resp_lines
for seg in resp.read_chunked(decode_content=False):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 704,
in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 636,
in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py",
line 304, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1821, in
recv_into
result = _lib.SSL_read(self._ssl, buf, nbytes)
File "<string>", line 1, in <module>
File "<string>", line 5, in <module>
{code}
Thread 6: reading DAG files
{code:java}
Thread 0x7f0c13c7d700
File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
args.func(args)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py",
line 74, in wrapper
return f(*args, **kwargs)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py",
line 1013, in scheduler
job.run()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 213, in run
self._execute()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1350, in _execute
self._execute_helper()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1380, in _execute_helper
self.processor_agent.start()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
line 543, in start
self._process.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 223, in
_Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in
_Popen
return Popen(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in
__init__
self._launch(process_obj)
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in
_launch
code = process_obj._bootstrap()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in
_bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
line 613, in _run_processor_manager
processor_manager.start()
File
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
line 872, in start
self._signal_conn.send(dag_parsing_stat)
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 206, in
send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 404, in
_send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 368, in
_send
n = write(self._handle, buf)
File "<string>", line 1, in <module>
File "<string>", line 5, in <module>
{code}
> KubernetesExecutor hangs on task queueing
> -----------------------------------------
>
> Key: AIRFLOW-5447
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5447
> Project: Apache Airflow
> Issue Type: Bug
> Components: executor-kubernetes
> Affects Versions: 1.10.4, 1.10.5
> Environment: Kubernetes version v1.14.3, Airflow version 1.10.4-1.10.5
> Reporter: Henry Cohen
> Assignee: Daniel Imberman
> Priority: Blocker
>
> Starting in 1.10.4, and continuing in 1.10.5, when using the
> KubernetesExecutor with the webserver and scheduler running in the
> Kubernetes cluster, tasks are scheduled, but when they are added to the task queue,
> the executor process hangs indefinitely. Based on log messages, it appears to
> be stuck at this line:
> https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L761
--
This message was sent by Atlassian Jira
(v8.3.2#803003)