Copilot commented on code in PR #64612:
URL: https://github.com/apache/airflow/pull/64612#discussion_r3066478215


##########
providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py:
##########
@@ -142,6 +146,40 @@ async def test_trigger_run_bad(self, mocker):
         assert task.done() is False
         asyncio.get_event_loop().stop()
 
+    @pytest.mark.asyncio
+    async def test_cleanup_closes_consumer(self, mocker):
+        consumer = MockedConsumer()
+        close_mock = mocker.patch.object(consumer, "close")
+
+        mocker.patch.object(KafkaConsumerHook, "get_consumer", return_value=consumer)
+
+        trigger = AwaitMessageTrigger(
+            kafka_config_id="kafka_d",
+            apply_function="unit.apache.kafka.triggers.test_await_message.apply_function_true",
+            topics=["noop"],
+            poll_timeout=0.0001,
+            poll_interval=5,
+        )
+
+        generator = trigger.run()
+        await generator.__anext__()
+        await trigger.cleanup()
+        await generator.aclose()

Review Comment:
   Calling the dunder method `__anext__()` directly is less idiomatic and 
harder to read. Prefer `await anext(generator)` (the `anext()` builtin is 
available from Python 3.10), or consume the generator via `async for` with a 
`break`, so the test's intent is clearer.



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py:
##########
@@ -141,3 +145,12 @@ async def run(self):
                     if self.commit_offset:
                         await async_commit(message=message, asynchronous=False)
                     await asyncio.sleep(self.poll_interval)
+
+    async def cleanup(self) -> None:
+        consumer = self._consumer
+        if consumer is not None:
+            self._consumer = None
+            try:
+                await sync_to_async(consumer.close)()
+            except Exception:
+                log.warning("Failed to close Kafka consumer", exc_info=True)

Review Comment:
   Potential race: `run()` captures `poll`/`commit` callables from 
`self._consumer`, but `cleanup()` can concurrently close the consumer (e.g., on 
cancellation) while `run()` is still looping, leading to poll/commit calls on a 
closed consumer and noisy failures. A more robust pattern is to have 
`cleanup()` only signal shutdown (e.g., set a stop flag/event) and perform the 
actual `consumer.close()` in `run()`'s `finally` after the loop exits, so close 
cannot interleave with poll/commit.
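
   A rough sketch of that pattern, under several assumptions: `FakeConsumer` 
and `SketchTrigger` are hypothetical names, `asyncio.to_thread` is swapped in 
for `sync_to_async` to keep the example stdlib-only, and only cooperative 
shutdown is covered (not task cancellation by the triggerer).

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; records the call order."""

    def __init__(self):
        self.calls = []
        self.closed = False

    def poll(self, timeout):
        if self.closed:
            raise RuntimeError("poll on closed consumer")
        self.calls.append("poll")
        return None  # never delivers a message in this sketch

    def close(self):
        self.closed = True
        self.calls.append("close")


class SketchTrigger:
    """Illustrative only; not the AwaitMessageTrigger implementation."""

    def __init__(self, consumer, poll_interval=0.01):
        self._consumer = consumer
        self._stop = asyncio.Event()
        self.poll_interval = poll_interval

    async def run(self):
        try:
            while not self._stop.is_set():
                message = await asyncio.to_thread(self._consumer.poll, 0.001)
                if message is not None:
                    yield message
                    return
                # Sleep between polls, but wake early if cleanup() signals.
                try:
                    await asyncio.wait_for(self._stop.wait(), self.poll_interval)
                except asyncio.TimeoutError:
                    pass
        finally:
            # close() runs only after the loop has exited, so it cannot
            # interleave with a poll issued by this coroutine.
            await asyncio.to_thread(self._consumer.close)

    async def cleanup(self):
        # Only signal shutdown; run() owns the consumer and closes it.
        self._stop.set()


async def demo():
    consumer = FakeConsumer()
    trigger = SketchTrigger(consumer)

    async def consume():
        async for _ in trigger.run():
            pass

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.05)
    await trigger.cleanup()
    await task
    return consumer.calls
```

   In the demo, every `poll` is recorded before the single `close`, which is 
the ordering the comment asks for.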



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py:
##########
@@ -141,3 +145,12 @@ async def run(self):
                     if self.commit_offset:
                         await async_commit(message=message, asynchronous=False)
                     await asyncio.sleep(self.poll_interval)
+
+    async def cleanup(self) -> None:
+        consumer = self._consumer
+        if consumer is not None:
+            self._consumer = None
+            try:
+                await sync_to_async(consumer.close)()
+            except Exception:
+                log.warning("Failed to close Kafka consumer", exc_info=True)

Review Comment:
   The warning log on close failure lacks context, which can make production 
debugging difficult when multiple triggers are active. Consider including 
identifying details (e.g., `kafka_config_id` and `topics`) in the warning 
message/extra fields so operators can correlate failures to the specific 
trigger instance.
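
   One possible shape for such a message, as a sketch: `close_consumer` is a 
hypothetical helper, and the exact wording and fields are assumptions rather 
than what the PR should necessarily adopt.

```python
import logging

log = logging.getLogger(__name__)


def close_consumer(consumer, kafka_config_id, topics):
    """Hypothetical helper: close a consumer and log identifying context on failure."""
    try:
        consumer.close()
        return True
    except Exception:
        # Lazy %-style arguments keep formatting cost off the success path
        # and let operators match the warning to one trigger instance.
        log.warning(
            "Failed to close Kafka consumer (kafka_config_id=%s, topics=%s)",
            kafka_config_id,
            topics,
            exc_info=True,
        )
        return False
```

   Inside `cleanup()`, the same arguments would come from 
`self.kafka_config_id` and `self.topics`.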



##########
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py:
##########
@@ -107,10 +111,10 @@ async def run(self):
         consumer_hook = KafkaConsumerHook(topics=self.topics, kafka_config_id=self.kafka_config_id)
 
         async_get_consumer = sync_to_async(consumer_hook.get_consumer)
-        consumer = await async_get_consumer()
+        self._consumer = await async_get_consumer()
 
-        async_poll = sync_to_async(consumer.poll)
-        async_commit = sync_to_async(consumer.commit)
+        async_poll = sync_to_async(self._consumer.poll)
+        async_commit = sync_to_async(self._consumer.commit)

Review Comment:
   Potential race: `run()` captures `poll`/`commit` callables from 
`self._consumer`, but `cleanup()` can concurrently close the consumer (e.g., on 
cancellation) while `run()` is still looping, leading to poll/commit calls on a 
closed consumer and noisy failures. A more robust pattern is to have 
`cleanup()` only signal shutdown (e.g., set a stop flag/event) and perform the 
actual `consumer.close()` in `run()`'s `finally` after the loop exits, so close 
cannot interleave with poll/commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

