malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-829203997
The following is an example of an alternative implementation that works with
the existing codebase, specifically addressing the requirement of calling a
stored procedure that has one or more _out parameters_.
Simply pass a placeholder value of the right Python type for each _out
parameter_ (e.g. the integer `0` for a number). After the call, the entire set
of parameters (both in and out directions) is returned by the operator and is
therefore pushed to XCom (if enabled – which is the default setting).
```python
from typing import Iterable, Mapping, Optional, Union

from airflow.models import BaseOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
from airflow.utils.decorators import apply_defaults
class OracleCallOperator(BaseOperator):
    """Call an Oracle stored procedure and return its bind values.

    The call is issued as ``BEGIN <function>(<binds>); END;``. After the call,
    the value of every bind variable (in and out alike) is read back and
    returned, so out parameters become available as the task's return value.

    :param function: name of the stored procedure to call.
    :param oracle_conn_id: Airflow connection id handed to ``OracleHook``.
    :param parameters: bind values for the call. A mapping produces named
        binds (``:name``); any other iterable produces positional binds
        (``:1``, ``:2``, ...). For an out parameter, pass a placeholder value
        of the matching Python type (e.g. ``0`` for a number). ``None`` calls
        the procedure without arguments.
    :param autocommit: stored on the operator.
        NOTE(review): ``execute`` does not currently apply this flag to the
        connection -- confirm the intended commit behaviour.
    """

    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        *,
        function: str,
        oracle_conn_id: str = 'oracle_default',
        parameters: Optional[Union[Mapping, Iterable]] = None,
        autocommit: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.oracle_conn_id = oracle_conn_id
        self.function = function
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context) -> Optional[Union[list, dict]]:
        """Run the procedure call and return the resulting bind values.

        :param context: Airflow task context (not used by this method).
        :return: a list of values for positional binds, a dict of
            name -> value for named binds, or ``None`` when the cursor
            exposes neither a list nor a dict of bind variables.
        """
        self.log.info('Executing: %s', self.function)
        hook = OracleHook(oracle_conn_id=self.oracle_conn_id)

        # Normalise to a concrete dict or list: ``parameters`` may be None,
        # a non-dict mapping or a one-shot iterable, none of which support
        # the ``len()`` / dict handling needed below.
        if self.parameters is None:
            parameters = []
        elif isinstance(self.parameters, Mapping):
            parameters = dict(self.parameters)
        else:
            parameters = list(self.parameters)

        # Named binds use the mapping keys; positional binds are numbered
        # starting at 1.
        if isinstance(parameters, dict):
            bind_names = parameters
        else:
            bind_names = range(1, len(parameters) + 1)
        args = ", ".join(f":{name}" for name in bind_names)
        sql = f"BEGIN {self.function}({args}); END;"

        with hook.get_conn() as conn, conn.cursor() as cursor:
            cursor.execute(sql, parameters)
            # Read the values back while the cursor is still open.
            bindvars = cursor.bindvars
            if isinstance(bindvars, list):
                return [var.getvalue() for var in bindvars]
            if isinstance(bindvars, dict):
                return {name: var.getvalue() for name, var in bindvars.items()}
        return None
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]