potiuk commented on PR #38362:
URL: https://github.com/apache/airflow/pull/38362#issuecomment-2015097229
I do not think we ever use row count after `executemany` in airflow core so
it should be safe to merge.
I am not 100% sure (maybe others who know SQLAlchemy better can confirm) - but I believe
the only place where SQLAlchemy runs an executemany for UPDATE would be in our
`bulk_*` methods.
The only thing that `values_plus_batch` changes is that when it runs a bulk
update it uses `execute_batch`, which does not report the row count properly. I
looked at all of our `bulk_*` methods in core to see if they return a
row count or make use of one, but I could not find any that do.
The only place I could find where a bulk method's return value is used is
in dag.py:
```python
return cls.bulk_write_to_db(dags=dags, session=session)
```
But on closer inspection it always returns None - because that's what
`bulk_write_to_db` returns.
I also looked at the uses of `rowcount`, and the only places I could find it
used are:
* in DELETE statements (not relevant): delete_dag.py, dag_run_endpoint.py,
dataset_endpoint.py, pool_endpoint.py
* in single UPDATE statements:
- dag_processing/manager.py
```python
deactivated_dagmodel = session.execute(
    update(DagModel)
    .where(DagModel.dag_id.in_(to_deactivate))
    .values(is_active=False)
    .execution_options(synchronize_session="fetch")
)
deactivated = deactivated_dagmodel.rowcount
```
- scheduler_job_runner.py:
```python
num_failed = session.execute(
    update(Job)
    .where(
        Job.job_type == "SchedulerJob",
        Job.state == JobState.RUNNING,
        Job.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
    )
    .values(state=JobState.FAILED)
).rowcount
```
and
```python
update(TI)
.where(
    TI.state == TaskInstanceState.DEFERRED,
    TI.trigger_timeout < timezone.utcnow(),
)
.values(
    state=TaskInstanceState.SCHEDULED,
    next_method="__fail__",
    next_kwargs={"error": "Trigger/execution timeout"},
    trigger_id=None,
)
```
- models/dagrun.py:
```python
count += session.execute(
    update(TI)
    .where(
        TI.dag_id == self.dag_id,
        TI.run_id == self.run_id,
        tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
    )
    .values(state=TaskInstanceState.SCHEDULED)
    .execution_options(synchronize_session=False)
).rowcount
```
and
```python
update(TI)
.where(
    TI.dag_id == self.dag_id,
    TI.run_id == self.run_id,
    tuple_in_condition((TI.task_id, TI.map_index), dummy_ti_ids_chunk),
)
.values(
    state=TaskInstanceState.SUCCESS,
    start_date=timezone.utcnow(),
    end_date=timezone.utcnow(),
    duration=0,
)
.execution_options(
    synchronize_session=False,
)
```
Here is the relevant part of the SQLAlchemy documentation:
> 'values_plus_batch'- SQLAlchemy’s native
[insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues)
handler is used for qualifying INSERT statements, assuming
[create_engine.use_insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.use_insertmanyvalues)
is left at its default value of True. Then, psycopg2’s execute_batch() handler
is used for qualifying UPDATE and DELETE statements when executed with multiple
parameter sets. When using this mode, the
[CursorResult.rowcount](https://docs.sqlalchemy.org/en/20/core/connections.html#sqlalchemy.engine.CursorResult.rowcount)
attribute will not contain a value for executemany-style executions against
UPDATE and DELETE statements.
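To illustrate the distinction the doc is drawing, here is a generic DB-API sketch (not Airflow code; it uses stdlib `sqlite3` only because it needs no server). `rowcount` is well-defined after a single `execute()`, but after an `executemany()` the DB-API leaves it driver-defined: sqlite3 happens to sum the per-statement counts, while psycopg2's `execute_batch()` helper only sees its last internal batch, which is exactly why `values_plus_batch` cannot report a meaningful value there.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_active INTEGER)")
conn.executemany("INSERT INTO dag VALUES (?, 1)", [("a",), ("b",), ("c",)])

# Single execute(): rowcount is reliable on any compliant driver.
# This is the pattern in the scheduler/manager snippets above, so they
# are unaffected by the executemany change.
cur = conn.execute("UPDATE dag SET is_active = 0 WHERE dag_id IN ('a', 'b')")
print(cur.rowcount)  # 2

# executemany(): one UPDATE statement, many parameter sets - this is the
# "executemany-style execution" that values_plus_batch routes through
# psycopg2's execute_batch(). sqlite3 sums the counts here, but the DB-API
# makes no such guarantee, so code must not rely on rowcount after this.
cur = conn.cursor()
cur.executemany(
    "UPDATE dag SET is_active = 1 WHERE dag_id = ?",
    [("a",), ("b",)],
)
print(cur.rowcount)  # 2 on sqlite3, but driver-dependent in general
```

The practical takeaway matches the survey above: single-statement bulk UPDATEs keep a usable `rowcount`, and only code reading `rowcount` after an executemany-style UPDATE/DELETE would be affected.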
It would be good if others (@ashb @dstandish @kaxil @uranusjr) could take a
look as well and see if they can confirm that we should not be affected.