I am changing the name to "Add method to Shutdown entire Streams
Application" since we are no longer using an Exception, it seems more
appropriate.

Also it looks like the discussion is pretty much finished so I will be
calling it to a vote.

thanks,
Walker

On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Sounds good to me. I also feel that this call should be non-blocking but I
> guess I was confused from the discussion thread that the API is designed in
> a blocking fashion which contradicts with my perspective and hence I asked
> for clarification :)
>
> Guozhang
>
>
> On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > Hello Guozhang,
> >
> > As for the logging I plan on having three logs. First, the client log
> that
> > it is requesting an application shutdown, second, the leader log
> processId
> > of the invoker, third, then the StreamRebalanceListener it logs that it
> is
> > closing because of an `stream.appShutdown`. Hopefully this will be enough
> > to make the cause of the close clear.
> >
> > I see what you mean about the name being dependent on the behavior of the
> > method so I will try to clarify.  This is how I currently envision the
> call
> > working.
> >
> > It is not an option to directly initiate a shutdown through a
> StreamThread
> > object from a KafkaStreams object because "KafkaConsumer is not safe for
> > multi-threaded access". So how it works is that the method in
> KafkaStreams
> > finds the first alive thread and sets a flag in the StreamThread. The
> > StreamThread will receive the flag in its runloop then set the error code
> > and trigger a rebalance, afterwards it will stop processing. After the
> > KafkaStreams has set the flag it will return true and continue running.
> If
> > there are no alive threads the shutdown will fail and return false.
> >
> > What do you think the blocking behavior should be? I think that the
> > StreamThread should definitely stop to prevent any of the corruption we
> are
> > trying to avoid by shutting down, but I don't see any advantage of the
> > KafkaStreams call blocking.
> >
> > You are correct to be concerned about the uncaught exception handler. If
> > there are no live StreamThreads the rebalance will not be started at all
> > and this would be a problem. However the user should be aware of this
> > because of the return of false and react appropriately. This would also
> be
> > fixed if we implemented our own handler so we can rebalance before the
> > StreamThread closes.
> >
> > With that in mind I believe that `initiateClosingAllClients` would be an
> > appropriate name. WDYT?
> >
> > Walker
> >
> >
> > On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Walker,
> > >
> > > Thanks for the updated KIP. Previously I'm also a bit hesitant on the
> > newly
> > > added public exception to communicate user-requested whole app
> shutdown,
> > > but the reason I did not bring this up is that I feel there's still a
> > need
> > > from operational aspects that we can differentiate the scenario where
> an
> > > instance is closed because of a) local `streams.close()` triggered, or
> > b) a
> > > remote instance's `stream.shutdownApp` triggered. So if we are going to
> > > remove that exception (which I'm also in favor), we should at least
> > > differentiate from the log4j levels.
> > >
> > > Regarding the semantics that "It should wait to receive the shutdown
> > > request in the rebalance it triggers." I'm not sure I fully understand,
> > > since this may be triggered from the stream thread's uncaught exception
> > > handler, if that thread is already dead then maybe a rebalance listener
> > > would not even be fired at all. Although I know this is some
> > implementation
> > > details that you probably abstract away from the proposal, I'd like to
> > make
> > > sure that we are on the same page regarding its blocking behavior since
> > it
> > > is quite crucial to users as well. Could you elaborate a bit more?
> > >
> > > Regarding the function name, I guess my personal preference would
> depend
> > on
> > > its actual blocking behavior as above :)
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io
> >
> > > wrote:
> > >
> > > > Hello all again,
> > > >
> > > > I have updated the kip to no longer use an exception and instead add
> a
> > > > method to the KafkaStreams class, this seems to satisfy everyone's
> > > concerns
> > > > about how and when the functionality will be invoked.
> > > >
> > > > There is still a question over the name. We must decide between
> > > > "shutdownApplication", "initateCloseAll", "closeAllInstaces" or some
> > > > variation.
> > > >
> > > > I am rather indifferent to the name. I think that they all get the
> > point
> > > > across. The most clear to me would be shutdownApplicaiton or
> > > > closeAllInstacnes but WDYT?
> > > >
> > > > Walker
> > > >
> > > >
> > > >
> > > > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <
> wcarl...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hello Guozhang and Bruno,
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > I will respond in two parts but I would like to clarify that I am
> not
> > > > tied
> > > > > down to any of these names, but since we are still deciding if we
> > want
> > > to
> > > > > have an exception or not I would rather not get tripped up on
> > choosing
> > > a
> > > > > name just yet.
> > > > >
> > > > > Guozhang:
> > > > > 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA
> error. I
> > > am
> > > > > not planning on changing the behavior of handling source topic
> > > deletion.
> > > > > That is being down in kip-662 by Bruno. He is enabling the user to
> > > create
> > > > > their own handler and shutdownApplication is giving them the option
> > to
> > > > > shutdown.
> > > > >
> > > > > 2) It seems that we will remove the Exception entirely so this
> won't
> > > > > matter (below)
> > > > >
> > > > > 3) It should wait to receive the shutdown request in the rebalance
> it
> > > > > triggers. That might be a better name. I am torn between using
> > > > > "application" or "all Instances" in a couple places. I think we
> > should
> > > > pick
> > > > > one and be consistent but I am unsure which is more descriptive.
> > > > >
> > > > > Bruno:
> > > > > I agree that in principle Exceptions should be used in exception
> > cases.
> > > > > And I have added a method in KafkaStreams to handle cases where an
> > > > > Exception would not be appropriate. I guess you think that users
> > should
> > > > > never throw a Streams Exception then they could always throw and
> > catch
> > > > > their own exception and call shutdown Application from there. This
> > > would
> > > > > allow them to exit a processor if they wanted to shutdown from
> > there. I
> > > > > will update the Kip to remove the exception.
> > > > >
> > > > > I would like to add that in the case of trying to shutdown from the
> > > > > uncaught exception handler that we need at least one StreamThread
> to
> > be
> > > > > alive. So having our own handler instead of using the default one
> > after
> > > > the
> > > > > thread has died would let us always close the application.
> > > > >
> > > > > Walker
> > > > >
> > > > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io>
> > > > wrote:
> > > > >
> > > > >> Hi Walker,
> > > > >>
> > > > >> Thank you for the KIP!
> > > > >>
> > > > >> I like the motivation of the KIP and the method to request a
> > shutdown
> > > of
> > > > >> all Kafka Streams clients of Kafka Streams application. I think we
> > > > >> really need such functionality to react on errors. However, I am
> not
> > > > >> convinced that throwing an exception to shutdown all clients is a
> > good
> > > > >> idea.
> > > > >>
> > > > >> An exception signals an exceptional situation to which we can
> react
> > in
> > > > >> multiple ways depending on the context. The exception that you
> > propose
> > > > >> seems rather a well defined user command than a exceptional
> > situation
> > > to
> > > > >> me. IMO, we should not use exceptions to control program flow
> > because
> > > it
> > > > >> mixes cause and effect. Hence, I would propose an invariant for
> > public
> > > > >> exceptions in Kafka Streams. The public exceptions in Kafka
> Streams
> > > > >> should be caught by users, not thrown. But maybe I am missing the
> > big
> > > > >> advantage of using an exception here.
> > > > >>
> > > > >> I echo Guozhang's third point about clarifying the behavior of the
> > > > >> method and the naming.
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >> On 16.09.20 06:28, Guozhang Wang wrote:
> > > > >> > Hello Walker,
> > > > >> >
> > > > >> > Thanks for proposing the KIP! I have a couple more comments:
> > > > >> >
> > > > >> > 1. ShutdownRequestedException: my understanding is that this
> > > exception
> > > > >> is
> > > > >> > only used if the application-shutdown was initiated by by the
> user
> > > > >> > triggered "shutdownApplication()", otherwise e.g. if it is due
> to
> > > > source
> > > > >> > topic not found and Streams library decides to close the whole
> > > > >> application
> > > > >> > automatically, we would still throw the original exception
> > > > >> > a.k.a. MissingSourceTopicException to the uncaught exception
> > > handling.
> > > > >> Is
> > > > >> > that the case? Also for this exception, which package are you
> > > > proposing
> > > > >> to
> > > > >> > add to?
> > > > >> >
> > > > >> > 2. ShutdownRequestedException: for its constructor, I'm
> wondering
> > > what
> > > > >> > Throwable "root cause" could it ever be? Since I'm guessing here
> > > that
> > > > we
> > > > >> > would just use a single error code in the protocol still to tell
> > > other
> > > > >> > instances to shutdown, and that error code would not allow us to
> > > > encode
> > > > >> any
> > > > >> > more information like root causes at all, it seems that
> parameter
> > > > would
> > > > >> > always be null.
> > > > >> >
> > > > >> > 3. shutdownApplication: again I'd like to clarify, would this
> > > function
> > > > >> > block on the local instance to complete shutting down all its
> > > threads
> > > > >> like
> > > > >> > `close()` as well, or would it just to initiate the shutdown and
> > not
> > > > >> wait
> > > > >> > for local threads at all? Also a nit suggestion regarding the
> > name,
> > > if
> > > > >> it
> > > > >> > is only for initiating the shutdown, maybe naming as
> > > > "initiateCloseAll"
> > > > >> > would be more specific?
> > > > >> >
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <
> > > > wcarl...@confluent.io>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Hello Matthias and Sophie,
> > > > >> >>
> > > > >> >> You both make good points. I will respond to the separately
> > below.
> > > > >> >>
> > > > >> >>
> > > > >> >> Matthias:
> > > > >> >> That is a fair point. KIP-662
> > > > >> >> <
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> > > > >> >>> ,
> > > > >> >> which
> > > > >> >> is accepted, will make it so Source topic deletion will make it
> > to
> > > > the
> > > > >> >> uncaught exception handler. Shutdown can be initiated from
> there.
> > > > >> However
> > > > >> >> this would mean that the stream thread is already dead. So I
> > would
> > > > >> have to
> > > > >> >> rethink the exception for this use case, perhaps it would be
> > needed
> > > > in
> > > > >> the
> > > > >> >> KakfaStreams object. But this still leaves the case where there
> > is
> > > > >> only one
> > > > >> >> stream thread. I will think about it.
> > > > >> >>
> > > > >> >> Maybe the source topics are a bad example as it makes this kip
> > > > >> dependent on
> > > > >> >> Kip-662 getting implemented in a certain way. However this is
> not
> > > the
> > > > >> only
> > > > >> >> reason this could be useful here
> > > > >> >> <https://issues.apache.org/jira/browse/KAFKA-4748> is a jira
> > > ticket
> > > > >> asking
> > > > >> >> for the same functionality. I have added a few other use cases
> to
> > > the
> > > > >> kip.
> > > > >> >> Although I will still be rethinking where I want to add this
> > > > >> functionality
> > > > >> >> and whether it should be an exception or not.
> > > > >> >>
> > > > >> >> Sophie:
> > > > >> >> I agree that shutting down an instance could also be useful.
> > There
> > > > was
> > > > >> some
> > > > >> >> discussion about this on KIP-663. It seems that we came to the
> > > > >> conclusion
> > > > >> >> that close(Duration.ZERO) would be sufficient. link
> > > > >> >> <
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e
> > > > >> >>>
> > > > >> >> to
> > > > >> >> thread
> > > > >> >>
> > > > >> >> Also I am not set on the name ShutdownRequested. If we decide
> to
> > > keep
> > > > >> at as
> > > > >> >> an exception your idea is probably a better name.
> > > > >> >>
> > > > >> >>
> > > > >> >> Thanks for the feedback,
> > > > >> >> Walker
> > > > >> >>
> > > > >> >>
> > > > >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <
> > mj...@apache.org
> > > >
> > > > >> wrote:
> > > > >> >>
> > > > >> >>> Thanks for the KIP.
> > > > >> >>>
> > > > >> >>> It seem that the new exception would need to be thrown by user
> > > code?
> > > > >> >>> However, in the motivation you mention the scenario of a
> missing
> > > > >> source
> > > > >> >>> topic that a user cannot detect, but KafkaStreams runtime
> would
> > be
> > > > >> >>> responsible to handle.
> > > > >> >>>
> > > > >> >>> How do both things go together?
> > > > >> >>>
> > > > >> >>>
> > > > >> >>> -Matthias
> > > > >> >>>
> > > > >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > > > >> >>>> Hello all,
> > > > >> >>>>
> > > > >> >>>> I have created KIP-671 to give the option to shutdown a
> streams
> > > > >> >>>> application in response to an error.
> > > > >> >>>>
> > > > >> >>>
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > >> >>>>
> > > > >> >>>> This is because of the Jira ticket
> > > > >> >>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> > > > >> >>>>
> > > > >> >>>> Please give it a look and let me know if you have any
> feedback.
> > > > >> >>>>
> > > > >> >>>> Thanks,
> > > > >> >>>> Walker
> > > > >> >>>>
> > > > >> >>>
> > > > >> >>>
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to