ekarsten opened a new issue #16340: URL: https://github.com/apache/airflow/issues/16340
**Description**

Currently, the [PostgresOperator](https://github.com/apache/airflow/blob/main/airflow/providers/postgres/operators/postgres.py) defines the database hook in a fixed way:

```
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
```

This makes the operator inflexible for certain tech stacks. For example, long queries in Redshift will sometimes fail (in my experience) with `psycopg2.OperationalError: SSL SYSCALL error: EOF detected` because the connection timed out on the Redshift side of things. The workaround (described in this [Stack Overflow answer](https://stackoverflow.com/a/63130830/10131478)) that worked for me is to specify some additional arguments in the hook (see below) and then write my own custom operator.

```
PostgresHook(
    postgres_conn_id=postgres_conn_id,
    extra={
        "keepalives": 1,
        "keepalives_idle": 30,
        "keepalives_interval": 5,
        "keepalives_count": 5,
    }
)
```

I don't think the default implementation of the PostgresOperator should include these extra parameters. I just think it should be modified to be more flexible, so that users can pass arguments through to the hook as well.
My proposed solution (I will open a PR after some community input on it) would be one of the following.

OPTION 1: (User-specified hook)

```
def __init__(
    self,
    *,
    sql: str,
    postgres_conn_id: str = 'postgres_default',
    autocommit: bool = False,
    parameters: Optional[Union[Mapping, Iterable]] = None,
    database: Optional[str] = None,
    # A default value cannot reference self, so None means "build the default hook".
    hook: Optional[PostgresHook] = None,
    **kwargs,
) -> None:
    super().__init__(**kwargs)
    self.sql = sql
    self.postgres_conn_id = postgres_conn_id
    self.autocommit = autocommit
    self.parameters = parameters
    self.database = database
    self.hook = hook

def execute(self, context):
    self.log.info('Executing: %s', self.sql)
    # Fall back to the current default hook when the user did not supply one.
    if self.hook is None:
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
    for output in self.hook.conn.notices:
        self.log.info(output)
```

OPTION 2: (Pass extra params to hook)

```
def __init__(
    self,
    *,
    sql: str,
    postgres_conn_id: str = 'postgres_default',
    autocommit: bool = False,
    parameters: Optional[Union[Mapping, Iterable]] = None,
    database: Optional[str] = None,
    **kwargs,
) -> None:
    # Pop "extra" before calling super() so it is not forwarded to BaseOperator.
    self.extra = kwargs.pop("extra", None)
    super().__init__(**kwargs)
    self.sql = sql
    self.postgres_conn_id = postgres_conn_id
    self.autocommit = autocommit
    self.parameters = parameters
    self.database = database
    self.hook = None

def execute(self, context):
    self.log.info('Executing: %s', self.sql)
    self.hook = PostgresHook(
        postgres_conn_id=self.postgres_conn_id,
        schema=self.database,
        extra=self.extra
    )
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
    for output in self.hook.conn.notices:
        self.log.info(output)
```

I slightly favor Option 2 because I don't think anyone should be using the PostgresOperator with a hook other than the PostgresHook, so all we really need is for users to be able to pass the named parameters through.

**Use case / motivation**

This feature will make the PostgresOperator more flexible, allowing users to pass more arguments to the PostgresHook and make better use of its features.
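To make the Option 2 pattern easier to discuss, here is a minimal sketch that runs without Airflow installed. `StubHook` and `StubOperator` are hypothetical stand-ins for `PostgresHook` and `PostgresOperator`, and the `hook_params` argument name is an assumption chosen for illustration, not an existing parameter. The sketch only shows the forwarding idea: the operator stores the extra arguments and hands them to the hook when it is built in `execute`.

```python
from typing import Any, Dict, List, Optional, Tuple


class StubHook:
    """Hypothetical stand-in for PostgresHook; it only records its inputs."""

    def __init__(self, postgres_conn_id: str, schema: Optional[str] = None, **extra: Any) -> None:
        self.postgres_conn_id = postgres_conn_id
        self.schema = schema
        self.extra = extra  # e.g. keepalive settings forwarded by the operator
        self.executed: List[Tuple[str, bool, Any]] = []

    def run(self, sql: str, autocommit: bool = False, parameters: Any = None) -> None:
        # A real hook would open a connection and execute the statement here.
        self.executed.append((sql, autocommit, parameters))


class StubOperator:
    """Hypothetical stand-in for PostgresOperator (no BaseOperator parent)."""

    def __init__(
        self,
        *,
        sql: str,
        postgres_conn_id: str = "postgres_default",
        autocommit: bool = False,
        parameters: Any = None,
        database: Optional[str] = None,
        hook_params: Optional[Dict[str, Any]] = None,  # assumed name
    ) -> None:
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        # Copy so later changes to the caller's dict do not leak into the operator.
        self.hook_params = dict(hook_params or {})
        self.hook: Optional[StubHook] = None

    def execute(self, context: Any = None) -> StubHook:
        # Build the hook lazily and forward the user-supplied arguments to it.
        self.hook = StubHook(
            postgres_conn_id=self.postgres_conn_id,
            schema=self.database,
            **self.hook_params,
        )
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
        return self.hook


op = StubOperator(
    sql="SELECT 1",
    hook_params={"keepalives": 1, "keepalives_idle": 30},
)
hook = op.execute()
```

Keeping the extra arguments in a single dictionary means the operator signature does not have to grow each time the hook gains a new option.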
**Are you willing to submit a PR?**

Yes, I would be happy to submit a PR.

**Related Issues**

I couldn't find a related issue, but those more familiar with the project might identify this as a duplicate, and I would be happy to delete it if that is the case.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
