shahar1 commented on code in PR #61143:
URL: https://github.com/apache/airflow/pull/61143#discussion_r2773989685


##########
providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py:
##########
@@ -93,6 +99,7 @@ def __init__(
         self.source_hook_params = source_hook_params
         self.destination_conn_id = destination_conn_id
         self.destination_hook_params = destination_hook_params
+        self._rows_processor = rows_processor

Review Comment:
   Why make it an internal attribute?



##########
providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py:
##########
@@ -196,15 +207,7 @@ def execute_complete(
                 self.log.info("Offset increased to %d", offset)
                 context["ti"].xcom_push(key="offset", value=offset)
 
-                self.log.info("Inserting %d rows into %s", len(results), self.destination_conn_id)
-                self.destination_hook.insert_rows(
-                    table=self.destination_table, rows=results, **self.insert_args
-                )
-                self.log.info(
-                    "Inserting %d rows into %s done!",
-                    len(results),
-                    self.destination_conn_id,
-                )
+                self._insert_rows(rows=rows, context=context)

Review Comment:
   It would be worth adding a unit test for the deferrable case.



##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -1367,16 +1369,6 @@ def __init__(
         self.insert_args = insert_args or {}
         self.do_xcom_push = False
 
-    def render_template_fields(
-        self,
-        context: Context,
-        jinja_env: jinja2.Environment | None = None,
-    ) -> None:
-        super().render_template_fields(context=context, jinja_env=jinja_env)
-
-        if isinstance(self.rows, XComArg):
-            self.rows = self.rows.resolve(context=context)

Review Comment:
   Before this override was removed, XComArgs were resolved before `execute()`.
   Now `self.rows` can still be an `XComArg` when `if not self.rows` runs, and an
   `XComArg` is always truthy, so the skip-on-empty check won't fire (e.g., for an
   XCom that resolves to an empty list).
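
A minimal sketch of the truthiness concern, assuming an unresolved reference behaves like a plain Python object. `FakeXComArg` and `should_skip` are hypothetical stand-ins written for illustration, not Airflow's classes or the operator's code:

```python
class FakeXComArg:
    """Hypothetical stand-in for an unresolved XComArg (not Airflow's class)."""

    def __init__(self, value):
        self._value = value

    def resolve(self, context):
        # Stand-in for looking up the real value at runtime.
        return self._value


def should_skip(rows):
    # Mirrors the shape of the `if not self.rows` check.
    return not rows


unresolved = FakeXComArg([])

# A plain object defines neither __bool__ nor __len__, so it is truthy.
skip_before_resolve = should_skip(unresolved)

# Once resolved, the empty list is falsy and the skip check fires.
skip_after_resolve = should_skip(unresolved.resolve(context={}))
```

If the check has to stay in `execute()`, it would need to run on the resolved value rather than on the reference.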



##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -1307,7 +1305,9 @@ class SQLInsertRowsOperator(BaseSQLOperator):
     :param rows: the rows to insert into the table. Rows can be a list of tuples or a list of dictionaries.
         When a list of dictionaries is provided, the column names are inferred from the dictionary keys and
         will be matched with the column names, ignored columns will be filtered out.
-    :rows_processor: (optional) a function that will be applied to the rows before inserting them into the table.
+    :param rows_processor: (optional) A callable applied once per batch of rows before insertion.
+        It receives the full list of rows and the task context, and must return a list of rows compatible with
+        the underlying hook's.

Review Comment:
   ```suggestion
           the underlying hook.
   ```
   
   or
   
   ```suggestion
           the underlying hook's `insert_rows` method.
   ```
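
For readers of the docstring, a rough sketch of what such a callable could look like. It assumes the processor is called once with the whole batch and receives the task context as keyword arguments; that calling convention and the name `drop_incomplete_rows` are assumptions for illustration, not confirmed by this diff:

```python
from typing import Any


def drop_incomplete_rows(rows: list[tuple[Any, ...]], **context: Any) -> list[tuple[Any, ...]]:
    """Hypothetical processor: takes the full batch, returns a list of rows."""
    # Keep only rows without missing values; the context is accepted but unused here.
    return [row for row in rows if all(value is not None for value in row)]


batch = [(1, "a"), (2, None), (3, "c")]
processed = drop_incomplete_rows(batch, ds="2024-01-01")
```

The return value stays a list of tuples, so it remains a shape that a row-insertion method can consume.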



##########
providers/common/sql/src/airflow/providers/common/sql/triggers/sql.pyi:
##########
@@ -32,11 +32,14 @@
 from collections.abc import AsyncIterator
 from typing import Any
 
+from airflow.providers.common.sql.hooks.sql import DbApiHook as DbApiHook
 from airflow.triggers.base import BaseTrigger as BaseTrigger, TriggerEvent as TriggerEvent
 
 class SQLExecuteQueryTrigger(BaseTrigger):
     def __init__(
         self, sql: str | list[str], conn_id: str, hook_params: dict | None = None, **kwargs
     ) -> None: ...
     def serialize(self) -> tuple[str, dict[str, Any]]: ...
+    def get_hook(self) -> DbApiHook: ...
+    async def get_records(self) -> Any: ...

Review Comment:
   There's no `async def get_records` in the implementation (only `_get_records`).
   I assume the intention of prefixing it with `_` in the implementation was to
   prepare for making it mandatory in the future, but maybe the current
   definitions need some refinement.
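
One way to catch this kind of stub drift is sketched below. `TriggerImpl` is a hypothetical stand-in for the implementation class, and `stub_methods` is hand-written here to mirror the names the `.pyi` hunk declares:

```python
import inspect


class TriggerImpl:
    """Hypothetical stand-in for the trigger implementation."""

    def get_hook(self):
        return None

    async def _get_records(self):
        return []


# Method names the stub is assumed to declare (copied by hand from the hunk above).
stub_methods = {"get_hook", "get_records"}

# Collect the functions actually defined on the implementation class.
impl_methods = {name for name, _ in inspect.getmembers(TriggerImpl, inspect.isfunction)}

# Names the stub promises but the implementation does not provide.
missing_in_impl = sorted(stub_methods - impl_methods)
```

Here `missing_in_impl` contains only `get_records`, which matches the mismatch described above.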



