shahar1 commented on code in PR #61143:
URL: https://github.com/apache/airflow/pull/61143#discussion_r2773989685
##########
providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py:
##########
@@ -93,6 +99,7 @@ def __init__(
self.source_hook_params = source_hook_params
self.destination_conn_id = destination_conn_id
self.destination_hook_params = destination_hook_params
+ self._rows_processor = rows_processor
Review Comment:
Why make it an internal attribute?
##########
providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py:
##########
@@ -196,15 +207,7 @@ def execute_complete(
self.log.info("Offset increased to %d", offset)
context["ti"].xcom_push(key="offset", value=offset)
- self.log.info("Inserting %d rows into %s", len(results), self.destination_conn_id)
- self.destination_hook.insert_rows(
- table=self.destination_table, rows=results, **self.insert_args
- )
- self.log.info(
- "Inserting %d rows into %s done!",
- len(results),
- self.destination_conn_id,
- )
+ self._insert_rows(rows=rows, context=context)
Review Comment:
Worth adding a unit test for the deferrable case
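A hedged sketch of what such a test could look like. `GenericTransferStub` and the trigger-event shape are stand-ins invented here so the example runs without Airflow; a real test would instantiate `GenericTransfer`, mock the destination hook, and drive `execute_complete` with the event the trigger actually emits.

```python
class GenericTransferStub:
    """Stand-in for the operator's deferrable resume path (not the real class)."""

    def __init__(self):
        self.destination_conn_id = "dst"
        self.inserted = []

    def _insert_rows(self, rows, context):
        # The real method delegates to the destination hook's insert_rows.
        self.inserted.extend(rows)

    def execute_complete(self, context, event):
        # Mirrors the deferrable flow: the trigger event carries the rows.
        if event.get("status") != "success":
            raise RuntimeError(event.get("message", "trigger failed"))
        self._insert_rows(rows=event["results"], context=context)


def test_execute_complete_inserts_rows():
    op = GenericTransferStub()
    op.execute_complete(
        context={}, event={"status": "success", "results": [(1,), (2,)]}
    )
    assert op.inserted == [(1,), (2,)]
```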
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -1367,16 +1369,6 @@ def __init__(
self.insert_args = insert_args or {}
self.do_xcom_push = False
- def render_template_fields(
- self,
- context: Context,
- jinja_env: jinja2.Environment | None = None,
- ) -> None:
- super().render_template_fields(context=context, jinja_env=jinja_env)
-
- if isinstance(self.rows, XComArg):
- self.rows = self.rows.resolve(context=context)
Review Comment:
Before removing this, XComArgs were resolved before `execute()`. Now
`self.rows` could still be an `XComArg` when `if not self.rows` runs, and
since an `XComArg` object is always truthy, the skip-on-empty check won't
fire (e.g., for an XCom that resolves to an empty list).
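A minimal, Airflow-free sketch of the pitfall: `FakeXComArg` is a stand-in for the real `XComArg` (which is likewise truthy until resolved), and `should_skip` shows the resolve-before-check ordering that the removed `render_template_fields` override used to guarantee.

```python
class FakeXComArg:
    """Stand-in for an unresolved XComArg; resolves to whatever was pushed."""

    def resolve(self, context):
        # Hypothetical lookup for illustration only.
        return context["pushed_xcom"]


def should_skip(rows, context):
    """Resolve lazy references *before* the skip-on-empty check."""
    if isinstance(rows, FakeXComArg):
        rows = rows.resolve(context)
    return not rows


# An unresolved reference is truthy, so a bare `if not rows` never fires:
assert bool(FakeXComArg()) is True
# Resolving first restores the intended skip-on-empty behaviour:
assert should_skip(FakeXComArg(), {"pushed_xcom": []}) is True
assert should_skip([(1,)], {}) is False
```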
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -1307,7 +1305,9 @@ class SQLInsertRowsOperator(BaseSQLOperator):
:param rows: the rows to insert into the table. Rows can be a list of tuples or a list of dictionaries.
When a list of dictionaries is provided, the column names are inferred from the dictionary keys and
will be matched with the column names, ignored columns will be filtered out.
- :rows_processor: (optional) a function that will be applied to the rows before inserting them into the table.
+ :param rows_processor: (optional) A callable applied once per batch of rows before insertion.
+ It receives the full list of rows and the task context, and must return a list of rows compatible with
+ the underlying hook's.
Review Comment:
```suggestion
the underlying hook.
```
or
```suggestion
the underlying hook's `insert_rows` method.
```
##########
providers/common/sql/src/airflow/providers/common/sql/triggers/sql.pyi:
##########
@@ -32,11 +32,14 @@
from collections.abc import AsyncIterator
from typing import Any
+from airflow.providers.common.sql.hooks.sql import DbApiHook as DbApiHook
from airflow.triggers.base import BaseTrigger as BaseTrigger, TriggerEvent as TriggerEvent
class SQLExecuteQueryTrigger(BaseTrigger):
def __init__(
self, sql: str | list[str], conn_id: str, hook_params: dict | None = None, **kwargs
) -> None: ...
def serialize(self) -> tuple[str, dict[str, Any]]: ...
+ def get_hook(self) -> DbApiHook: ...
+ async def get_records(self) -> Any: ...
Review Comment:
There's no `async def get_records` in the implementation (only
`_get_records`).
I assume that the intention of prefixing with `_` in the implementation was
to prepare for making it mandatory in the future, but maybe the current
definitions need some refinement.
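One way to reconcile the stub with the implementation (a sketch only; `DbApiHook` and `BaseTrigger` are minimal stand-ins here so the fragment is self-contained outside Airflow) is to declare the private name the implementation actually defines today:

```python
from typing import Any


class DbApiHook:  # stand-in for airflow.providers.common.sql.hooks.sql.DbApiHook
    ...


class BaseTrigger:  # stand-in for airflow.triggers.base.BaseTrigger
    ...


class SQLExecuteQueryTrigger(BaseTrigger):
    def get_hook(self) -> DbApiHook: ...
    # Match the implementation's current private name; rename both the stub
    # and the implementation together if the method should become public API.
    async def _get_records(self) -> Any: ...
```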