Bruno,

I think that we can't guarantee that the message will get propagated
perfectly in every case (say, network partitioning), though it will work
for many cases. So I would say it's best effort, and I will mention it in
the KIP.

As for when to use it, I think we can discuss whether this will be
sufficient when we come to it, as long as we document its capabilities.

I hope this answers your question,

Walker

On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna <br...@confluent.io> wrote:

> Walker,
>
> I am sorry, but I still have a comment on the KIP, although you have
> already started voting.
>
> What happens when a consumer of the group skips the rebalance that
> propagates the shutdown request? Do you guarantee that all Kafka Streams
> clients are shut down, or is it best effort? If it is best effort, I
> guess the proposed method might not be used in critical cases where
> stopping record consumption may prevent or limit damage. I am not saying
> that it must be a guarantee, but this question should be answered in the
> KIP, IMO.
>
> Best,
> Bruno
>
> On 22.09.20 01:14, Walker Carlson wrote:
> > The error code right now is the assignor error; 2 is coded for shutdown,
> > but it could be expanded to encode the causes, or other errors that need
> > to be communicated. For example, we could add error code 3 to close the
> > thread but leave the client in an error state, if we choose to do so in
> > the future.
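A minimal sketch of the error-code idea above, assuming the field is a Java `short` (as the KIP text quoted in Boyang's message says). `ShutdownErrorCode`, `SubscriptionErrorField` and `LeaderDecision` are hypothetical names, not Kafka's actual assignor classes. Only code 2 (shutdown) and the possible future code 3 come from this thread; the other numeric values, and the idea that the leader acts on the first shutdown code it sees, are assumptions for illustration. It also shows Guozhang's point that a single code cannot carry a root cause: only a `short` travels with each subscription.

```java
import java.util.List;

// Hypothetical error codes; NOT the real Kafka Streams AssignorError enum.
enum ShutdownErrorCode {
    NONE((short) 0),                             // assumed: no error
    INCOMPLETE_SOURCE_TOPIC_METADATA((short) 1), // assumed numeric value
    SHUTDOWN_APPLICATION((short) 2),             // "2 is coded for shutdown"
    CLOSE_THREAD((short) 3);                     // possible future code from the thread

    private final short code;

    ShutdownErrorCode(short code) {
        this.code = code;
    }

    short code() {
        return code;
    }

    // Decode the short carried in the (hypothetical) subscription field.
    static ShutdownErrorCode fromCode(short code) {
        for (ShutdownErrorCode e : values()) {
            if (e.code == code) {
                return e;
            }
        }
        throw new IllegalArgumentException("Unknown error code: " + code);
    }
}

// Hypothetical stand-in for subscription metadata: only the error field.
final class SubscriptionErrorField {
    private final short errorCode;

    SubscriptionErrorField(short errorCode) {
        this.errorCode = errorCode;
    }

    short errorCode() {
        return errorCode;
    }
}

final class LeaderDecision {
    // Assumed leader behavior: one SHUTDOWN_APPLICATION code from any member
    // is enough to ask every client in the group to shut down.
    static boolean shutdownRequested(List<SubscriptionErrorField> subscriptions) {
        for (SubscriptionErrorField s : subscriptions) {
            if (ShutdownErrorCode.fromCode(s.errorCode()) == ShutdownErrorCode.SHUTDOWN_APPLICATION) {
                return true;
            }
        }
        return false;
    }
}
```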
> >
> > On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> >> Thanks for the KIP, Walker.
> >>
> >> In the KIP we mention "In order to communicate the shutdown request from
> >> one client to the others we propose to update the SubscriptionInfoData to
> >> include a short field which will encode an error code." Is there a
> >> dedicated error code that we should define here, or is it case-by-case?
> >>
> >> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io>
> >> wrote:
> >>
> >>> I am changing the name to "Add method to Shutdown entire Streams
> >>> Application" since we are no longer using an Exception; it seems more
> >>> appropriate.
> >>>
> >>> Also, it looks like the discussion is pretty much finished, so I will
> >>> be calling a vote.
> >>>
> >>> Thanks,
> >>> Walker
> >>>
> >>> On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> Sounds good to me. I also feel that this call should be non-blocking,
> >>>> but I was confused by the discussion thread, which made it seem that
> >>>> the API is designed in a blocking fashion; that contradicts my
> >>>> perspective, hence I asked for clarification :)
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hello Guozhang,
> >>>>>
> >>>>> As for the logging, I plan on having three logs. First, the client
> >>>>> logs that it is requesting an application shutdown; second, the
> >>>>> leader logs the processId of the invoker; third, the
> >>>>> StreamRebalanceListener logs that it is closing because of a
> >>>>> `stream.appShutdown`. Hopefully this will be enough to make the cause
> >>>>> of the close clear.
> >>>>>
> >>>>> I see what you mean about the name depending on the behavior of the
> >>>>> method, so I will try to clarify. This is how I currently envision the
> >>>>> call working.
> >>>>>
> >>>>> It is not an option to directly initiate a shutdown through a
> >>>>> StreamThread object from a KafkaStreams object, because "KafkaConsumer
> >>>>> is not safe for multi-threaded access". Instead, the method in
> >>>>> KafkaStreams finds the first alive thread and sets a flag in that
> >>>>> StreamThread. The StreamThread picks up the flag in its run loop, sets
> >>>>> the error code and triggers a rebalance; afterwards it stops
> >>>>> processing. Once KafkaStreams has set the flag, the method returns true
> >>>>> and the client continues running. If there are no alive threads, the
> >>>>> shutdown fails and the method returns false.
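The flag-based handoff described above can be sketched roughly as follows. `ThreadModel` and `ClientModel` are hypothetical stand-ins for StreamThread and KafkaStreams (no Kafka dependency), `initiateClosingAllClients` is only the name proposed in this thread, and `triggerRebalance` is a placeholder. The point is that the caller only sets a flag, while the thread that owns the consumer sets the error code and stops processing on its next loop iteration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a StreamThread: its consumer may only be used from
// its own thread, so other threads can only set a flag for the run loop.
final class ThreadModel {
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private volatile boolean alive = true;
    private volatile boolean processing = true;
    private volatile short errorCode = 0; // value for the next subscription

    boolean isAlive() { return alive; }
    boolean isProcessing() { return processing; }
    short errorCode() { return errorCode; }
    void die() { alive = false; }

    // Called from the KafkaStreams side: flips a flag, never touches the consumer.
    void requestApplicationShutdown() {
        shutdownRequested.set(true);
    }

    // One iteration of the run loop, executed on the consumer-owning thread.
    void runOnce() {
        if (shutdownRequested.get()) {
            errorCode = 2;      // shutdown code, as described in the thread
            triggerRebalance(); // placeholder, does nothing in this sketch
            processing = false; // stop processing records afterwards
            return;
        }
        // ... poll and process records ...
    }

    private void triggerRebalance() {
        // A real client would make the consumer rejoin the group here.
    }
}

// Hypothetical model of the proposed KafkaStreams-side method.
final class ClientModel {
    private final List<ThreadModel> threads;

    ClientModel(List<ThreadModel> threads) {
        this.threads = threads;
    }

    // Non-blocking: flags the first alive thread and returns true,
    // or returns false when no alive thread can propagate the request.
    boolean initiateClosingAllClients() {
        for (ThreadModel t : threads) {
            if (t.isAlive()) {
                t.requestApplicationShutdown();
                return true;
            }
        }
        return false;
    }
}
```

Because the method returns as soon as the flag is set, the caller never waits for the rebalance; a `false` return is the only signal that no thread was available.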
> >>>>>
> >>>>> What do you think the blocking behavior should be? I think the
> >>>>> StreamThread should definitely stop, to prevent any of the corruption
> >>>>> we are trying to avoid by shutting down, but I don't see any advantage
> >>>>> in the KafkaStreams call blocking.
> >>>>>
> >>>>> You are right to be concerned about the uncaught exception handler. If
> >>>>> there are no live StreamThreads, the rebalance will not be started at
> >>>>> all, and this would be a problem. However, the user should be aware of
> >>>>> this from the return value of false and react appropriately. This
> >>>>> would also be fixed if we implemented our own handler, so that we can
> >>>>> rebalance before the StreamThread closes.
> >>>>>
> >>>>> With that in mind, I believe that `initiateClosingAllClients` would be
> >>>>> an appropriate name. WDYT?
> >>>>>
> >>>>> Walker
> >>>>>
> >>>>>
> >>>>> On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Walker,
> >>>>>>
> >>>>>> Thanks for the updated KIP. Previously I was also a bit hesitant
> >>>>>> about the newly added public exception to communicate a
> >>>>>> user-requested whole-app shutdown, but the reason I did not bring
> >>>>>> this up is that I feel there is still an operational need to
> >>>>>> differentiate the scenario where an instance is closed because of
> >>>>>> a) a local `streams.close()` being triggered, or b) a remote
> >>>>>> instance's `stream.shutdownApp` being triggered. So if we are going
> >>>>>> to remove that exception (which I'm also in favor of), we should at
> >>>>>> least differentiate the two via log4j levels.
> >>>>>>
> >>>>>> Regarding the semantics that "It should wait to receive the shutdown
> >>>>>> request in the rebalance it triggers.", I'm not sure I fully
> >>>>>> understand: since this may be triggered from the stream thread's
> >>>>>> uncaught exception handler, if that thread is already dead then
> >>>>>> maybe a rebalance listener would not even be fired at all. Although I
> >>>>>> know these are implementation details that you probably abstract away
> >>>>>> from the proposal, I'd like to make sure that we are on the same page
> >>>>>> regarding its blocking behavior, since it is quite crucial to users
> >>>>>> as well. Could you elaborate a bit more?
> >>>>>>
> >>>>>> Regarding the function name, I guess my personal preference would
> >>>>>> depend on its actual blocking behavior as above :)
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hello all again,
> >>>>>>>
> >>>>>>> I have updated the KIP to no longer use an exception and instead add
> >>>>>>> a method to the KafkaStreams class; this seems to satisfy everyone's
> >>>>>>> concerns about how and when the functionality will be invoked.
> >>>>>>>
> >>>>>>> There is still a question over the name. We must decide between
> >>>>>>> "shutdownApplication", "initiateCloseAll", "closeAllInstances" or
> >>>>>>> some variation.
> >>>>>>>
> >>>>>>> I am rather indifferent to the name; I think they all get the point
> >>>>>>> across. The clearest to me would be shutdownApplication or
> >>>>>>> closeAllInstances, but WDYT?
> >>>>>>>
> >>>>>>> Walker
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Guozhang and Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback.
> >>>>>>>>
> >>>>>>>> I will respond in two parts, but I would like to clarify that I am
> >>>>>>>> not tied down to any of these names. Since we are still deciding
> >>>>>>>> whether we want to have an exception or not, I would rather not get
> >>>>>>>> tripped up on choosing a name just yet.
> >>>>>>>>
> >>>>>>>> Guozhang:
> >>>>>>>> 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I
> >>>>>>>> am not planning on changing the behavior of handling source topic
> >>>>>>>> deletion. That is being done in KIP-662 by Bruno. He is enabling the
> >>>>>>>> user to create their own handler, and shutdownApplication gives them
> >>>>>>>> the option to shut down.
> >>>>>>>>
> >>>>>>>> 2) It seems that we will remove the Exception entirely, so this
> >>>>>>>> won't matter (see below).
> >>>>>>>>
> >>>>>>>> 3) It should wait to receive the shutdown request in the rebalance
> >>>>>>>> it triggers. That might be a better name. I am torn between using
> >>>>>>>> "application" or "all instances" in a couple of places. I think we
> >>>>>>>> should pick one and be consistent, but I am unsure which is more
> >>>>>>>> descriptive.
> >>>>>>>>
> >>>>>>>> Bruno:
> >>>>>>>> I agree that in principle Exceptions should be used in exceptional
> >>>>>>>> cases, and I have added a method in KafkaStreams to handle cases
> >>>>>>>> where an Exception would not be appropriate. I take it you think
> >>>>>>>> users should never throw a Streams Exception; instead, they could
> >>>>>>>> always throw and catch their own exception and call
> >>>>>>>> shutdownApplication from there. This would allow them to exit a
> >>>>>>>> processor if they wanted to shut down from there. I will update the
> >>>>>>>> KIP to remove the exception.
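A minimal sketch of the pattern described above: the user throws and catches an exception of their own, then calls the shutdown method instead of throwing a Streams exception. `PoisonRecordException` and `GuardedHandler` are hypothetical, and the `BooleanSupplier` stands in for whichever KafkaStreams method name is finally adopted; it is not a real Kafka API. The `false` branch mirrors the "no alive threads" case discussed elsewhere in this thread.

```java
import java.util.function.BooleanSupplier;

// Hypothetical user-owned exception; Kafka Streams never sees it.
final class PoisonRecordException extends RuntimeException {
    PoisonRecordException(String message) {
        super(message);
    }
}

// Hypothetical record handler. `requestShutdown` stands in for the proposed
// KafkaStreams method (shutdownApplication / initiateClosingAllClients).
final class GuardedHandler {
    private final BooleanSupplier requestShutdown;

    GuardedHandler(BooleanSupplier requestShutdown) {
        this.requestShutdown = requestShutdown;
    }

    /** Returns true if the record was processed, false if a shutdown was requested. */
    boolean handle(String value) {
        try {
            validate(value);
            return true;
        } catch (PoisonRecordException e) {
            // The user decides this error is fatal for the whole application
            // and calls the method rather than throwing a Streams exception.
            boolean initiated = requestShutdown.getAsBoolean();
            if (!initiated) {
                // false: no alive thread could propagate the request
                throw new IllegalStateException("Could not initiate shutdown", e);
            }
            return false;
        }
    }

    private static void validate(String value) {
        if (value == null || value.isEmpty()) {
            throw new PoisonRecordException("empty record");
        }
    }
}
```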
> >>>>>>>>
> >>>>>>>> I would like to add that, when trying to shut down from the
> >>>>>>>> uncaught exception handler, we need at least one StreamThread to be
> >>>>>>>> alive. So having our own handler, instead of using the default one
> >>>>>>>> after the thread has died, would let us always close the
> >>>>>>>> application.
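A rough sketch of why a handler of our own helps, assuming the rebalance must be triggered from the thread that owns the consumer: by the time a `Thread.UncaughtExceptionHandler` runs, `run()` has already exited, so the polling loop is gone. Catching the error inside the loop, as the hypothetical `SelfHandlingWorker` below does, leaves a point where a real StreamThread could still set the shutdown code and rejoin the group. `processOnce` is a placeholder for one loop iteration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical worker: the error is caught while still inside run(), i.e.
// before the thread exits and before any uncaught exception handler fires.
final class SelfHandlingWorker implements Runnable {
    private final Runnable processOnce; // placeholder for one run-loop iteration
    final AtomicReference<String> outcome = new AtomicReference<>("running");

    SelfHandlingWorker(Runnable processOnce) {
        this.processOnce = processOnce;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                processOnce.run();
            }
        } catch (RuntimeException e) {
            // Still on the consumer-owning thread: a real StreamThread could
            // set the shutdown error code and trigger a rebalance here.
            outcome.set("shutdown requested before thread exit: " + e.getMessage());
        }
    }
}
```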
> >>>>>>>>
> >>>>>>>> Walker
> >>>>>>>>
> >>>>>>>> On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Walker,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP!
> >>>>>>>>>
> >>>>>>>>> I like the motivation of the KIP and the method to request a
> >>>>>>>>> shutdown of all Kafka Streams clients of a Kafka Streams
> >>>>>>>>> application. I think we really need such functionality to react to
> >>>>>>>>> errors. However, I am not convinced that throwing an exception to
> >>>>>>>>> shut down all clients is a good idea.
> >>>>>>>>>
> >>>>>>>>> An exception signals an exceptional situation to which we can react
> >>>>>>>>> in multiple ways depending on the context. The exception that you
> >>>>>>>>> propose seems to me rather a well-defined user command than an
> >>>>>>>>> exceptional situation. IMO, we should not use exceptions to control
> >>>>>>>>> program flow because it mixes cause and effect. Hence, I would
> >>>>>>>>> propose an invariant for public exceptions in Kafka Streams: they
> >>>>>>>>> should be caught by users, not thrown. But maybe I am missing the
> >>>>>>>>> big advantage of using an exception here.
> >>>>>>>>>
> >>>>>>>>> I echo Guozhang's third point about clarifying the behavior of the
> >>>>>>>>> method and the naming.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>> On 16.09.20 06:28, Guozhang Wang wrote:
> >>>>>>>>>> Hello Walker,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for proposing the KIP! I have a couple more comments:
> >>>>>>>>>>
> >>>>>>>>>> 1. ShutdownRequestedException: my understanding is that this
> >>>>>>>>>> exception is only used if the application shutdown was initiated
> >>>>>>>>>> by the user-triggered "shutdownApplication()". Otherwise, e.g. if
> >>>>>>>>>> it is due to a source topic not being found and the Streams library
> >>>>>>>>>> decides to close the whole application automatically, we would
> >>>>>>>>>> still throw the original exception, a.k.a.
> >>>>>>>>>> MissingSourceTopicException, to the uncaught exception handler. Is
> >>>>>>>>>> that the case? Also, which package are you proposing to add this
> >>>>>>>>>> exception to?
> >>>>>>>>>>
> >>>>>>>>>> 2. ShutdownRequestedException: for its constructor, I'm wondering
> >>>>>>>>>> what Throwable "root cause" it could ever be. I'm guessing that we
> >>>>>>>>>> would still just use a single error code in the protocol to tell
> >>>>>>>>>> other instances to shut down, and that error code would not allow
> >>>>>>>>>> us to encode any more information, like root causes, at all; so it
> >>>>>>>>>> seems that parameter would always be null.
> >>>>>>>>>>
> >>>>>>>>>> 3. shutdownApplication: again I'd like to clarify: would this
> >>>>>>>>>> function block on the local instance until all its threads have
> >>>>>>>>>> shut down, like `close()` does, or would it just initiate the
> >>>>>>>>>> shutdown and not wait for local threads at all? Also, a nit
> >>>>>>>>>> suggestion regarding the name: if it is only for initiating the
> >>>>>>>>>> shutdown, maybe naming it "initiateCloseAll" would be more
> >>>>>>>>>> specific?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Matthias and Sophie,
> >>>>>>>>>>>
> >>>>>>>>>>> You both make good points. I will respond to you separately below.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Matthias:
> >>>>>>>>>>> That is a fair point. KIP-662
> >>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
> >>>>>>>>>>> which is accepted, will make source topic deletion surface in the
> >>>>>>>>>>> uncaught exception handler, and shutdown can be initiated from
> >>>>>>>>>>> there. However, this would mean that the stream thread is already
> >>>>>>>>>>> dead, so I would have to rethink the exception for this use case;
> >>>>>>>>>>> perhaps it would be needed in the KafkaStreams object. But this
> >>>>>>>>>>> still leaves the case where there is only one stream thread. I
> >>>>>>>>>>> will think about it.
> >>>>>>>>>>>
> >>>>>>>>>>> Maybe the source topics are a bad example, as they make this KIP
> >>>>>>>>>>> dependent on KIP-662 getting implemented in a certain way. However,
> >>>>>>>>>>> this is not the only reason this could be useful: here
> >>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket
> >>>>>>>>>>> asking for the same functionality. I have added a few other use
> >>>>>>>>>>> cases to the KIP, although I will still be rethinking where I want
> >>>>>>>>>>> to add this functionality and whether it should be an exception or
> >>>>>>>>>>> not.
> >>>>>>>>>>>
> >>>>>>>>>>> Sophie:
> >>>>>>>>>>> I agree that shutting down an instance could also be useful. There
> >>>>>>>>>>> was some discussion about this on KIP-663, and it seems that we came
> >>>>>>>>>>> to the conclusion that close(Duration.ZERO) would be sufficient
> >>>>>>>>>>> (link to thread:
> >>>>>>>>>>> <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>).
> >>>>>>>>>>>
> >>>>>>>>>>> Also, I am not set on the name ShutdownRequested. If we decide to
> >>>>>>>>>>> keep it as an exception, your idea is probably a better name.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback,
> >>>>>>>>>>> Walker
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> It seems that the new exception would need to be thrown by user
> >>>>>>>>>>>> code? However, in the motivation you mention the scenario of a
> >>>>>>>>>>>> missing source topic, which a user cannot detect but which the
> >>>>>>>>>>>> KafkaStreams runtime would be responsible for handling.
> >>>>>>>>>>>>
> >>>>>>>>>>>> How do both things go together?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> >>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have created KIP-671 to give the option to shut down a Streams
> >>>>>>>>>>>>> application in response to an error.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is because of the Jira ticket
> >>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Please give it a look and let me know if you have any feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >
>
