If some users need the message content to drive user-defined actions, we need
to ensure they can't use `peekMessage` to do things like ack, because the
message is still in the blockingQueue. Could it return just a copy of the
content?

Introducing the notion of a `localBuffer` might be good, e.g. `hasMessagesInLocalBuffer`?
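
To make the idea concrete, here is a rough sketch of what I have in mind. It is a standalone model built on a plain `BlockingQueue`, not the Pulsar client code: `LocalBufferSketch`, `enqueue`, `peekContent` and `poll` are names made up for illustration, and `hasMessagesInLocalBuffer` is only the name proposed above. The check never removes anything, and the peek hands back a copy of the payload rather than an object that could be acked.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a consumer's local buffer; NOT the Pulsar client API.
class LocalBufferSketch {
    private final BlockingQueue<byte[]> localBuffer = new LinkedBlockingQueue<>();

    // Simulates the client library delivering a message into the local buffer.
    void enqueue(byte[] payload) {
        localBuffer.add(payload.clone());
    }

    // Non-blocking check; never removes a message from the buffer.
    boolean hasMessagesInLocalBuffer() {
        return !localBuffer.isEmpty();
    }

    // Returns a copy of the head payload without removing it. The caller gets
    // bytes only, so there is nothing it could ack or negative-ack.
    Optional<byte[]> peekContent() {
        byte[] head = localBuffer.peek();
        return head == null ? Optional.empty() : Optional.of(head.clone());
    }

    // The only path that removes a message; returns null if the buffer is empty.
    byte[] poll() {
        return localBuffer.poll();
    }

    public static void main(String[] args) {
        LocalBufferSketch consumer = new LocalBufferSketch();
        consumer.enqueue("priority=high".getBytes(StandardCharsets.UTF_8));

        if (consumer.hasMessagesInLocalBuffer()) {
            // Inspect the content first, then decide whether to actually consume.
            String content = consumer.peekContent()
                    .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                    .orElse("");
            if (content.contains("priority=high")) {
                byte[] message = consumer.poll();
                System.out.println("consumed " + message.length + " bytes");
            }
        }
    }
}
```

Because the caller only ever sees a copy, mutating it cannot affect the buffered message, and the question of when to remove the message from the queue stays with the normal receive path.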

On Tue, Oct 26, 2021 at 2:20 PM, JiangHaiting <jianghait...@foxmail.com> wrote:

> I'm totally +1 for a feature to check whether we can get a message
> immediately from the consumer, that is to say, whether we have messages
> locally.
>
> In my understanding, it's useful for implementing some user-defined order of
> consuming messages among different topics; in your case, the "distributed
> flow control ability".
> But in the past few years, I've met some users who define the consume
> order of different topics by part of the message content, like
> some critical property value.
> In these situations, a `peek` method is more suitable.
>
> Furthermore, `peek` is not effectively equal to `consumer.receive(0,
> TimeUnit.SECONDS)`, as you will have to store the message somewhere else if
> you find that it's not the highest-priority message to process.
>
> One last thing: putting the concept of "receiverQueue" in the consumer API
> seems a little bit strange, IMHO.
>
> ------------------ Original ------------------
> From: "dev" <shoot...@gmail.com>;
> Date: Tue, Oct 26, 2021 12:54 PM
> To: "dev" <dev@pulsar.apache.org>;
> Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge if
> consumer queue has message
>
> 3. Our solution implements the distributed flow control ability on the client
> side, so we don't use the listener approach.
> 2. We have one consumer per customer, across different tenants and
> namespaces, and we need flow control (some of our customers' machines can't
> handle high traffic), so a multi-topic consumer can't be used.
> 1. We want to use this API to judge whether there are messages to receive,
> as in this pseudocode:
> if (consumer.hasMessage()) {
>     .submit(() -> {
>         consumer.pollMessagesAccordingToTheDistributedFlowControl()
>     })
> }
>
> On Tue, Oct 26, 2021 at 12:15 PM, Matteo Merli <matteo.me...@gmail.com> wrote:
>
> > I'm a bit hesitant about this because I think there are already at
> > least 3 different ways to handle similar scenarios.
> >
> >  1. Using listener and avoid calling receive directly
> >  2. Use multi-topic consumer, so there's a single `Consumer` instance
> >     exposed
> >  3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for message
> >
> >
> > --
> > Matteo Merli
> > <matteo.me...@gmail.com>
> >
> > On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <shoot...@gmail.com> wrote:
> > >
> > > I think it's better to add the method to the Consumer interface instead
> > > of letting users cast it to `ConsumerBase`.
> > > `peek` is the most complex option, because I can use the peeked object
> > > to ack or negative-ack, but when is it removed from the `BlockingQueue`?
> > > IMHO, people who use this API just want to judge whether there is a
> > > message; otherwise, they can just use `receive(0, TimeUnit)`.
> > >
> > > On Tue, Oct 26, 2021 at 10:19 AM, JiangHaiting <jianghait...@foxmail.com> wrote:
> > >
> > > > Can this method
> > > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > > > do the trick? Though you have to change the type to ConsumerBase.
> > > >
> > > > And maybe `peek` is more suitable and useful to add to the Consumer
> > > > interface?
> > > >
> > > > ------------------ Original ------------------
> > > > From: "dev" <shoot...@gmail.com>;
> > > > Date: Mon, Oct 25, 2021 07:24 PM
> > > > To: "dev" <dev@pulsar.apache.org>;
> > > > Subject: [DISCUSSION] PIP-108: Add method to help user judge if
> > > > consumer queue has message
> > > >
> > > > https://github.com/apache/pulsar/issues/12479
> > > >
> > > > --- Pasted here for quoting convenience ---
> > > >
> > > > ## Motivation
> > > > Currently, I have an application that manages ten thousand consumers,
> > > > and logic to schedule the consumers' receive calls. It would be helpful
> > > > to know whether a given consumer has messages to receive.
> > > >
> > > > ## Goal
> > > > Allow a `Consumer` to tell whether there are unreceived messages.
> > > >
> > > > ## API Changes
> > > >
> > > > Add `hasMessageInReceiverQueue` on the `Consumer` interface.
> > > >
> > > > ## Implementation
> > > >
> > > > For `ZeroQueueConsumerImpl`, return false. For others, check whether
> > > > `receiveQueueSize` is greater than zero.
> > > >
> > > > ## Rejected Alternatives
> > > >
> > > > No alternatives yet.
> > > >
> > > > ---
> > > > Thanks,
> > > > Haiting Jiang (Github: Jason918)
> >
>
> ---
> Thanks,
> Haiting Jiang (Github: Jason918)
