kazanzhy commented on code in PR #25717:
URL: https://github.com/apache/airflow/pull/25717#discussion_r956570972


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -112,6 +124,61 @@ def get_db_hook(self) -> DbApiHook:
         return self._hook
 
 
+class SQLExecuteQueryOperator(BaseSQLOperator):
+    """
+    Executes SQL code in a specific database
+    :param sql: the SQL code or string pointing to a template file to be 
executed (templated).
+    File must have a '.sql' extensions.
+    :param handler: (optional) the function that will be applied to the cursor.
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :param conn_id: reference to a specific database
+    """
+
+    template_fields: Sequence[str] = ('sql', 'parameters')
+    template_ext: Sequence[str] = ('.sql',)
+    ui_color = '#cdaaed'
+
+    def __init__(
+        self,
+        *,
+        sql: Union[str, List[str]],
+        autocommit: bool = False,
+        parameters: Optional[Union[Mapping, Iterable]] = None,
+        handler: Callable[[Any], Any] = fetch_all_handler,
+        split_statements: bool = False,
+        return_last: bool = True,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.sql = sql
+        self.autocommit = autocommit
+        self.parameters = parameters
+        self.handler = handler
+        self.split_statements = split_statements
+        self.return_last = return_last
+
+    def execute(self, context):
+        self.log.info('Executing: %s', self.sql)
+        hook = self.get_db_hook()
+        if self.do_xcom_push:
+            output = hook.run(
+                self.sql,
+                autocommit=self.autocommit,
+                parameters=self.parameters,
+                handler=self.handler,
+                split_statements=self.split_statements,
+                return_last=self.return_last,
+            )
+        else:
+            output = hook.run(self.sql, autocommit=self.autocommit, 
parameters=self.parameters)
+
+        if hasattr(self, '_process_output'):
+            for out in output:
+                self._process_output(*out)
+
+        return output

Review Comment:
   There is one interesting issue here.
   If we set `None` as the default handler, then results won't be returned, 
even with `do_xcom_push=True`.
   On the other hand, if we keep `fetch_all_handler` as the default handler, 
then it can't be used for non-SELECT statements, because it raises an error 
for them.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to