Hi Nick,
sorry for not being more helpful. I don't quite understand what _this_
refers to in your email.
Is this the part in question?
interface TopicPrioritizer {
    List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}

public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);
this is basically the same as
public void subscribe(java.util.List<TopicPriority> topicPriorities);
What I meant is literally this interface:
https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
On top of that, I would change choose to return a wrapper that allows
pausing / resuming a topic partition (kind of Connect style); choose would
then get called again immediately afterwards.
With that, one could prevent OOM when update only delivers messages that you
don't need at the moment.
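To make that a bit more concrete, here is a rough sketch of the shape I have in mind. Everything in it is made up for illustration: Choice, RecordChooser, BoundedChooser and the per-partition buffer limit are hypothetical names, not part of Kafka or Samza, and partitions are plain strings to keep it short. The chooser asks for a pause once a partition has too much buffered and for a resume once that buffer is drained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical wrapper returned by choose(): a record, a pause/resume request, or nothing.
final class Choice<R> {
    enum Kind { RECORD, PAUSE, RESUME, NONE }

    final Kind kind;
    final String partition; // null for NONE
    final R value;          // non-null only for RECORD

    private Choice(Kind kind, String partition, R value) {
        this.kind = kind;
        this.partition = partition;
        this.value = value;
    }

    static <R> Choice<R> ofRecord(String partition, R value) { return new Choice<>(Kind.RECORD, partition, value); }
    static <R> Choice<R> pause(String partition) { return new Choice<>(Kind.PAUSE, partition, null); }
    static <R> Choice<R> resume(String partition) { return new Choice<>(Kind.RESUME, partition, null); }
    static <R> Choice<R> none() { return new Choice<>(Kind.NONE, null, null); }
}

// Hypothetical chooser, loosely modelled on update/choose from Samza's MessageChooser.
interface RecordChooser<R> {
    void update(String partition, R value); // a fetched record is handed to the chooser
    Choice<R> choose();                     // called repeatedly until it returns NONE
}

// Assumption for this sketch: buffer at most maxBuffered records per partition.
final class BoundedChooser implements RecordChooser<String> {
    private final int maxBuffered;
    private final Map<String, Deque<String>> buffers = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();
    private final Deque<Choice<String>> pending = new ArrayDeque<>();

    BoundedChooser(int maxBuffered) { this.maxBuffered = maxBuffered; }

    @Override
    public void update(String partition, String value) {
        Deque<String> queue = buffers.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addLast(value);
        // Buffer is full: ask the caller to stop fetching from this partition.
        if (queue.size() >= maxBuffered && paused.add(partition)) {
            pending.addLast(Choice.pause(partition));
        }
    }

    @Override
    public Choice<String> choose() {
        // Pause/resume requests are handed out before any further records.
        if (!pending.isEmpty()) {
            return pending.pollFirst();
        }
        for (Map.Entry<String, Deque<String>> entry : buffers.entrySet()) {
            Deque<String> queue = entry.getValue();
            if (!queue.isEmpty()) {
                String value = queue.pollFirst();
                // Buffer drained: the next choose() call returns the resume request.
                if (queue.isEmpty() && paused.remove(entry.getKey())) {
                    pending.addLast(Choice.resume(entry.getKey()));
                }
                return Choice.ofRecord(entry.getKey(), value);
            }
        }
        return Choice.none();
    }
}
```

In a real consumer the PAUSE and RESUME results would presumably be mapped onto the existing KafkaConsumer.pause and KafkaConsumer.resume calls by whatever loop drives choose.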
Was that more helpful?
Best Jan
On 04.09.2018 15:06, n...@afshartous.com wrote:
@Jan - can you comment on whether or not this is what you had in mind ?
--
Nick
On Aug 30, 2018, at 10:18 AM, n...@afshartous.com wrote:
Just clarifying that the API below would be in addition to the API specified in
KIP-349
https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics
--
Nick
On Aug 30, 2018, at 9:57 AM, n...@afshartous.com wrote:
Here’s an attempt at incorporating a Samza MessageChooser type interface.
--
Nick
New interface TopicPrioritizer lets one supply an implementation that
prioritizes topics. The topic priorities that were assigned with method
KafkaConsumer.subscribe may or may not be used. The input is the list of
subscribed topics, and the output is an ordered list of topics. The ordering
represents the priority that the TopicPrioritizer implementation has assigned.
Calls to KafkaConsumer.poll will use the TopicPrioritizer to determine the
priority of topics.
interface TopicPrioritizer {
List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}
New method KafkaConsumer.registerTopicPrioritizer is used to register the
TopicPrioritizer:
public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);
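As an illustration only, a prioritizer might look like the sketch below. The TopicPriority class here is a hypothetical stand-in (a topic name plus an int priority, with a larger number assumed to mean higher priority), since its actual shape is not spelled out in this message. HighestFirstPrioritizer is likewise an invented example name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for TopicPriority; the fields are assumptions.
final class TopicPriority {
    final String topic;
    final int priority; // assumption: larger value means higher priority

    TopicPriority(String topic, int priority) {
        this.topic = topic;
        this.priority = priority;
    }
}

// The interface as proposed above.
interface TopicPrioritizer {
    List<TopicPriority> prioritize(List<TopicPriority> topicPriorities);
}

// Example implementation: orders topics by descending priority.
final class HighestFirstPrioritizer implements TopicPrioritizer {
    @Override
    public List<TopicPriority> prioritize(List<TopicPriority> topicPriorities) {
        // Copy first so the caller's list is left untouched.
        List<TopicPriority> ordered = new ArrayList<>(topicPriorities);
        // List.sort is stable, so topics with equal priority keep their input order.
        ordered.sort(Comparator.comparingInt((TopicPriority tp) -> tp.priority).reversed());
        return ordered;
    }
}
```

An implementation is free to ignore the subscribed priorities altogether, for example by ordering on some external signal instead; the sketch simply reuses them.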