jason810496 commented on issue #64205:
URL: https://github.com/apache/airflow/issues/64205#issuecomment-4169799333

   Hi @spencerhuang,
   
   Thank you for the detailed description.
   
   > We now have two Separate competing “Sources of Truth” (Kafka offsets vs. 
Airflow Asset state) that are fundamentally out of sync.
   
   After testing this myself, I would suggest implementing a custom trigger that 
handles the processing and DLQ logic atomically if you need strict 
message delivery guarantees. Because the Airflow Triggerer handles fired 
triggers in batches, the offset is committed before the `AssetEvent` is 
created. This means that even if you set `commit_offset=True` on the current 
community `AwaitMessageTrigger`, an offset can still be committed without a 
corresponding `AssetEvent` ever being produced.
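The failure window described above can be illustrated with a toy, stdlib-only model. This is not Airflow or Kafka code: `FakeTopic`, `FakeMetadataDB` and `run_triggerer` are hypothetical stand-ins, and the "crash" is simulated by returning early between the offset commit and the creation of the `AssetEvent`.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeTopic:
    """Hypothetical in-memory stand-in for a single Kafka partition."""
    messages: List[str]
    committed: int = 0  # next offset a restarted consumer would read


@dataclass
class FakeMetadataDB:
    """Hypothetical stand-in for the Airflow metadata DB holding AssetEvents."""
    asset_events: List[str] = field(default_factory=list)


def run_triggerer(topic: FakeTopic, db: FakeMetadataDB,
                  crash_after_commit: Optional[int] = None) -> None:
    """Commit-then-persist ordering: offset first, AssetEvent afterwards."""
    for offset in range(topic.committed, len(topic.messages)):
        msg = topic.messages[offset]
        topic.committed = offset + 1      # 1. offset committed by the trigger
        if crash_after_commit == offset:
            return                        # simulated crash before step 2
        db.asset_events.append(msg)       # 2. AssetEvent created later


topic, db = FakeTopic(["m0", "m1", "m2"]), FakeMetadataDB()
run_triggerer(topic, db, crash_after_commit=1)  # crash while handling "m1"
run_triggerer(topic, db)                        # restart resumes at committed offset
print(db.asset_events)  # ['m0', 'm2']: "m1" was committed but never became an AssetEvent
```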
   
   After implementing a custom trigger that handles the processing and DLQ 
logic atomically, you can implement a `BaseMessageQueueProvider` to 
register it with `MessageQueueTrigger`:
   
   
https://github.com/apache/airflow/blob/1790ec41eeaff6e33f78b39ad50b2bcd98107e67/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py#L34
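The core idea of such a custom trigger, handling each message (processing it or routing it to a DLQ) before its offset is committed, can be sketched with a stdlib-only model. Everything here (`FakePartition`, `Sinks`, `consume`, the simulated crash) is hypothetical and does not use the real Airflow or Kafka APIs. Note that this ordering gives at-least-once behaviour: a crash between handling and committing causes redelivery, so processing should be idempotent.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Tuple


@dataclass
class FakePartition:
    """Hypothetical in-memory stand-in for one Kafka partition."""
    messages: List[str]
    committed: int = 0  # next offset a restarted consumer would read


@dataclass
class Sinks:
    """Where handled messages end up: processed results and a dead-letter queue."""
    processed: List[Any] = field(default_factory=list)
    dlq: List[Tuple[str, str]] = field(default_factory=list)


def consume(partition: FakePartition, sinks: Sinks,
            process: Callable[[str], Any], crash_at: Optional[int] = None) -> None:
    """Process or dead-letter each message, and only then commit its offset."""
    for offset in range(partition.committed, len(partition.messages)):
        msg = partition.messages[offset]
        try:
            sinks.processed.append(process(msg))
        except ValueError as exc:
            # Failed messages go to the DLQ instead of blocking the partition.
            sinks.dlq.append((msg, str(exc)))
        if crash_at == offset:
            return  # simulated crash: message handled but offset not committed
        partition.committed = offset + 1  # commit only after handling finished


partition, sinks = FakePartition(["1", "x", "3"]), Sinks()
consume(partition, sinks, process=int, crash_at=2)  # crash after handling "3"
consume(partition, sinks, process=int)              # restart from committed offset 2
print(sinks.processed, [m for m, _ in sinks.dlq], partition.committed)
# [1, 3, 3] ['x'] 3  -> nothing lost; "3" was handled twice (at-least-once)
```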
   
   Additionally, may I ask which Airflow Kafka Provider version you're using?
   
   Since the latest Kafka provider release made `commit_offset=True` the 
default, it is effectively a breaking change for all users. I plan to yank 
that release and change the default of the new `commit_offset` parameter 
to `False`.

