> but equally they might be
> surprised when closeAsync doesn't complete because the pending messages
> can't be cleared

That would still be bounded by the send timeout: once it expires, the send
will fail and close should proceed.
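
As a rough illustration of that bound, here is a toy model in plain Java. It
is not Pulsar's `ProducerImpl`; `ToyProducer` and its methods are hypothetical
names made up for this sketch. It assumes every pending send carries its own
timeout and that close simply waits for each pending send to either succeed
or fail:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Toy model only: NOT Pulsar's ProducerImpl. All names here are hypothetical.
final class ToyProducer {
    private final long sendTimeoutMs;
    // Futures of sends that have been issued; unacked ones are still incomplete.
    private final List<CompletableFuture<Long>> pending = new CopyOnWriteArrayList<>();

    ToyProducer(long sendTimeoutMs) {
        this.sendTimeoutMs = sendTimeoutMs;
    }

    // Returns a future that the caller (standing in for the broker) completes
    // on ack, or that fails with TimeoutException once the send timeout
    // elapses (CompletableFuture.orTimeout, Java 9+).
    CompletableFuture<Long> sendAsync(String payload) {
        CompletableFuture<Long> ack = new CompletableFuture<Long>()
                .orTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS);
        pending.add(ack);
        return ack;
    }

    // Draining close: completes once every pending send has succeeded or failed.
    // Each send is bounded by the send timeout, so this future is bounded too.
    CompletableFuture<Void> closeAsync() {
        CompletableFuture<?>[] settled = pending.stream()
                .map(f -> f.exceptionally(err -> null)) // ignore the outcome, keep the timing
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(settled);
    }
}
```

In this model a send that is never acked fails after `sendTimeoutMs`, and only
then does the future returned by `closeAsync()` complete. The user's send
callback is always invoked, and close cannot hang longer than the send timeout.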

--
Matteo Merli
<matteo.me...@gmail.com>

On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
<jvanligh...@splunk.com.invalid> wrote:
>
> I can see both sides of the argument regarding whether to flush pending
> messages or not. But I think what is definitely in the contract is not to
> discard any callbacks causing user code to block forever. No matter what,
> we must always call the callbacks.
>
> Personally, I am in favour of a close operation not flushing pending
> messages (and I define pending here as any message that has a callback).
> The reason is that if we wait for all pending messages to be sent then we
> now face a number of edge cases that could cause the close operation to
> take a very long time to complete. What if the user code really just needs
> to close the producer right now? If we amend the documentation to make it
> clear that close does not flush pending messages then the user is now able
> to explicitly craft the behaviour they need. If they want all messages
> flushed first then chain flushAsync->closeAsync, else just closeAsync.
>
> Unfortunately I think user expectation, regardless of the current javadoc,
> is that close would flush everything and in an ideal world it would. We
> have the Principle of Least Surprise but we also have Safe By Default.
> Users might be surprised that when calling closeAsync, a load of their
> pending messages get ConnectionAlreadyClosed, but equally they might be
> surprised when closeAsync doesn't complete because the pending messages
> can't be cleared. Failing pending messages is the safer option. User code
> must handle failure responses and cannot claim data loss with a
> non-positive response. But if they can't close a producer, that could
> result in a wider impact on their system, not to mention more issues
> created in GitHub.
>
> Jack
>
> On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com> wrote:
>
> > >I don't think that implementing `closeAsync` with graceful shutdown
> > logic implies a guarantee of message publishing. Rather, it guarantees
> > that failures will be the result of a real exception or a timeout.
> >
> > I think that's beside the point.     There is no definition of "real"
> > exceptions.   At that point the app is publishing on a best effort basis,
> > and there are no guarantees anywhere in client or server.
> >
> > There is no concept of "maybe published" or
> > "published-if-no_real_errors". What does that even mean? That is only a
> > can of worms which is going to add to developer confusion and lead to
> > Pulsar users finding in the worst possible way that something got lost
> > because it never got published.  It's a poor experience when you find it.
> > I have a real life experience where a user used async APIs (in a lambda),
> > which hummed along fine.  One day much later, the cloud had a hitch, and
> > they discovered a message was  not delivered.
> >
> > I am more concerned about developers discovering at the worst possible time
> > that "published-if-no_real_errors" is a concept.
> >
> > My suggestion is to make this simple for developers.
> >
> > ----The sync/async nature of the close() [ or any other API, for that
> > matter ]  is completely orthogonal to the API semantics, and is just a
> > programmatic choice to deal with  how resources are managed within the
> > program. That's not material here.---
> >
> > A close() is an action that is shutting down the producer right now, not
> > even waiting for any acks of inflight messages. A willingness to lose
> > pending/inflight messages is explicit in that call.  The producer will  not
> > be around to deal with errors or to retry failed messages once close() is
> > invoked.
> >
> > On the contrary, if the client does not want to deal with message loss,
> > then flush(), stick around to gather the acks, deal with errors and retries
> > etc and then do close() . Then close() will be just a resource management
> > action on the client.
> >
> > So update the documentation to reflect that. ---> if close() is called on a
> > producer with messages pending acks, those messages are left in doubt. Avoid
> > all mention of flushes, best effort etc.  Users must buy into  uncertainty,
> > without any qualifications.
> >
> > I would at all costs avoid using the term "graceful" anywhere.  That word
> > has specific semantics associated with it in the systems/storage domain ,
> > and what is being proposed here is nothing like that.
> >
> > -j
> >
> >
> > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <y...@streamnative.io.invalid>
> > wrote:
> >
> > > It’s a good point that `ProducerImpl#failPendingBatchMessages` treats
> > > messages in batch container also as pending messages.
> > >
> > > I agree with your definition of “graceful close”. It’s more like
> > > “at most once” semantics, as the original JavaDoc said:
> > >
> > > > pending writes will not be retried
> > >
> > > Thanks,
> > > Yunze
> > >
> > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> > > >
> > > > Thanks for bringing this thread to the mailing list, Yunze.
> > > >
> > > > I think the right change is to update the `closeAsync` method to first
> > > > flush `batchMessageContainer` and to then asynchronously wait for the
> > > > `pendingMessages` queue to drain. We could add a new timeout or rely
> > > > on the already implemented `sendTimeout` config to put an upper time
> > > > limit on `closeAsync`. My reasoning as well as responses to Joe and
> > > > Yunze follow:
> > > >
> > > >> we need to define the behavior for how to process `pendingMessages`
> > > >> and `batchMessageContainer` when producer call `closeAsync`.
> > > >
> > > > Yes, this is exactly the clarification required, and I agree that the
> > > > Javadoc is ambiguous and that the implementation doesn't align with
> > > > the Javadoc.
> > > >
> > > > If we view the Javadoc as binding, then the fundamental question is
> > > > what messages are "pending". The `pendingMessages` seem pretty easy to
> > > > classify as "pending" given that they are already in flight on the
> > > > network.
> > > >
> > > > I also consider `batchMessageContainer` to be "pending" because a
> > > > client application already has callbacks for the messages in this
> > > > container. These callbacks are expected to complete when the batch
> > > > message delivery completes. Since the client application already has a
> > > > reference to a callback, it isn't a problem that the producer
> > > > implementation initiates the flush logic. (Note that the current
> > > > design fails the `pendingMessages` but does not fail the
> > > > `batchMessageContainer` when `closeAsync` is called, so the callbacks
> > > > for that container are currently left incomplete forever if the client
> > > > is closed with an unsent batch. We will need to address this design in
> > > > the work that comes from this discussion.)
> > > >
> > > > Further, the `ProducerImpl#failPendingMessages` method includes logic
> > > > to call `ProducerImpl#failPendingBatchMessages`, which implies that
> > > > these batched, but not sent, messages have been historically
> > > > considered "pending".
> > > >
> > > > If we view the Javadoc as non-binding, I think my guiding influence
> > > > for the new design would be that the `closeAsync` method should result
> > > > in a "graceful" shutdown of the client.
> > > >
> > > >> What exactly does "graceful" convey here?
> > > >
> > > > This is a great question, and will likely drive the design here. I
> > > > view graceful to mean that the producer attempts to avoid artificial
> > > > failures. That means trying to drain the queue instead of
> > > > automatically failing all of the queue's callbacks. The tradeoff is
> > > > that closing the producer takes longer. This reasoning would justify
> > > > my claim that we should first flush the `batchMessageContainer`
> > > > instead of failing the batch without any effort at delivery, as that
> > > > would be artificial.
> > > >
> > > >> There is no guarantee that either case will ensure the message
> > > >> is published.
> > > >
> > > > I don't think that implementing `closeAsync` with graceful shutdown
> > > > logic implies a guarantee of message publishing. Rather, it guarantees
> > > > that failures will be the result of a real exception or a timeout.
> > > > Since calling `closeAsync` prevents additional messages from
> > > > delivering, users leveraging this functionality might be operating
> > > > with "at most once" delivery semantics where they'd prefer to deliver
> > > > the messages if possible, but they aren't going to delay application
> > > > shutdown indefinitely to deliver its last messages. If users need
> > > > stronger guarantees about whether their messages are delivered, they
> > > > are probably already using the flush methods to ensure that the
> > > > producer's queues are empty before calling `closeAsync`.
> > > >
> > > > I also agree that in all of these cases, we're assuming that users are
> > > > capturing references to the async callbacks and then making business
> > > > logic decisions based on the results of those callbacks.
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid>
> > > > > wrote:
> > > >>
> > > >> I can’t agree more, just like what I’ve said in PR 12195:
> > > >>
> > > > >>> In any case, when you choose `sendAsync`, you should always make use
> > > > >>> of the returned future to confirm the result of all messages. In Kafka,
> > > > >>> it's the send callback.
> > > >>
> > > > >> But I found many users are confused about the current behavior,
> > > > >> especially those who are used to Kafka’s close semantics. They might
> > > > >> expect a simple attempt to flush existing messages, which works in a
> > > > >> simple test environment, even though there's no guarantee in
> > > > >> exception cases.
> > > >>
> > > >>
> > > >>
> > > >>> 2021年9月28日 下午4:37,Joe F <joefranc...@gmail.com> 写道:
> > > >>>
> > > > >>> Clients should not depend on any of this behaviour, since the broker
> > > > >>> is at the other end of an unreliable network connection. The
> > > > >>> semantic differences are kind of meaningless from a usability point,
> > > > >>> since flushing on close =/= published. What exactly does "graceful"
> > > > >>> convey here? Flush the buffer on the client end and hope it makes it
> > > > >>> to the server.
> > > >>>
> > > > >>> Is there a difference whether you flush (or process) pending messages
> > > > >>> or not? There is no guarantee that either case will ensure the message
> > > > >>> is published.
> > > >>>
> > > > >>> The only way to ensure that messages are published is to wait for the
> > > > >>> ack. The correct model should be to wait for return on the blocking API,
> > > > >>> or wait for future completion of the async API, then handle any publish
> > > > >>> errors and only then close the producer.
> > > >>>
> > > >>>
> > > > >>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > >>> wrote:
> > > >>>
> > > >>>> Hi all,
> > > >>>>
> > > > >>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195)
> > > > >>>> that modifies the existing semantics of producer close. There has
> > > > >>>> already been some discussion in this PR, but I think it's better to
> > > > >>>> start a discussion here to let more people know.
> > > >>>>
> > > >>>> The existing implementation of producer close is:
> > > > >>>> 1. Cancel all timers, including the send timer and the batch container
> > > > >>>>    (`batchMessageContainer`) timer.
> > > > >>>> 2. Complete all pending messages (`pendingMessages`) with
> > > > >>>>    `AlreadyClosedException`.
> > > >>>>
> > > >>>> See `ProducerImpl#closeAsync` for details.
> > > >>>>
> > > >>>> But the JavaDoc of `Producer#closeAsync` is:
> > > >>>>
> > > > >>>>> No more writes will be accepted from this producer. Waits until all
> > > > >>>>> pending write request are persisted.
> > > >>>>
> > > > >>>> Anyway, the documentation and the implementation are inconsistent.
> > > > >>>> But specifically, we need to define the behavior for how to process
> > > > >>>> `pendingMessages` and `batchMessageContainer` when the producer
> > > > >>>> calls `closeAsync`.
> > > >>>>
> > > >>>> 1. batchMessageContainer: contains the buffered single messages
> > > >>>> (`Message<T>`).
> > > >>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
> > > >>>>
> > > > >>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and
> > > > >>>> the messages in `batchMessageContainer` should be discarded.
> > > >>>>
> > > > >>>> Other clients might have already implemented semantics similar to
> > > > >>>> the Java client's. If we change the semantics now, the behaviors
> > > > >>>> among different clients might become inconsistent.
> > > >>>>
> > > > >>>> Should we add a configuration to support graceful close to follow the
> > > > >>>> docs? Or just change the current behavior?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Yunze
> > > >>
> > >
> > >
> >
