Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3414139738


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -184,22 +553,165 @@ def start(self) -> None:
             name=f"shared-stream-poll[{self.key!r}]",
         )
 
+    def _request_pump_stop(self) -> None:
+        """
+        Ask the advance pump to drain the already-resolved lane heads and exit.
+
+        Synchronous (no await) so it can run inside ``_poll``'s terminal
+        section without yielding.
+        """
+        self._pump_stopping = True
+        self._advance_wakeup.set()
+
+    async def _run_advance_pump(self, producer: SharedStreamProducer) -> None:
+        """
+        Dispatch broker advances in per-lane fan-out order, one batch at a time.
+
+        A single task scans the lanes round-robin: each pass dispatches at
+        most one batch per lane — the lane's contiguous resolved prefix —
+        and passes repeat until one makes no progress. An advance that
+        raises is logged, its batch is abandoned to broker redelivery, and
+        the pump moves on. On stop the pump keeps passing until every
+        already-resolved dispatchable event is drained, abandons unresolved
+        entries to broker redelivery, and exits.
+        """
+        while True:
+            if not self._pump_stopping:
+                await self._advance_wakeup.wait()
+            self._advance_wakeup.clear()
+            progress = True
+            while progress:
+                progress = False
+                # Snapshot: lanes are added (by _poll) while we await below.
+                # A lane inserted during an await is invisible to this pass,
+                # but its appearance always comes with a wakeup.set() (born
+                # resolved) or a later resolve, so the next progress pass or
+                # the next wakeup picks it up — nothing is lost. A no-progress
+                # pass costs O(#lanes) and falls back to wait().
+                for lane in list(self._lane_queues):
+                    lane_queue = self._lane_queues[lane]
+                    # The deque head is always present in _outstanding: entry
+                    # deletion belongs to the pump alone, and it removes the
+                    # deque slot and the dict entry together below — a
+                    # KeyError here would be a bug. Harvest the lane's
+                    # contiguous resolved prefix synchronously, before the
+                    # await, so an advance that raises abandons the whole
+                    # batch to broker redelivery.
+                    batch: list[AdvanceItem] = []
+                    while lane_queue and not self._outstanding[lane_queue[0]].pending:
+                        head_id = lane_queue.popleft()
+                        entry = self._outstanding.pop(head_id)
+                        batch.append(
+                            AdvanceItem(
+                                entry.broker_payload,
+                                AdvanceOutcome(
+                                    acked=entry.acked, failed=entry.failed, rejected=entry.rejected
+                                ),
+                            )
+                        )
+                    if not lane_queue:
+                        # Sole lane GC point; _poll recreates the lane on demand.
+                        del self._lane_queues[lane]
+                    if not batch:
+                        continue
+                    try:
+                        await producer.advance(batch)
+                    except Exception as exc:
+                        self.log.error(
+                            "Producer advance raised; broker advance failed",
+                            key=self.key,
+                            lane=lane,
+                            batch_size=len(batch),
+                            exc_info=exc,
+                        )
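
The pump above reduces to a harvest-then-dispatch loop. Below is a minimal, self-contained sketch of that loop under a simplified model. `Entry`, `harvest_prefix`, `drain`, and `AdvancePump` are hypothetical names, not Airflow APIs. Lanes are plain `deque`s of event IDs, and `advance` is any async callable. The sketch shows four things:

- the contiguous resolved prefix is harvested synchronously before the await;
- each pass dispatches at most one batch per lane;
- passes repeat until one makes no progress;
- a stop request sets a flag and the wakeup event without awaiting.

Exceptions from `advance` simply propagate here instead of being logged.

```python
import asyncio
from collections import deque
from dataclasses import dataclass


@dataclass
class Entry:
    # Hypothetical stand-in for an outstanding event awaiting resolution.
    payload: str
    pending: bool = True


def harvest_prefix(lane_queue: deque, outstanding: dict) -> list:
    """Pop the lane's contiguous resolved prefix, stopping at the first pending head."""
    batch = []
    while lane_queue and not outstanding[lane_queue[0]].pending:
        head_id = lane_queue.popleft()
        # Remove the deque slot and the dict entry together.
        batch.append(outstanding.pop(head_id).payload)
    return batch


async def drain(lanes: dict, outstanding: dict, advance) -> None:
    """Round-robin passes, at most one batch per lane each, until a pass makes no progress."""
    progress = True
    while progress:
        progress = False
        for lane in list(lanes):  # snapshot: lanes may be added across awaits
            lane_queue = lanes[lane]
            batch = harvest_prefix(lane_queue, outstanding)  # synchronous, before any await
            if not lane_queue:
                del lanes[lane]  # GC the empty lane; a producer would recreate it on demand
            if batch:
                await advance(lane, batch)  # errors propagate to the caller in this sketch
                progress = True


class AdvancePump:
    """Hypothetical pump: waits for wakeups, drains, and exits after a stop request."""

    def __init__(self, advance):
        self.lanes: dict = {}
        self.outstanding: dict = {}
        self.wakeup = asyncio.Event()
        self.stopping = False
        self._advance = advance

    def request_stop(self) -> None:
        # Synchronous (no await), so a caller can request a stop without yielding.
        self.stopping = True
        self.wakeup.set()

    async def run(self) -> None:
        while True:
            if not self.stopping:
                await self.wakeup.wait()
            self.wakeup.clear()
            await drain(self.lanes, self.outstanding, self._advance)
            if self.stopping:
                return  # still-pending entries are left for broker redelivery


async def _demo():
    sent = []

    async def advance(lane, batch):
        sent.append((lane, batch))

    pump = AdvancePump(advance)  # built inside the running event loop
    pump.outstanding.update(
        {1: Entry("a", False), 2: Entry("b"), 3: Entry("c", False), 4: Entry("d", False)}
    )
    pump.lanes.update({"p0": deque([1, 2]), "p1": deque([3, 4])})
    pump.request_stop()
    await pump.run()
    print(sent)  # [('p0', ['a']), ('p1', ['c', 'd'])]


if __name__ == "__main__":
    asyncio.run(_demo())
```

In the demo, lane `p0` stops at the pending entry 2, so only `a` is dispatched and the lane survives. Lane `p1` drains fully and is garbage-collected.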

Review Comment:
   I ended up going with option 1. Any `advance` exception now terminates the
whole shared-stream group and fails every subscriber, instead of logging and
continuing. The offset is never committed, so the broker redelivers from the
last committed point: no silent loss on single-partition Kafka.
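
The fail-fast behavior described in this comment can be sketched with a toy model. This is a minimal sketch under stated assumptions:

- `SharedStreamGroupFailed`, `FakePartition`, and `run_group` are hypothetical names, not Airflow or Kafka client APIs.
- Subscribers are modelled as `asyncio` futures.
- "Commit" is just an integer offset that moves only after a successful `advance`.

When an advance raises, every subscriber's future fails and the committed offset stays put, so a restarted consumer would see the failed batch again.

```python
import asyncio


class SharedStreamGroupFailed(Exception):
    """Hypothetical error delivered to every subscriber when the group terminates."""


class FakePartition:
    """Hypothetical single-partition log: a restarted consumer resumes at `committed`."""

    def __init__(self, records):
        self.records = list(records)
        self.committed = 0  # offset of the next record to deliver after a restart

    def redeliver(self):
        # Everything after the last commit is delivered again.
        return self.records[self.committed:]


async def run_group(partition, advance, subscribers, batch_size=2):
    """Advance batch by batch; on the first advance error, fail every subscriber and stop."""
    offset = partition.committed
    while offset < len(partition.records):
        batch = partition.records[offset:offset + batch_size]
        try:
            await advance(batch)
        except Exception as exc:
            # Fail fast: no log-and-continue, and no commit past the failed batch.
            for fut in subscribers:
                if not fut.done():
                    err = SharedStreamGroupFailed(f"advance failed at offset {offset}")
                    err.__cause__ = exc
                    fut.set_exception(err)
            return
        offset += len(batch)
        partition.committed = offset  # commit only after a successful advance
    for fut in subscribers:
        if not fut.done():
            fut.set_result(partition.committed)


async def demo():
    partition = FakePartition(["r0", "r1", "r2", "r3", "r4"])
    calls = []

    async def advance(batch):
        calls.append(batch)
        if "r2" in batch:
            raise RuntimeError("broker unavailable")  # simulated advance failure

    loop = asyncio.get_running_loop()
    subscribers = [loop.create_future(), loop.create_future()]
    await run_group(partition, advance, subscribers)
    errors = [fut.exception() for fut in subscribers]  # retrieve, so asyncio does not warn
    return calls, errors, partition


if __name__ == "__main__":
    calls, errors, partition = asyncio.run(demo())
    print(partition.committed, partition.redeliver())  # 2 ['r2', 'r3', 'r4']
```

In the demo, the second batch fails. Both subscribers receive the error, and the committed offset stays at 2, so `r2` onward would be redelivered rather than silently skipped.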



-- 
This is an automated message from the Apache Git Service.