kaxil commented on code in PR #63874:
URL: https://github.com/apache/airflow/pull/63874#discussion_r3334166319


##########
airflow-core/src/airflow/models/renderedtifields.py:
##########
@@ -239,13 +242,62 @@ def get_k8s_pod_yaml(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> d
 
     @provide_session
     @retry_db_transaction
-    def write(self, session: Session):
+    def write(self, session: Session = NEW_SESSION):
         """
         Write instance to database.
 
+        Uses a database-level upsert (INSERT ... ON CONFLICT DO UPDATE) to
+        atomically insert or update the record, avoiding race conditions that
+        can occur with session.merge() when concurrent requests (e.g. from
+        client-side timeout retries) target the same primary key.
+
         :param session: SqlAlchemy Session
         """
-        session.merge(self)
+        dialect_name = get_dialect_name(session)
+
+        values = {
+            "dag_id": self.dag_id,
+            "task_id": self.task_id,
+            "run_id": self.run_id,
+            "map_index": self.map_index,
+            "rendered_fields": self.rendered_fields,
+            "k8s_pod_yaml": self.k8s_pod_yaml,
+        }
+        update_on_conflict = {
+            "rendered_fields": self.rendered_fields,
+            "k8s_pod_yaml": self.k8s_pod_yaml,
+        }
+
+        stmt: MySQLInsert | PostgreSQLInsert | SQLiteInsert
+
+        if dialect_name == "postgresql":

Review Comment:
   Nice-to-have / possible follow-up (non-blocking): this pg/mysql/sqlite 
branching has the same shape as `_build_variable_upsert_stmt` in 
`models/variable.py`, which already returns `MySQLInsert | PostgreSQLInsert | 
SQLiteInsert` and takes `conflict_cols`/`values`/`update_fields`. Rather than 
inlining another copy, this could reuse a shared helper placed near 
`get_dialect_name`. One thing a consolidated helper should pick up from here: 
the explicit `else: raise ValueError` is better than that helper's silent 
SQLite fallthrough (it treats any unknown dialect as sqlite), so the shared 
version should adopt this PR's strictness.
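
   A rough sketch of what such a consolidated helper might look like, to make 
the suggestion concrete. The name `build_upsert_stmt`, its signature, and the 
demo table are hypothetical (they are not taken from `models/variable.py` or 
from this PR); only the SQLAlchemy dialect-specific `insert()` constructs are 
real APIs. It assumes `dialect_name` is the string that `get_dialect_name` 
would return.

```python
from collections.abc import Iterable, Mapping
from typing import Any

from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import mysql, postgresql, sqlite


def build_upsert_stmt(
    dialect_name: str,
    table: Table,
    conflict_cols: Iterable[str],
    values: Mapping[str, Any],
    update_fields: Iterable[str],
):
    """Hypothetical shared helper: build a dialect-specific upsert statement."""
    # Only the listed fields are overwritten when the key already exists.
    update_values = {name: values[name] for name in update_fields}

    if dialect_name == "postgresql":
        return (
            postgresql.insert(table)
            .values(**values)
            .on_conflict_do_update(index_elements=list(conflict_cols), set_=update_values)
        )
    if dialect_name == "mysql":
        # MySQL infers the conflict target from the table's unique keys.
        return mysql.insert(table).values(**values).on_duplicate_key_update(**update_values)
    if dialect_name == "sqlite":
        return (
            sqlite.insert(table)
            .values(**values)
            .on_conflict_do_update(index_elements=list(conflict_cols), set_=update_values)
        )
    # Strict by design: an unknown dialect is an error, not an implicit SQLite.
    raise ValueError(f"Unsupported database dialect: {dialect_name!r}")


# Illustrative table only; not the real RenderedTaskInstanceFields schema.
demo_table = Table(
    "demo_rendered_fields",
    MetaData(),
    Column("dag_id", String(250), primary_key=True),
    Column("task_id", String(250), primary_key=True),
    Column("rendered_fields", String(1000)),
)
```

   With something along these lines, `write()` would pass its `values` dict, 
the primary-key column names as `conflict_cols`, and the two updatable columns 
as `update_fields`, and the variable upsert could call the same function.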



##########
airflow-core/tests/unit/models/test_renderedtifields.py:
##########
@@ -372,6 +372,66 @@ def test_write(self, dag_maker):
             {"bash_command": "echo test_val_updated", "env": None, "cwd": None},
         )
 
+    def test_write_upsert_existing_record(self, dag_maker, session):
+        """
+        Test that write() handles a pre-existing row without raising IntegrityError.
+
+        The first row is seeded via a direct INSERT (bypassing write()), simulating
+        a row already committed by the first SDK request. The second call goes through
+        write() with different values, which must succeed and update the record.
+
+        A plain INSERT statement would raise IntegrityError here; write() must use a
+        database-level upsert to handle the conflict atomically. This is the scenario

Review Comment:
   Following up on the resolved Copilot thread: this docstring claim isn't 
quite right. The old `session.merge(self)` would also handle the seeded row 
without an `IntegrityError` (merge does a SELECT-then-UPDATE), so the test 
passes against both the old and new implementations, and the "plain INSERT 
would raise" framing overstates what it proves. It also runs in a single 
transaction, so it doesn't actually reproduce the concurrent-transaction race 
it describes. The test still adds real DO-UPDATE coverage -- I'd just soften 
the docstring to say it verifies `write()` updates an existing row, rather than 
implying it reproduces the race.
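
   To illustrate the point, here is a small standalone sketch using only the 
standard-library `sqlite3` module. The table `rtif` and the function names are 
made up for the illustration, and `select_then_write` is only a rough stand-in 
for a merge-style write, not SQLAlchemy's actual `merge()` implementation. 
Within a single connection, both strategies update a seeded row without error, 
so a single-transaction test cannot tell them apart; only a blind plain INSERT 
fails.

```python
import sqlite3


def make_seeded_conn() -> sqlite3.Connection:
    """Create an in-memory table with one pre-existing ("seeded") row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE rtif (dag_id TEXT, task_id TEXT, rendered_fields TEXT, "
        "PRIMARY KEY (dag_id, task_id))"
    )
    conn.execute("INSERT INTO rtif VALUES ('d', 't', 'old')")
    return conn


def select_then_write(conn, dag_id, task_id, fields):
    """Merge-style write: look the row up by key, then UPDATE or INSERT."""
    exists = conn.execute(
        "SELECT 1 FROM rtif WHERE dag_id = ? AND task_id = ?", (dag_id, task_id)
    ).fetchone()
    if exists:
        conn.execute(
            "UPDATE rtif SET rendered_fields = ? WHERE dag_id = ? AND task_id = ?",
            (fields, dag_id, task_id),
        )
    else:
        conn.execute("INSERT INTO rtif VALUES (?, ?, ?)", (dag_id, task_id, fields))


def upsert(conn, dag_id, task_id, fields):
    """Database-level upsert: one atomic statement handles the conflict."""
    conn.execute(
        "INSERT INTO rtif VALUES (?, ?, ?) "
        "ON CONFLICT (dag_id, task_id) "
        "DO UPDATE SET rendered_fields = excluded.rendered_fields",
        (dag_id, task_id, fields),
    )


def stored_fields(conn):
    """Return the rendered_fields value of the single row in the table."""
    return conn.execute("SELECT rendered_fields FROM rtif").fetchone()[0]


def plain_insert_raises(conn) -> bool:
    """Return True if a blind INSERT of the seeded key hits the PK constraint."""
    try:
        conn.execute("INSERT INTO rtif VALUES ('d', 't', 'new')")
    except sqlite3.IntegrityError:
        return True
    return False
```

   The two strategies only diverge when a second transaction inserts the same 
key between the SELECT and the write, which this setup (like the test) never 
exercises.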



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
