Some users want to use this API to check whether there are messages to receive,
as in this pseudocode:
if (consumer.hasMessage()) {
    .submit(() -> {
        consumer.pollMessagesAccordingToTheDistributedFlowControl()
    })
}
They don't want to remove the message from the queue.
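
To illustrate the difference, here is a minimal sketch that uses a plain
`LinkedBlockingQueue` as a stand-in for the consumer's local receiver queue.
`LocalQueueProbe`, `hasMessage`, and `probeByReceiving` are hypothetical names
for illustration only, not Pulsar APIs; the zero-wait `poll()` is assumed to
behave like probing with `consumer.receive(0, TimeUnit.SECONDS)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a consumer's local receiver queue; not Pulsar code.
class LocalQueueProbe {

    // Non-destructive check: the message stays in the queue.
    static boolean hasMessage(BlockingQueue<String> localQueue) {
        return !localQueue.isEmpty();
    }

    // Destructive probe (assumed analogue of a zero-timeout receive):
    // returns null if nothing is buffered, otherwise removes and returns the head.
    static String probeByReceiving(BlockingQueue<String> localQueue) {
        return localQueue.poll();
    }

    public static void main(String[] args) {
        BlockingQueue<String> localQueue = new LinkedBlockingQueue<>();
        localQueue.add("msg-1");

        System.out.println(hasMessage(localQueue));       // true
        System.out.println(localQueue.size());            // 1, nothing was removed
        System.out.println(probeByReceiving(localQueue)); // msg-1
        System.out.println(localQueue.size());            // 0, the probe consumed it
    }
}
```

With the destructive probe, the caller has to hold on to the returned message
itself until the flow-control logic decides to process it.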
On Wed, Oct 27, 2021 at 7:43 PM PengHui Li <[email protected]> wrote:
> @ZhangJian He, as Matteo mentioned, calling
> `consumer.receive(0, TimeUnit.SECONDS)` serves the same purpose: it checks
> whether there are messages in the local cache.
>
> Thanks
> Penghui
>
> On Tue, Oct 26, 2021 at 2:35 PM ZhangJian He <[email protected]> wrote:
>
> > If some users need the message content for user-defined actions, we need
> > to ensure that they can't use `peekMessage` to do things like ack,
> > because the message is still in the blockingQueue. Should it return just a
> > copy of the content?
> >
> > Introducing the term `localBuffer` might be good, e.g. `hasMessagesInLocalBuffer`?
> >
> > On Tue, Oct 26, 2021 at 2:20 PM JiangHaiting <[email protected]> wrote:
> >
> > > I'm totally +1 for a feature to check whether we can get a
> > > message immediately from the consumer, that is, whether we have a message
> > > locally.
> > >
> > >
> > > In my understanding, it's useful for implementing a user-defined order in
> > > which to consume messages across different topics; in your case, the
> > > "distributed flow control ability".
> > > But in the past few years, I've met some users who define the consumption
> > > order across topics by part of the message content, such as
> > > some critical property value.
> > > In these situations, a `peek` method is more suitable.
> > >
> > >
> > > Furthermore, peek is not effectively equal to
> > > `consumer.receive(0, TimeUnit.SECONDS)`, because you have to store the
> > > message somewhere else if you find that it's not the highest-priority
> > > message to process.
> > >
> > >
> > > One last thing: putting the concept of "receiverQueue" in the consumer API
> > > seems a little bit strange, IMHO.
> > >
> > >
> > >
> > >
> > > ------------------ Original ------------------
> > > From: "dev" <[email protected]>
> > > Date: Tue, Oct 26, 2021 12:54 PM
> > > To: "dev" <[email protected]>
> > > Subject: Re: [DISCUSSION] PIP-108: Add method to help user judge if
> > > consumer queue has message
> > >
> > >
> > >
> > > 3. Our solution implements the distributed flow control ability on the
> > > client side, so we don't use the listener approach.
> > > 2. We have one consumer per customer, in different tenants and namespaces,
> > > and we need flow control (some of our customers' machines can't handle high
> > > traffic), so a multi-topic consumer can't be used.
> > > 1. We want to use this API to check whether there are messages to receive,
> > > as in this pseudocode:
> > > if (consumer.hasMessage()) {
> > >     .submit(() -> {
> > >         consumer.pollMessagesAccordingToTheDistributedFlowControl()
> > >     })
> > > }
> > >
> > > On Tue, Oct 26, 2021 at 12:15 PM Matteo Merli <[email protected]> wrote:
> > >
> > > > I'm a bit hesitant about this because I think there are already at
> > > > least 3 different ways to handle similar scenarios.
> > > >
> > > > 1. Use a listener and avoid calling receive directly
> > > > 2. Use a multi-topic consumer, so there's a single `Consumer` instance
> > > > exposed
> > > > 3. Use `consumer.receive(0, TimeUnit.SECONDS)` to probe for a message
> > > >
> > > > --
> > > > Matteo Merli
> > > > <[email protected]>
> > > >
> > > > On Mon, Oct 25, 2021 at 7:34 PM ZhangJian He <[email protected]> wrote:
> > > > >
> > > > > I think it's better to add the method to the Consumer interface instead
> > > > > of making users cast to `ConsumerBase`.
> > > > > `peek` is more complex, because I could use the peeked object to
> > > > > ack or negatively ack, but then when is it removed from the `BlockingQueue`?
> > > > > IMHO, people who use this API just want to check whether there is a
> > > > > message; otherwise, they can just use `receive(0,TimeUnit)`.
> > > > >
> > > > > On Tue, Oct 26, 2021 at 10:19 AM JiangHaiting <[email protected]> wrote:
> > > > >
> > > > > > Can the method
> > > > > > "org.apache.pulsar.client.impl.ConsumerBase#getTotalIncomingMessages"
> > > > > > do the trick? Though you have to change the type to ConsumerBase.
> > > > > >
> > > > > >
> > > > > > And maybe `peek` is more suitable and useful to add to the Consumer
> > > > > > interface?
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > ------------------ Original ------------------
> > > > > > From: "dev" <[email protected]>
> > > > > > Date: Mon, Oct 25, 2021 07:24 PM
> > > > > > To: "dev" <[email protected]>
> > > > > > Subject: [DISCUSSION] PIP-108: Add method to help user judge if
> > > > > > consumer queue has message
> > > > > >
> > > > > >
> > > > > >
> > > > > > https://github.com/apache/pulsar/issues/12479
> > > > > >
> > > > > > --- Pasted here for quoting convenience ---
> > > > > >
> > > > > > ## Motivation
> > > > > > Currently, I have an application that manages ten thousand consumers
> > > > > > and logic to schedule the consumers' receive calls. It would be
> > > > > > helpful to know whether one of the consumers has messages to receive.
> > > > > >
> > > > > > ## Goal
> > > > > > Allow a `Consumer` to check whether there are unreceived messages.
> > > > > >
> > > > > > ## API Changes
> > > > > >
> > > > > > Add `hasMessageInReceiverQueue` to the `Consumer` interface.
> > > > > >
> > > > > > ## Implementation
> > > > > >
> > > > > > For `ZeroQueueConsumerImpl`, return false. For others, check whether
> > > > > > `receiveQueueSize` is greater than zero.
> > > > > >
> > > > > >
> > > > > > ## Rejected Alternatives
> > > > > >
> > > > > > No alternatives yet.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > ---
> > > > > > Thanks,
> > > > > > Haiting Jiang (Github: Jason918)
> > > >
> > >
> > >
> > >
> > >
> > > ---
> > > Thanks,
> > > Haiting Jiang (Github: Jason918)
> >
>