Thanks for your analysis, Yunze. I identified above that the messages
in the batch container were not getting completed correctly, so I put
together a PR to fix the problematic behavior. This PR will be valid
regardless of our decision to add flush logic to the close method.

Here is the PR: https://github.com/apache/pulsar/pull/12259.
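
For readers skimming the thread, here is a toy model of the behavior in
question. It is not the code in the PR and not Pulsar's `ProducerImpl`;
`ToyBatchingProducer` and all of its methods are hypothetical names used
for illustration only. It sketches one way a close that does not flush
can still complete every buffered send future, so that callbacks like
the ones in the unit test quoted below do not wait forever:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model only; this is NOT Pulsar's ProducerImpl. All names are hypothetical. */
final class ToyBatchingProducer {
    // Futures of messages that are buffered in the batch but not yet sent.
    private final List<CompletableFuture<String>> batch = new ArrayList<>();
    private boolean closed;

    /** Buffers a message (payload ignored here) and returns the future its callback hangs on. */
    synchronized CompletableFuture<String> sendAsync(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (closed) {
            future.completeExceptionally(new IllegalStateException("producer already closed"));
        } else {
            batch.add(future);
        }
        return future;
    }

    /** Closes without flushing: every buffered future is failed instead of dropped. */
    synchronized int close() {
        closed = true;
        int failed = 0;
        for (CompletableFuture<String> future : batch) {
            if (future.completeExceptionally(
                    new IllegalStateException("producer closed before the batch was sent"))) {
                failed++;
            }
        }
        batch.clear();
        return failed;
    }

    /** Same shape as the quoted test: ten sends, then close; counts failed callbacks. */
    static int countFailedCallbacksAfterClose() {
        ToyBatchingProducer producer = new ToyBatchingProducer();
        AtomicInteger failedCallbacks = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            producer.sendAsync("msg-" + i).whenComplete((id, e) -> {
                if (e != null) {
                    failedCallbacks.incrementAndGet();
                }
            });
        }
        producer.close();
        return failedCallbacks.get();
    }

    public static void main(String[] args) {
        System.out.println(countFailedCallbacksAfterClose() + " of 10 callbacks completed");
    }
}
```

In this model the callbacks run on the thread that calls `close()`, so
the count is already final when `close()` returns. A real client would
likely complete the futures on its own threads, and the exception type
would differ.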

Thanks,
Michael


On Thu, Sep 30, 2021 at 10:27 PM Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> You're right that, in normal cases, the pending messages are persisted
> before a CommandCloseProducer request completes. That is guaranteed by the
> broker side.
>
> Then there’s no inconsistency between the implementation and JavaDocs now.
>
> The key point is whether we should flush the messages in the batch
> container. I prefer keeping the current semantics. But I found that the
> messages in the batch container are never failed. We need to fix that
> problem. For example, here’s my unit test:
>
> ```java
>     // Assumes `pulsarClient` and `log` are fields of the enclosing test class.
>     @Test
>     public void test() throws Exception {
>         // Large batch limits and a long publish delay are meant to keep all
>         // ten messages buffered in the batch container.
>         final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
>                 .topic("topic")
>                 .batchingMaxMessages(10000)
>                 .batchingMaxBytes(10000000)
>                 .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
>                 .sendTimeout(1, TimeUnit.SECONDS)
>                 .create();
>         final CountDownLatch latch = new CountDownLatch(10);
>         final Map<Integer, Throwable> throwableMap = new ConcurrentHashMap<>();
>         for (int i = 0; i < 10; i++) {
>             final Integer index = i;
>             producer.sendAsync("msg-" + i).whenComplete((id, e) -> {
>                 if (e != null) {
>                     throwableMap.put(index, e);
>                 }
>                 latch.countDown();
>             });
>         }
>         producer.close();
>         // Blocks here: the send futures of the batched messages never complete.
>         latch.await();
>         throwableMap.forEach((i, e) -> {
>             log.info("Message {} failed with {}", i, e);
>         });
>     }
> ```
>
> The test would block forever.
>
> > On Oct 1, 2021, at 4:22 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> >
> > Following up here. I am pretty sure part of this conversation has been
> > based on a misunderstanding of the code. From what I can tell, the
> > behavior for `Producer#closeAsync` in the client (mostly) aligns with
> > the current Javadocs.
> >
> >> The existing implementation of producer close is:
> >> 1. Cancel all timers, including send and batch container 
> >> (`batchMessageContainer`).
> >> 2. Complete all pending messages (`pendingMessages`) with
> >> `AlreadyClosedException`.
> >
> > I agree with 1, but I think 2 is only partially correct. The client
> > will only exceptionally complete pending messages if the connection is
> > null or not ready, or after the broker responds to the
> > `CLOSE_PRODUCER` command or a timeout passes. This behavior seems
> > right to me.
> >
> > Here is the relevant code:
> > https://github.com/apache/pulsar/blob/9d309145f342bc416b8b4663125e1216903a3e83/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L875-L909
> >
> > The only remaining question: does close imply flush? If not, we'll
> > need to update the logic to fail the messages contained in the
> > `batchMessageContainer` during close. Otherwise, we'll update the
> > logic to call flush before sending the `CLOSE_PRODUCER` command and
> > everything should work as documented. In both cases, we should update
> > the Javadocs to make the behavior clearer.
> >
> > Thanks,
> > Michael
> >
> >
> >
> > On Thu, Sep 30, 2021 at 11:55 AM Michael Marshall <mikemars...@gmail.com> 
> > wrote:
> >>
> >> I have two questions:
> >>
> >> 1. Does close imply immediate shutdown?
> >> 2. Does close imply flush?
> >>
> >> There is not yet consensus on 1, and 2 is only relevant if 1's answer is 
> >> "no".
> >>
> >> Thus far, the conversation has centered on the `Producer#close`
> >> method. I'd like to broaden the discussion to include some other
> >> methods from the `PulsarClient` interface: `shutdown` and `close`.
> >>
> >> The Javadoc for `PulsarClient#shutdown` describes the "shutdown
> >> immediately" behavior. It says:
> >>
> >>> Release all the resources and close all the producer, consumer and
> >>> reader instances without waiting for ongoing operations to complete.
> >>
> >> The Javadoc for `PulsarClient#close` describes waiting for
> >> pending/in-flight messages to complete before returning. It says:
> >>
> >>> This operation will trigger a graceful close of all producer, consumer
> >>> and reader instances that this client has currently active. That implies
> >>> that close will block and wait until all pending producer send requests
> >>> are persisted.
> >>
> >> One question that follows from the above: why does the `Producer` not
> >> have a `shutdown` method? I think this is because the "immediate
> >> shutdown" behavior is not necessary for a single producer. When
> >> immediate shutdown semantics are required, the `PulsarClient#shutdown`
> >> method is sufficient because it is used when shutting down the whole
> >> application. (If this is not correct, perhaps we should add a
> >> `shutdown` method to the producer?)
> >>
> >> Since immediate shutdown semantics are already available via our
> >> client API, I posit that the answer to question 1 is no, `close` does
> >> not imply immediate shutdown. At the very least, `close` in the Pulsar
> >> Client has not historically implied immediate shutdown.
> >>
> >> Additionally, it is relevant to point out that the `Producer#close`
> >> method is already sending a `CLOSE_PRODUCER` command and waiting on a
> >> response back from the broker. The broker's producer close method has
> >> the following Javadoc:
> >>
> >>> Close the producer immediately if: a. the connection is dropped
> >>> b. it's a graceful close and no pending publish acks are left
> >>> else wait for pending publish acks
> >>
> >> Since we're already waiting on the broker to respond to the producer's
> >> `CLOSE_PRODUCER` request, I see no reason to fail pending/in-flight
> >> messages immediately, especially because we should get a response back
> >> for those messages before getting the `SUCCESS` response from the
> >> broker since the responses will come on the same TCP connection. We
> >> could even simplify the close logic so that when the `CLOSE_PRODUCER`
> >> request completes (either successfully or because of a failure), we
> >> fail all remaining pending message futures.
> >>
> >> Ultimately, we need to decide whether to update the implementation to
> >> match the existing Javadocs, or to update the Javadocs to indicate
> >> that `close` means an immediate shutdown, which includes failing all
> >> outstanding message futures immediately. My vote is to make the
> >> implementation align with the Javadocs.
> >>
> >> Regarding question 2, I prefer that `close` implies flush because it
> >> is only a single (batched) message being flushed. If we do flush this
> >> message, we'll need to make sure that the message is sent before
> >> the `CLOSE_PRODUCER` command is sent.
> >>
> >> Thanks,
> >> Michael
> >>
> >>
> >> On Wed, Sep 29, 2021 at 7:04 AM Enrico Olivelli <eolive...@gmail.com> 
> >> wrote:
> >>>
> >>> I agree that we must ensure that every pending callback is completed
> >>> eventually (a timeout or another error is not a problem),
> >>> because we cannot let the client application hang forever.
> >>> I believe that the application can perform a flush() explicitly and also
> >>> wait for every callback to be executed if that is the requirement.
> >>>
> >>> Usually you call close() when:
> >>> 1. you have a serious problem: you already know that there is a hard 
> >>> error,
> >>> and you want to close the Producer or the Application and possibly start a
> >>> new one to recover
> >>> 2. you are shutting down your application or component: you have control
> >>> over the callbacks, so you can wait for them to complete
> >>>
> >>> So case 2 can be covered by the application. We have to support case 1:
> >>> fail fast and close (no need for flush()).
> >>>
> >>> In my experience trying to implement "graceful stops" adds only complexity
> >>> and false hopes to the users.
> >>>
> >>> Enrico
> >>>
> >>>
> >>>
> >>> On Wed, Sep 29, 2021 at 1:58 PM Nirvana <1572139...@qq.com.invalid>
> >>> wrote:
> >>>
> >>>> I agree that we should try to ensure "at most once" when closing.
> >>>>
> >>>>
> >>>> > That would still get controlled by send timeout, after that, the send
> >>>> > will fail and close should proceed.
> >>>> This sounds more in line with "at most once".
> >>>>
> >>>>
> >>>> ------------------ Original Message ------------------
> >>>> From: "dev" <matteo.me...@gmail.com>;
> >>>> Sent: Wednesday, Sep 29, 2021, 3:55 PM
> >>>> To: "Dev" <dev@pulsar.apache.org>;
> >>>>
> >>>> Subject: Re: Correct semantics of producer close
> >>>>
> >>>>
> >>>> > but equally they might be
> >>>> > surprised when closeAsync doesn't complete because the pending messages
> >>>> > can't be cleared
> >>>>
> >>>> That would still get controlled by send timeout, after that, the send
> >>>> will fail and close should proceed.
> >>>>
> >>>> --
> >>>> Matteo Merli
> >>>> <matteo.me...@gmail.com>
> >>>>
> >>>> On Wed, Sep 29, 2021 at 12:52 AM Jack Vanlightly
> >>>> <jvanligh...@splunk.com.invalid> wrote:
> >>>> >
> >>>> > I can see both sides of the argument regarding whether to flush pending
> >>>> > messages or not. But I think what is definitely in the contract is not to
> >>>> > discard any callbacks causing user code to block forever. No matter what,
> >>>> > we must always call the callbacks.
> >>>> >
> >>>> > Personally, I am in favour of a close operation not flushing pending
> >>>> > messages (and I define pending here as any message that has a callback).
> >>>> > The reason is that if we wait for all pending messages to be sent then we
> >>>> > now face a number of edge cases that could cause the close operation to
> >>>> > take a very long time to complete. What if the user code really just needs
> >>>> > to close the producer right now? If we amend the documentation to make it
> >>>> > clear that close does not flush pending messages then the user is now able
> >>>> > to explicitly craft the behaviour they need. If they want all messages
> >>>> > flushed first then chain flushAsync->closeAsync, else just closeAsync.
> >>>> >
> >>>> > Unfortunately I think user expectation, regardless of the current javadoc,
> >>>> > is that close would flush everything and in an ideal world it would. We
> >>>> > have the Principle of Least Surprise but we also have Safe By Default.
> >>>> > Users might be surprised that when calling closeAsync, a load of their
> >>>> > pending messages get ConnectionAlreadyClosed, but equally they might be
> >>>> > surprised when closeAsync doesn't complete because the pending messages
> >>>> > can't be cleared. Failing pending messages is the safer option. User code
> >>>> > must handle failure responses and cannot claim data loss with a
> >>>> > non-positive response. But if they can't close a producer, that could
> >>>> > result in a wider impact on their system, not to mention more issues
> >>>> > created in GitHub.
> >>>> >
> >>>> > Jack
> >>>> >
> >>>> > On Wed, Sep 29, 2021 at 7:05 AM Joe F <joefranc...@gmail.com> wrote:
> >>>> >
> >>>> > > > I don't think that implementing `closeAsync` with graceful shutdown
> >>>> > > > logic implies a guarantee of message publishing. Rather, it guarantees
> >>>> > > > that failures will be the result of a real exception or a timeout.
> >>>> > >
> >>>> > > I think that's beside the point. There is no definition of "real"
> >>>> > > exceptions. At that point the app is publishing on a best effort basis,
> >>>> > > and there are no guarantees anywhere in client or server.
> >>>> > >
> >>>> > > There is no concept of "maybe published" or
> >>>> > > "published-if-no_real_errors". What does that even mean? That is only a
> >>>> > > can of worms which is going to add to developer confusion and lead to
> >>>> > > Pulsar users finding in the worst possible way that something got lost
> >>>> > > because it never got published. It's a poor experience when you find it.
> >>>> > > I have a real life experience where a user used async APIs (in a lambda),
> >>>> > > which hummed along fine. One day much later, the cloud had a hitch, and
> >>>> > > they discovered a message was not delivered.
> >>>> > >
> >>>> > > I am more concerned about developers discovering at the worst possible
> >>>> > > time that "published-if-no_real_errors" is a concept.
> >>>> > >
> >>>> > > My suggestion is to make this simple for developers.
> >>>> > >
> >>>> > > ----The sync/async nature of the close() [ or any other API, for that
> >>>> > > matter ] is completely orthogonal to the API semantics, and is just a
> >>>> > > programmatic choice to deal with how resources are managed within the
> >>>> > > program. That's not material here.---
> >>>> > >
> >>>> > > A close() is an action that is shutting down the producer right now, not
> >>>> > > even waiting for any acks of inflight messages. A willingness to lose
> >>>> > > pending/inflight messages is explicit in that call. The producer will not
> >>>> > > be around to deal with errors or to retry failed messages once close() is
> >>>> > > invoked.
> >>>> > >
> >>>> > > On the contrary, if the client does not want to deal with message loss,
> >>>> > > then flush(), stick around to gather the acks, deal with errors and
> >>>> > > retries etc. and then do close(). Then close() will be just a resource
> >>>> > > management action on the client.
> >>>> > >
> >>>> > > So update the documentation to reflect that. ---> if close() is called
> >>>> > > on a producer with messages pending acks, those messages are left in
> >>>> > > doubt. Avoid all mention of flushes, best effort etc. Users must buy
> >>>> > > into uncertainty, without any qualifications.
> >>>> > >
> >>>> > > I would at all costs avoid using the term "graceful" anywhere. That word
> >>>> > > has specific semantics associated with it in the systems/storage domain,
> >>>> > > and what is being proposed here is nothing like that.
> >>>> > >
> >>>> > > -j
> >>>> > >
> >>>> > >
> >>>> > > On Tue, Sep 28, 2021 at 7:05 PM Yunze Xu <y...@streamnative.io.invalid>
> >>>> > > wrote:
> >>>> > >
> >>>> > > > It’s a good point that `ProducerImpl#failPendingBatchMessages` treats
> >>>> > > > messages in batch container also as pending messages.
> >>>> > > >
> >>>> > > > I agree with your definition of "graceful close". It’s more like an
> >>>> > > > "at most once" semantics, like the original JavaDoc said
> >>>> > > >
> >>>> > > > > pending writes will not be retried
> >>>> > > >
> >>>> > > > Thanks,
> >>>> > > > Yunze
> >>>> > > >
> >>>> > > > > On Sep 29, 2021, at 5:24 AM, Michael Marshall <mikemars...@gmail.com> wrote:
> >>>> > > > >
> >>>> > > > > Thanks for bringing this thread to the mailing list, Yunze.
> >>>> > > > >
> >>>> > > > > I think the right change is to update the `closeAsync` method to first
> >>>> > > > > flush `batchMessageContainer` and to then asynchronously wait for the
> >>>> > > > > `pendingMessages` queue to drain. We could add a new timeout or rely
> >>>> > > > > on the already implemented `sendTimeout` config to put an upper time
> >>>> > > > > limit on `closeAsync`. My reasoning as well as responses to Joe and
> >>>> > > > > Yunze follow:
> >>>> > > > >
> >>>> > > > >> we need to define the behavior for how to process `pendingMessages`
> >>>> > > > >> and `batchMessageContainer` when producer call `closeAsync`.
> >>>> > > > >
> >>>> > > > > Yes, this is exactly the clarification required, and I agree that the
> >>>> > > > > Javadoc is ambiguous and that the implementation doesn't align with
> >>>> > > > > the Javadoc.
> >>>> > > > >
> >>>> > > > > If we view the Javadoc as binding, then the fundamental question is
> >>>> > > > > what messages are "pending". The `pendingMessages` seem pretty easy to
> >>>> > > > > classify as "pending" given that they are already in flight on the
> >>>> > > > > network.
> >>>> > > > >
> >>>> > > > > I also consider `batchMessageContainer` to be "pending" because a
> >>>> > > > > client application already has callbacks for the messages in this
> >>>> > > > > container. These callbacks are expected to complete when the batch
> >>>> > > > > message delivery completes. Since the client application already has a
> >>>> > > > > reference to a callback, it isn't a problem that the producer
> >>>> > > > > implementation initiates the flush logic. (Note that the current
> >>>> > > > > design fails the `pendingMessages` but does not fail the
> >>>> > > > > `batchMessageContainer` when `closeAsync` is called, so the callbacks
> >>>> > > > > for that container are currently left incomplete forever if the client
> >>>> > > > > is closed with an unsent batch. We will need to address this design in
> >>>> > > > > the work that comes from this discussion.)
> >>>> > > > >
> >>>> > > > > Further, the `ProducerImpl#failPendingMessages` method includes logic
> >>>> > > > > to call `ProducerImpl#failPendingBatchMessages`, which implies that
> >>>> > > > > these batched, but not sent, messages have been historically
> >>>> > > > > considered "pending".
> >>>> > > > >
> >>>> > > > > If we view the Javadoc as non-binding, I think my guiding influence
> >>>> > > > > for the new design would be that the `closeAsync` method should result
> >>>> > > > > in a "graceful" shutdown of the client.
> >>>> > > > >
> >>>> > > > >> What exactly does "graceful" convey here?
> >>>> > > > >
> >>>> > > > > This is a great question, and will likely drive the design here. I
> >>>> > > > > view graceful to mean that the producer attempts to avoid artificial
> >>>> > > > > failures. That means trying to drain the queue instead of
> >>>> > > > > automatically failing all of the queue's callbacks. The tradeoff is
> >>>> > > > > that closing the producer takes longer. This reasoning would justify
> >>>> > > > > my claim that we should first flush the `batchMessageContainer`
> >>>> > > > > instead of failing the batch without any effort at delivery, as that
> >>>> > > > > would be artificial.
> >>>> > > > >
> >>>> > > > >> There is no guarantee that either case will ensure the message
> >>>> > > > >> is published.
> >>>> > > > >
> >>>> > > > > I don't think that implementing `closeAsync` with graceful shutdown
> >>>> > > > > logic implies a guarantee of message publishing. Rather, it guarantees
> >>>> > > > > that failures will be the result of a real exception or a timeout.
> >>>> > > > > Since calling `closeAsync` prevents additional messages from
> >>>> > > > > delivering, users leveraging this functionality might be operating
> >>>> > > > > with "at most once" delivery semantics where they'd prefer to deliver
> >>>> > > > > the messages if possible, but they aren't going to delay application
> >>>> > > > > shutdown indefinitely to deliver its last messages. If users need
> >>>> > > > > stronger guarantees about whether their messages are delivered, they
> >>>> > > > > are probably already using the flush methods to ensure that the
> >>>> > > > > producer's queues are empty before calling `closeAsync`.
> >>>> > > > >
> >>>> > > > > I also agree that in all of these cases, we're assuming that users are
> >>>> > > > > capturing references to the async callbacks and then making business
> >>>> > > > > logic decisions based on the results of those callbacks.
> >>>> > > > >
> >>>> > > > > Thanks,
> >>>> > > > > Michael
> >>>> > > > >
> >>>> > > > > On Tue, Sep 28, 2021 at 4:58 AM Yunze Xu <y...@streamnative.io.invalid>
> >>>> > > > > wrote:
> >>>> > > > >>
> >>>> > > > >> I can’t agree more, just like what I’ve said in PR 12195:
> >>>> > > > >>
> >>>> > > > >>> In any case, when you choose `sendAsync`, you should always make use
> >>>> > > > >>> of the returned future to confirm the result of all messages. In
> >>>> > > > >>> Kafka, it's the send callback.
> >>>> > > > >>
> >>>> > > > >> But I found many users are confused about the current behavior,
> >>>> > > > >> especially those who are used to Kafka’s close semantics. They might
> >>>> > > > >> expect a simple try to flush existing messages, which works in a
> >>>> > > > >> simple test environment, even though there's no guarantee for
> >>>> > > > >> exception cases.
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >>> On Sep 28, 2021, at 4:37 PM, Joe F <joefranc...@gmail.com> wrote:
> >>>> > > > >>>
> >>>> > > > >>> Clients should not depend on any of this behaviour, since the broker
> >>>> > > > >>> is at the other end of an unreliable network connection. The
> >>>> > > > >>> semantic differences are kind of meaningless from a usability point,
> >>>> > > > >>> since flushing on close =/= published. What exactly does "graceful"
> >>>> > > > >>> convey here? Flush the buffer on the client end and hope it makes it
> >>>> > > > >>> to the server.
> >>>> > > > >>>
> >>>> > > > >>> Is there a difference whether you flush (or process) pending messages
> >>>> > > > >>> or not? There is no guarantee that either case will ensure the
> >>>> > > > >>> message is published.
> >>>> > > > >>>
> >>>> > > > >>> The only way to ensure that messages are published is to wait for the
> >>>> > > > >>> ack. The correct model should be to wait for return on the blocking
> >>>> > > > >>> API, or wait for future completion of the async API, then handle any
> >>>> > > > >>> publish errors and then only close the producer.
> >>>> > > > >>>
> >>>> > > > >>>
> >>>> > > > >>> On Mon, Sep 27, 2021 at 8:50 PM Yunze Xu <y...@streamnative.io.invalid>
> >>>> > > > >>> wrote:
> >>>> > > > >>>
> >>>> > > > >>>> Hi all,
> >>>> > > > >>>>
> >>>> > > > >>>> Recently I found a PR (https://github.com/apache/pulsar/pull/12195)
> >>>> > > > >>>> that modifies the existing semantics of producer close. There're
> >>>> > > > >>>> already some communications in this PR, but I think it's better to
> >>>> > > > >>>> start a discussion here to let more people know.
> >>>> > > > >>>>
> >>>> > > > >>>> The existing implementation of producer close is:
> >>>> > > > >>>> 1. Cancel all timers, including send and batch container
> >>>> > > > >>>> (`batchMessageContainer`).
> >>>> > > > >>>> 2. Complete all pending messages (`pendingMessages`) with
> >>>> > > > >>>> `AlreadyClosedException`.
> >>>> > > > >>>>
> >>>> > > > >>>> See `ProducerImpl#closeAsync` for details.
> >>>> > > > >>>>
> >>>> > > > >>>> But the JavaDoc of `Producer#closeAsync` is:
> >>>> > > > >>>>
> >>>> > > > >>>>> No more writes will be accepted from this producer. Waits until all
> >>>> > > > >>>>> pending write request are persisted.
> >>>> > > > >>>>
> >>>> > > > >>>> Anyway, the document and implementation are inconsistent. But
> >>>> > > > >>>> specifically, we need to define the behavior for how to process
> >>>> > > > >>>> `pendingMessages` and `batchMessageContainer` when producer call
> >>>> > > > >>>> `closeAsync`.
> >>>> > > > >>>>
> >>>> > > > >>>> 1. batchMessageContainer: contains the buffered single messages
> >>>> > > > >>>> (`Message<T>`).
> >>>> > > > >>>> 2. pendingMessages: all inflight messages (`OpSendMsg`) in network.
> >>>> > > > >>>>
> >>>> > > > >>>> IMO, from the JavaDoc, only `pendingMessages` should be processed and
> >>>> > > > >>>> the messages in `batchMessageContainer` should be discarded.
> >>>> > > > >>>>
> >>>> > > > >>>> Since other clients might have already implemented the similar
> >>>> > > > >>>> semantics of Java clients, if we changed the semantics now, the
> >>>> > > > >>>> behaviors among different clients might be inconsistent.
> >>>> > > > >>>>
> >>>> > > > >>>> Should we add a configuration to support graceful close to follow the
> >>>> > > > >>>> docs? Or just change the current behavior?
> >>>> > > > >>>>
> >>>> > > > >>>> Thanks,
> >>>> > > > >>>> Yunze
> >>>> > > > >>
> >>>> > > >
> >>>> > > >
> >>>> > >
>
