ekarsten opened a new issue #16340:
URL: https://github.com/apache/airflow/issues/16340


   **Description**
   
Currently, the [PostgresOperator](https://github.com/apache/airflow/blob/main/airflow/providers/postgres/operators/postgres.py) defines the database hook in a fixed way: `self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)`
This makes the operator inflexible for certain tech stacks. For example, in my experience long queries on Redshift sometimes fail with `psycopg2.OperationalError: SSL SYSCALL error: EOF detected` because the connection timed out on the Redshift side. The workaround that worked for me (described in this [Stack Overflow answer](https://stackoverflow.com/a/63130830/10131478)) is to pass some additional arguments to the hook (see below) and then write my own custom operator.
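```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

PostgresHook(
    postgres_conn_id=postgres_conn_id,
    # libpq TCP keepalive parameters
    extra={
        "keepalives": 1,
        "keepalives_idle": 30,
        "keepalives_interval": 5,
        "keepalives_count": 5,
    },
)
```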
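The four keys above are standard libpq connection parameters. As I understand them, they ask the OS to send TCP keepalive probes once the connection has been idle for `keepalives_idle` seconds, which can stop intermediate network devices from treating a long-running query's connection as idle. The sketch below is only an illustration: `KEEPALIVE_EXTRA`, `to_conninfo` and `approx_dead_peer_timeout` are hypothetical names, not part of Airflow or psycopg2. It renders the settings as a libpq keyword/value string and gives a rough upper bound on how long an unresponsive peer takes to be detected.

```python
from typing import Dict

# Hypothetical constant holding the keepalive settings from the workaround above.
KEEPALIVE_EXTRA: Dict[str, int] = {
    "keepalives": 1,           # enable TCP keepalives on the client socket
    "keepalives_idle": 30,     # seconds of inactivity before the first probe
    "keepalives_interval": 5,  # seconds between unanswered probes
    "keepalives_count": 5,     # unanswered probes before the connection is considered dead
}


def to_conninfo(params: Dict[str, object]) -> str:
    """Render parameters as a libpq keyword/value string (these values need no quoting)."""
    return " ".join(f"{key}={value}" for key, value in params.items())


def approx_dead_peer_timeout(extra: Dict[str, int]) -> int:
    """Rough upper bound, in seconds, before an unresponsive peer is detected:
    idle time before probing plus one interval per unanswered probe."""
    return extra["keepalives_idle"] + extra["keepalives_interval"] * extra["keepalives_count"]


print(to_conninfo(KEEPALIVE_EXTRA))  # keepalives=1 keepalives_idle=30 keepalives_interval=5 keepalives_count=5
print(approx_dead_peer_timeout(KEEPALIVE_EXTRA))  # 55
```

With these values the bound works out to 30 + 5 × 5 = 55 seconds.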
I don't think the default PostgresOperator should set these extra parameters; I just think it should be more flexible, so that users can pass arguments through to the hook. I propose one of the following two options (I will open a PR after some community input):
   
   OPTION 1: (User-specified hook)
   
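```python
from typing import Iterable, Mapping, Optional, Union

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class PostgresOperator(BaseOperator):
    def __init__(
        self,
        *,
        sql: str,
        postgres_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Optional[Union[Mapping, Iterable]] = None,
        database: Optional[str] = None,
        # A default argument cannot reference `self`, so default to None
        # and build the standard hook in execute() instead.
        hook: Optional[PostgresHook] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        self.hook = hook

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        if self.hook is None:
            # No hook supplied: fall back to the current fixed behaviour.
            self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
        # Log any server-side NOTICE messages raised while the SQL ran.
        for output in self.hook.conn.notices:
            self.log.info(output)
```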
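Python evaluates default arguments once, when the function is defined, so a default cannot refer to `self`; the usual pattern is to default to `None` and construct the fallback later. The stdlib-only sketch below illustrates that pattern for Option 1 with hypothetical stand-ins (`StubHook`, `StubOperator`); it does not use Airflow's real classes.

```python
from typing import Any, Dict, List, Optional


class StubHook:
    """Hypothetical stand-in for PostgresHook that records what it was asked to run."""

    def __init__(self, postgres_conn_id: str, schema: Optional[str] = None, **extra: Any) -> None:
        self.postgres_conn_id = postgres_conn_id
        self.schema = schema
        self.extra: Dict[str, Any] = extra
        self.executed: List[str] = []

    def run(self, sql: str) -> None:
        self.executed.append(sql)


class StubOperator:
    """Hypothetical operator for Option 1: use an injected hook, else build the default one."""

    def __init__(self, sql: str, postgres_conn_id: str = "postgres_default",
                 database: Optional[str] = None, hook: Optional[StubHook] = None) -> None:
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.database = database
        self.hook = hook

    def execute(self) -> StubHook:
        if self.hook is None:
            # Built here rather than as a default argument, which could not reference `self`.
            self.hook = StubHook(self.postgres_conn_id, schema=self.database)
        self.hook.run(self.sql)
        return self.hook


default_hook = StubOperator("SELECT 1", database="dev").execute()
custom = StubHook("redshift_default", keepalives=1)
injected_hook = StubOperator("SELECT 2", hook=custom).execute()
print(default_hook.postgres_conn_id, default_hook.schema)  # postgres_default dev
print(injected_hook is custom, custom.executed)            # True ['SELECT 2']
```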
   
   OPTION 2: (Pass extra params to hook)
   
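```python
from typing import Iterable, Mapping, Optional, Union

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class PostgresOperator(BaseOperator):
    def __init__(
        self,
        *,
        sql: str,
        postgres_conn_id: str = 'postgres_default',
        autocommit: bool = False,
        parameters: Optional[Union[Mapping, Iterable]] = None,
        database: Optional[str] = None,
        # Declared explicitly so it is not forwarded to BaseOperator,
        # which rejects keyword arguments it does not recognise.
        extra: Optional[dict] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.autocommit = autocommit
        self.parameters = parameters
        self.database = database
        self.extra = extra
        self.hook = None

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        # Build the hook at run time, forwarding the user-supplied extras.
        self.hook = PostgresHook(
            postgres_conn_id=self.postgres_conn_id,
            schema=self.database,
            extra=self.extra,
        )
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
        # Log any server-side NOTICE messages raised while the SQL ran.
        for output in self.hook.conn.notices:
            self.log.info(output)
```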
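By default, Airflow 2's `BaseOperator` rejects keyword arguments it does not recognise, so popping `extra` from `kwargs` after `super().__init__(**kwargs)` comes too late: the base class has already seen it. The stdlib-only sketch below reproduces that ordering issue with hypothetical classes (`StrictBase` is modelled loosely on that behaviour, `RecordingHook` stands in for PostgresHook) and shows that declaring `extra` as an explicit keyword avoids it.

```python
from typing import Any, Dict, Optional


class StrictBase:
    """Hypothetical base class that, like BaseOperator's default, rejects unknown kwargs."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        if kwargs:
            raise TypeError(f"Invalid arguments were passed: {sorted(kwargs)}")
        self.task_id = task_id


class RecordingHook:
    """Hypothetical stand-in for PostgresHook that keeps the arguments it was built with."""

    def __init__(self, postgres_conn_id: str, schema: Optional[str] = None,
                 extra: Optional[Dict[str, Any]] = None) -> None:
        self.postgres_conn_id = postgres_conn_id
        self.schema = schema
        self.extra = extra or {}


class PopAfterSuper(StrictBase):
    """Pops `extra` only after the base class has already received it."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.extra = kwargs.pop("extra", None)


class ExplicitExtra(StrictBase):
    """Option 2 with `extra` declared explicitly, so the base class never sees it."""

    def __init__(self, *, postgres_conn_id: str = "postgres_default",
                 database: Optional[str] = None,
                 extra: Optional[Dict[str, Any]] = None, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.database = database
        self.extra = extra

    def execute(self) -> RecordingHook:
        # Build the hook at run time, forwarding the user-supplied extras unchanged.
        return RecordingHook(self.postgres_conn_id, schema=self.database, extra=self.extra)


try:
    PopAfterSuper(task_id="t1", extra={"keepalives": 1})
except TypeError as err:
    print(err)  # Invalid arguments were passed: ['extra']

hook = ExplicitExtra(task_id="t2", extra={"keepalives": 1}).execute()
print(hook.extra)  # {'keepalives': 1}
```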
   
I slightly favor Option 2: I don't think the PostgresOperator should be used with any hook other than PostgresHook, so all we really need is a way for users to pass named parameters through to it.
   
   **Use case / motivation**
   
This feature will make the PostgresOperator more flexible, letting users pass additional arguments to the PostgresHook (such as the keepalive settings above) and make better use of its features.
   
   **Are you willing to submit a PR?**
   
   Yes, I would be happy to submit a PR.
   
   **Related Issues**
   
I couldn't find a related issue, but those more familiar with the project might identify this as a duplicate, and I would be happy to close it if that is the case.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
