JosephCatrambone-Gridware commented on issue #56494:
URL: https://github.com/apache/airflow/issues/56494#issuecomment-3386762985

   Certainly.  We had a ConsumeFromTopicOperator that gathered messages from several Kafka topics, dropped the irrelevant ones, processed the remainder en masse to take advantage of bulk-parallel processing, and pushed the resulting list of N elements to the next task downstream.
   
   The structure looked very similar to the elements in the first message:
   
   ```
   message_batch = ConsumeFromTopicOperator(
       task_id=config.KAFKA_FILTER_ACCUMULATE_TID,
       topics=[config.LISTENER_TOPIC,],
       apply_function="filter_accumulate_process",
   )
   
   message_batch >> postprocess_batch
   ```
   
   We were trying to hack our way around this with 
   
   ```
    def filter_accumulate_process(message, ti):
        ...
        # Manually push the accumulated batch, since the operator itself
        # discards the apply_function's return values.
        ti.xcom_push(key='results', value=[... filtered and accumulated messages ...])
   
   message_batch = ConsumeFromTopicOperator(
       task_id=config.KAFKA_FILTER_ACCUMULATE_TID,
       topics=[config.LISTENER_TOPIC,],
       apply_function="filter_accumulate_process",
       apply_function_kwargs={'ti': '{{ti}}'},
   )
   ```
   
   but it seems like it would be a straightforward change to the ConsumeFromTopicOperator to just gather the resulting values and return that list (or an empty one).
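   To illustrate the idea, here's a minimal sketch of the "gather" semantics I have in mind. The function name `gather_applied` is purely illustrative and not part of the provider; the point is just that non-None results of the apply function get collected and returned, so the operator's return value would land in XCom the normal way:
   
   ```python
   from typing import Any, Callable, Iterable, Optional
   
   
   def gather_applied(
       messages: Iterable[Any],
       apply_function: Callable[[Any], Optional[Any]],
   ) -> list:
       """Apply the per-message filter/transform and collect the non-None
       results, so the operator could return the list instead of dropping it."""
       results = []
       for message in messages:
           processed = apply_function(message)
           if processed is not None:  # None means "filtered out"
               results.append(processed)
       return results  # possibly empty, never None
   ```
   
   If `execute()` returned such a list, the downstream `postprocess_batch` task could pull the batch from XCom directly, with no need for the `ti.xcom_push` workaround above.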
   
   Looking at AssetWatcher and Asset, it feels like those might be a good fit, though I'd need to investigate more.  I think I remember trying them early on but hit an issue where nothing triggered when items were added to the Kafka topic.  Perhaps it's worth revisiting.
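   For my own reference, the event-driven setup I'd be revisiting looks roughly like the untested sketch below; the trigger class, asset name, and dotted function path are assumptions based on the apache-kafka provider and the Airflow 3 event-driven scheduling docs, not something I've verified against our setup:
   
   ```python
   # Untested sketch: schedule a DAG off Kafka messages via an AssetWatcher.
   from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
   from airflow.sdk import Asset, AssetWatcher, dag
   
   trigger = AwaitMessageTrigger(
       topics=["listener_topic"],           # placeholder for config.LISTENER_TOPIC
       apply_function="path.to.filter_fn",  # dotted path, as with the operator
       kafka_config_id="kafka_default",
   )
   
   asset = Asset(
       "kafka_listener_messages",
       watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)],
   )
   
   
   @dag(schedule=[asset])
   def kafka_event_driven():
       ...  # downstream batch processing would go here
   
   
   kafka_event_driven()
   ```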

