[
https://issues.apache.org/jira/browse/AIRFLOW-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237358#comment-17237358
]
Christian Pfaff commented on AIRFLOW-2946:
------------------------------------------
I'm facing the same issue and have tried a couple of things:
* increased sql_alchemy_pool_size and sql_alchemy_max_overflow in airflow.cfg
* increased max_connections on Postgres
Neither change helped.
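For context, these settings interact: a SQLAlchemy queue pool can open up to pool_size + max_overflow connections per engine, and each process has its own engine. The sketch below only illustrates that arithmetic. The function name and every number in it are assumptions made up for the example, not values from this setup.

```python
def max_db_connections(processes, pool_size, max_overflow):
    """Upper bound on connections opened by `processes` separate engines.

    Each SQLAlchemy QueuePool keeps up to `pool_size` persistent
    connections and may open `max_overflow` temporary ones on top.
    """
    return processes * (pool_size + max_overflow)


# Hypothetical values, for illustration only.
processes = 16                  # e.g. worker processes plus scheduler/webserver
pool_size = 5                   # sql_alchemy_pool_size
max_overflow = 10               # sql_alchemy_max_overflow
postgres_max_connections = 100  # max_connections in postgresql.conf

needed = max_db_connections(processes, pool_size, max_overflow)
print(needed, needed <= postgres_max_connections)
```

If the first number exceeds max_connections, raising the pool settings alone can make things worse rather than better.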
Background: the DAG requests several tens of thousands of objects from a REST
API endpoint via an HttpHook, in parallel. The requests run in threads, in
batches of 100.
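A minimal sketch of what such batching might look like, using only the standard library. This is not the DAG's actual code: chunks, fetch and fetch_all are hypothetical names, and fetch is a placeholder for the real HttpHook call.

```python
from concurrent.futures import ThreadPoolExecutor


def chunks(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch(object_id):
    # Placeholder for the real request (assumed to go through an HttpHook,
    # which has to resolve its Airflow connection before each call).
    return {"id": object_id}


def fetch_all(object_ids, batch_size=100):
    results = []
    for batch in chunks(object_ids, batch_size):
        # One thread per object in the batch; the next batch starts only
        # after every request in the current batch has finished.
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            results.extend(pool.map(fetch, batch))
    return results


records = fetch_all(list(range(250)))
print(len(records))
```

With this shape, up to 100 threads may ask for the same connection at nearly the same moment.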
What works as a quick-fix for me is a modified version of the HttpHook, which
"caches" the conn in a static variable:
{code:python}
class HttpHook2(BaseHook):
    ...
    # Class-level cache, shared by all instances and threads in the process.
    conn = None
    ...
    def get_conn(self, headers=None):
        ...
        session = requests.Session()
        if self.http_conn_id:
            # Look the connection up only once, then reuse the cached copy.
            if HttpHook2.conn is None:
                HttpHook2.conn = self.get_connection(self.http_conn_id)
            conn = HttpHook2.conn
        ...{code}
Not an elegant solution, I know, but maybe it helps to debug and fix the
problem with a more professional solution :)
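Since the requests run in threads, a lock-protected variant of the same idea may be a safer starting point. The following is only a sketch under assumptions: ConnectionCache and fake_lookup are hypothetical names, and fake_lookup stands in for the hook's get_connection call. The cache is keyed by connection id and never expires.

```python
import threading


class ConnectionCache:
    """Process-wide cache: each conn_id is looked up at most once."""

    def __init__(self, lookup):
        self._lookup = lookup      # e.g. a hook's get_connection method
        self._lock = threading.Lock()
        self._cache = {}

    def get(self, conn_id):
        # The lock is held during the lookup, so concurrent callers wait
        # for the first result instead of each running their own lookup.
        with self._lock:
            if conn_id not in self._cache:
                self._cache[conn_id] = self._lookup(conn_id)
            return self._cache[conn_id]


calls = []


def fake_lookup(conn_id):
    # Stand-in for the real connection lookup; records every invocation.
    calls.append(conn_id)
    return {"conn_id": conn_id, "host": "example.invalid"}


cache = ConnectionCache(fake_lookup)
threads = [threading.Thread(target=cache.get, args=("http_default",))
           for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))
```

In this run the lookup happens once even though 100 threads ask for the connection. Changes made to the connection after the first lookup would not be picked up until the process restarts.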
> Connection times out on airflow worker
> --------------------------------------
>
> Key: AIRFLOW-2946
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2946
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, executors, worker
> Affects Versions: 1.10.0
> Environment: ubuntu 16.04, AWS EC2
> Reporter: Avik Aggarwal
> Priority: Critical
>
> Hi,
> I have an Airflow cluster running Celery executors, with PostgreSQL
> installed on the same machine as the webserver and scheduler.
> After some time, the remote worker reports 'Connection timed out'. Airflow
> then queues as many tasks as the pool is configured for, and the flow hangs
> there until the queued tasks are deleted manually after stopping the
> scheduler service.
>
> Logs:
> [2018-08-23 13:44:03,954: ERROR/MainProcess] Pool callback raised exception:
> OperationalError('(psycopg2.OperationalError) could not connect to server:
> Connection timed out\n\tIs the server running on host "<host>" and
> accepting\n\tTCP/IP connections on port 5432?\n',)
> Traceback (most recent call last):
> File "/home/ubuntu/.local/lib/python2.7/site-packages/billiard/pool.py",
> line 1747, in safe_apply_callback
> fun(*args, **kwargs)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/worker/request.py",
> line 367, in on_failure
> self.id, exc, request=self, store_result=self.store_errors,
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py",
> line 157, in mark_as_failure
> traceback=traceback, request=request)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py",
> line 322, in store_result
> request=request, **kwargs)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py",
> line 53, in _inner
> return fun(*args, **kwargs)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py",
> line 105, in _store_result
> session = self.ResultSession()
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py",
> line 99, in ResultSession
> **self.engine_options)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py",
> line 60, in session_factory
> self.prepare_models(engine)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py",
> line 55, in prepare_models
> ResultModelBase.metadata.create_all(engine)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/schema.py",
> line 4005, in create_all
> tables=tables)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 1939, in _run_visitor
> with self._optional_conn_ctx_manager(connection) as conn:
> File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
> return self.gen.next()
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 1932, in _optional_conn_ctx_manager
> with self.contextual_connect() as conn:
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 2123, in contextual_connect
> self._wrap_pool_connect(self.pool.connect, None),
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 2162, in _wrap_pool_connect
> e, dialect, self)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 1476, in _handle_dbapi_exception_noconnection
> exc_info
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
> line 265, in raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
> line 2158, in _wrap_pool_connect
> return fn()
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 403, in connect
> return _ConnectionFairy._checkout(self)
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 791, in _checkout
> fairy = _ConnectionRecord.checkout(pool)
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 532, in checkout
> rec = pool._do_get()
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 1287, in _do_get
> return self._create_connection()
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 350, in _create_connection
> return _ConnectionRecord(self)
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 477, in __init__
> self.__connect(first_connect_check=True)
> File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py",
> line 674, in __connect
> connection = pool._invoke_creator(self)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py",
> line 106, in connect
> return dialect.connect(*cargs, **cparams)
> File
> "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py",
> line 412, in connect
> return self.dbapi.connect(*cargs, **cparams)
> File "/home/ubuntu/.local/lib/python2.7/site-packages/psycopg2/__init__.py",
> line 130, in connect
> conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
> OperationalError: (psycopg2.OperationalError) could not connect to server:
> Connection timed out
> Is the server running on host "<host>" and accepting
> TCP/IP connections on port 5432?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)