[https://issues.apache.org/jira/browse/AIRFLOW-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237358#comment-17237358]

Christian Pfaff commented on AIRFLOW-2946:
------------------------------------------

I'm facing the same issue and have tried a couple of things:
 * increased sql_alchemy_pool_size and sql_alchemy_max_overflow in airflow.cfg
 * increased max_connections on Postgres

Nothing worked.
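
As a rough sanity check on those settings: SQLAlchemy's default QueuePool lets one engine hold at most pool_size + max_overflow connections, and every process that creates its own engine gets its own pool. A minimal sketch, assuming one engine per process (the process count and setting values below are hypothetical, not taken from this setup), for comparing the worst case against Postgres's max_connections:

```python
def worst_case_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on simultaneous DB connections if each process has its own QueuePool.

    A QueuePool keeps up to `pool_size` connections and may open up to
    `max_overflow` extra ones, so a single engine never holds more than their sum.
    """
    return processes * (pool_size + max_overflow)


def fits_in_postgres(processes: int, pool_size: int, max_overflow: int,
                     max_connections: int, reserved: int = 3) -> bool:
    """Check the bound against the server limit.

    `reserved` mirrors Postgres's superuser_reserved_connections (default 3),
    which ordinary clients cannot use.
    """
    return worst_case_connections(processes, pool_size, max_overflow) <= max_connections - reserved


# Hypothetical numbers: 16 task processes, pool_size=5, max_overflow=10.
print(worst_case_connections(16, 5, 10))                   # 240
print(fits_in_postgres(16, 5, 10, max_connections=100))    # False
```

If the bound exceeds the server limit, Postgres rejects the extra clients outright ("too many clients already") rather than timing out, so raising the pool settings alone does not add capacity.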

Background: the DAG fetches several tens of thousands of objects from a REST API
endpoint in parallel via an HttpHook. The requests therefore run in threads,
in batches of 100.
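
The fan-out described above can be sketched with the standard library's concurrent.futures. This is a minimal sketch, assuming a hypothetical fetch_one(object_id) callable that performs a single REST call (in the DAG this would go through the HttpHook); it keeps at most 100 requests in flight and finishes each batch before starting the next:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batched(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_all(object_ids: List[T], fetch_one: Callable[[T], R],
              batch_size: int = 100) -> Dict[T, R]:
    """Fetch every id, running each batch of `batch_size` calls in parallel threads."""
    results: Dict[T, R] = {}
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for batch in batched(object_ids, batch_size):
            # map() yields results in input order; consuming all of them here
            # means the whole batch completes before the next one is submitted.
            for object_id, result in zip(batch, pool.map(fetch_one, batch)):
                results[object_id] = result
    return results
```

With batch_size=100, up to 100 hook calls run at the same moment, so anything each call does per request (such as looking up the connection) is multiplied by 100.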

What works for me as a quick fix is a modified version of the HttpHook that
"caches" the connection in a static (class-level) variable:
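{code:python}
import requests
from airflow.hooks.base_hook import BaseHook  # Airflow 1.10 import path


class HttpHook2(BaseHook):
    ...
    # Class-level ("static") cache shared by every HttpHook2 instance in this
    # process. Note: it ignores http_conn_id, so all hooks share one connection.
    conn = None
    ...

    def get_conn(self, headers=None):
        ...
        session = requests.Session()
        if self.http_conn_id:
            # Look the connection up only once; later calls reuse the cached
            # object instead of calling get_connection() again.
            if HttpHook2.conn is None:
                HttpHook2.conn = self.get_connection(self.http_conn_id)
            conn = HttpHook2.conn
            ...
{code}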

It's not an elegant solution, I know, but maybe it helps to debug the problem
and find a proper fix :)
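
The workaround above keeps a single connection for the whole class, so a hook with a different http_conn_id would still get the cached one, and several threads can run the lookup at the same time before the first result is stored. A minimal sketch of a keyed, lock-protected variant follows. It assumes the lookup function is passed in; in Airflow 1.10 that would presumably be BaseHook.get_connection, which HttpHook's get_conn() calls on every invocation. ConnectionCache is a hypothetical helper, not part of Airflow.

```python
import threading
from typing import Any, Callable, Dict


class ConnectionCache:
    """Hypothetical helper: resolve each connection id once per process, thread-safely."""

    def __init__(self, lookup: Callable[[str], Any]) -> None:
        self._lookup = lookup  # assumption: e.g. BaseHook.get_connection in Airflow 1.10
        self._cache: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, conn_id: str) -> Any:
        # Fast path: already resolved, so no locking is needed for the read.
        conn = self._cache.get(conn_id)
        if conn is not None:
            return conn
        with self._lock:
            # Re-check under the lock so concurrent callers trigger one lookup only.
            if conn_id not in self._cache:
                self._cache[conn_id] = self._lookup(conn_id)
            return self._cache[conn_id]


# Usage sketch inside a hook subclass (assumed Airflow 1.10 names):
#   _CONNECTIONS = ConnectionCache(BaseHook.get_connection)
#   conn = _CONNECTIONS.get(self.http_conn_id)
```

Because the cache lives as long as the process, edits to the connection in the Airflow UI are not picked up until the worker process restarts; the class-variable workaround has the same trade-off.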

> Connection times out on airflow worker
> --------------------------------------
>
>                 Key: AIRFLOW-2946
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2946
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, executors, worker
>    Affects Versions: 1.10.0
>         Environment: ubuntu 16.04, AWS EC2 
>            Reporter: Avik Aggarwal
>            Priority: Critical
>
> Hi,
> I have an Airflow cluster running the Celery executor, with PostgreSQL
> installed on the same machine as the webserver and scheduler.
> After some time, the remote worker shows the error 'Connection timed out',
> Airflow queues as many tasks as are configured in the pool, and the flow
> hangs there until the queued tasks are deleted manually after stopping the
> scheduler service.
>  
> Logs:
> [2018-08-23 13:44:03,954: ERROR/MainProcess] Pool callback raised exception: OperationalError('(psycopg2.OperationalError) could not connect to server: Connection timed out\n\tIs the server running on host "<host>" and accepting\n\tTCP/IP connections on port 5432?\n',)
> Traceback (most recent call last):
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/billiard/pool.py", line 1747, in safe_apply_callback
>     fun(*args, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/worker/request.py", line 367, in on_failure
>     self.id, exc, request=self, store_result=self.store_errors,
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py", line 157, in mark_as_failure
>     traceback=traceback, request=request)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/base.py", line 322, in store_result
>     request=request, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 53, in _inner
>     return fun(*args, **kwargs)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 105, in _store_result
>     session = self.ResultSession()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
>     **self.engine_options)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py", line 60, in session_factory
>     self.prepare_models(engine)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/celery/backends/database/session.py", line 55, in prepare_models
>     ResultModelBase.metadata.create_all(engine)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/schema.py", line 4005, in create_all
>     tables=tables)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1939, in _run_visitor
>     with self._optional_conn_ctx_manager(connection) as conn:
>   File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
>     return self.gen.next()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1932, in _optional_conn_ctx_manager
>     with self.contextual_connect() as conn:
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2123, in contextual_connect
>     self._wrap_pool_connect(self.pool.connect, None),
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2162, in _wrap_pool_connect
>     e, dialect, self)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception_noconnection
>     exc_info
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 265, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2158, in _wrap_pool_connect
>     return fn()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 403, in connect
>     return _ConnectionFairy._checkout(self)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 791, in _checkout
>     fairy = _ConnectionRecord.checkout(pool)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 532, in checkout
>     rec = pool._do_get()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 1287, in _do_get
>     return self._create_connection()
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 350, in _create_connection
>     return _ConnectionRecord(self)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 477, in __init__
>     self.__connect(first_connect_check=True)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/pool.py", line 674, in __connect
>     connection = pool._invoke_creator(self)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 106, in connect
>     return dialect.connect(*cargs, **cparams)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 412, in connect
>     return self.dbapi.connect(*cargs, **cparams)
>   File "/home/ubuntu/.local/lib/python2.7/site-packages/psycopg2/__init__.py", line 130, in connect
>     conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
> OperationalError: (psycopg2.OperationalError) could not connect to server: Connection timed out
>     Is the server running on host "<host>" and accepting
>     TCP/IP connections on port 5432?
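
The final error is raised while psycopg2 is still opening the TCP connection, before any Postgres authentication takes place, so a first check is whether the worker can reach the database port at all. A minimal sketch using only the standard library; can_reach is a hypothetical helper name and the 5-second timeout is an arbitrary assumption:

```python
import socket


def can_reach(host: str, port: int = 5432, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the name and tries each address until one connects.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts (socket.timeout is an OSError subclass),
        # refused connections and DNS resolution failures.
        return False
```

Run from the worker against the database host (the `<host>` placeholder above), a False that arrives only after the full timeout is consistent with packets being dropped on the way, for example by a firewall or security group, whereas a quick False usually means the connection was refused or the name did not resolve.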



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
