Hi Nick,

Sorry for not being more helpful. I don't quite understand what _this_ refers to in your email.

Is this the part in question?

interface TopicPrioritizer {
  List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}

public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);

This is basically the same as

public void subscribe(java.util.List<TopicPriority> topicPriorities);

What I meant is literally this interface:

https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html

On top of that, I would change choose to return a wrapper that allows pausing / resuming a topic partition (Connect style, more or less), and have choose called again immediately afterwards.

With that, one could prevent OOM when update only delivers messages that you don't need at the moment.
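
A minimal sketch of that idea, only to make the shape concrete. Every name here (Choice, PausingChooser, BoundedChooser, the String partition key, the maxBuffered limit) is a made-up assumption and not part of Samza, Kafka, or KIP-349. choose returns the next message together with the partitions to pause or resume, so a partition whose buffer has filled up stops being fetched until the buffer drains.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical wrapper returned by choose(): a message plus flow-control hints.
final class Choice<T> {
    final T message;          // null when nothing is buffered
    final Set<String> pause;  // partitions the consumer should stop fetching
    final Set<String> resume; // partitions the consumer may fetch again

    Choice(T message, Set<String> pause, Set<String> resume) {
        this.message = message;
        this.pause = pause;
        this.resume = resume;
    }
}

// Hypothetical chooser, loosely modelled on Samza's MessageChooser (update/choose).
interface PausingChooser<T> {
    void update(String partition, T message);
    Choice<T> choose();
}

// Example policy: pause a partition while its buffer still holds maxBuffered or more messages.
final class BoundedChooser<T> implements PausingChooser<T> {
    private final int maxBuffered;
    private final Map<String, Deque<T>> buffers = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    BoundedChooser(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    @Override
    public void update(String partition, T message) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(message);
    }

    @Override
    public Choice<T> choose() {
        Set<String> toPause = new HashSet<>();
        Set<String> toResume = new HashSet<>();
        T next = null;
        for (Map.Entry<String, Deque<T>> e : buffers.entrySet()) {
            Deque<T> queue = e.getValue();
            // Take one message from the first non-empty buffer.
            if (next == null && !queue.isEmpty()) {
                next = queue.poll();
            }
            boolean full = queue.size() >= maxBuffered;
            // Report only state changes, not the full paused set.
            if (full && paused.add(e.getKey())) {
                toPause.add(e.getKey());
            }
            if (!full && paused.remove(e.getKey())) {
                toResume.add(e.getKey());
            }
        }
        return new Choice<>(next, toPause, toResume);
    }
}
```

A consumer loop would translate pause and resume into KafkaConsumer.pause / KafkaConsumer.resume on the matching TopicPartitions and then call choose again.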

Was that more helpful?

Best Jan


On 04.09.2018 15:06, n...@afshartous.com wrote:
@Jan - can you comment on whether or not this is what you had in mind ?
--
       Nick

On Aug 30, 2018, at 10:18 AM, n...@afshartous.com wrote:


Just clarifying that the API below would be in addition to the API specified in 
KIP-349

   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics
 
   --
       Nick


On Aug 30, 2018, at 9:57 AM, n...@afshartous.com wrote:

Here’s an attempt at incorporating a Samza MessageChooser type interface.
--
      Nick


New interface TopicPrioritizer allows one to create a method implementation 
that prioritizes topics. The topic priorities that were assigned with method 
KafkaConsumer.subscribe may or may not be used.  The input is the list of 
subscribed topics, and the output is an ordered list of topics. The ordering 
represents the priority that the TopicPrioritizer implementation has assigned.  
Calls to KafkaConsumer.poll will use the TopicPrioritizer to determine the 
priority of topics.

interface TopicPrioritizer {
   List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}


New method KafkaConsumer.registerTopicPrioritizer is used to register the 
TopicPrioritizer:

  public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);
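
As an illustration of how an implementation might look, here is a minimal sketch that simply orders topics from highest to lowest assigned priority. The TopicPriority class below is a hypothetical stand-in; its fields (topic, priority) and the convention that a larger number means higher priority are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for TopicPriority; field names are assumed.
final class TopicPriority {
    final String topic;
    final int priority; // assumption: larger value means higher priority

    TopicPriority(String topic, int priority) {
        this.topic = topic;
        this.priority = priority;
    }
}

interface TopicPrioritizer {
    List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}

// Returns a new list ordered from highest to lowest priority; the input is left untouched.
final class HighestFirstPrioritizer implements TopicPrioritizer {
    @Override
    public List<TopicPriority> prioritize(List<TopicPriority> topicPriorities) {
        List<TopicPriority> ordered = new ArrayList<>(topicPriorities);
        ordered.sort(Comparator.comparingInt((TopicPriority tp) -> tp.priority).reversed());
        return ordered;
    }
}
```

Under the proposal, such an object would be handed to registerTopicPrioritizer, and a prioritizer that ignores the subscribe-time priorities could reorder the list on any other basis.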









