Bruno,

I think that we can't guarantee that the message will be propagated in every case (network partitioning, for example), though it will work in many cases. So I would say it's best effort, and I will mention that in the KIP.
As for when to use it, I think we can discuss whether this will be sufficient when we come to it, as long as we document its capabilities.

I hope this answers your question,
Walker

On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna <br...@confluent.io> wrote:

> Walker,
>
> I am sorry, but I still have a comment on the KIP although you have already started voting.
>
> What happens when a consumer of the group skips the rebalancing that propagates the shutdown request? Do you give a guarantee that all Kafka Streams clients are shut down, or is it best effort? If it is best effort, I guess the proposed method might not be used in critical cases where stopping record consumption may prevent or limit damage. I am not saying that it must be a guarantee, but this question should be answered in the KIP, IMO.
>
> Best,
> Bruno
>
> On 22.09.20 01:14, Walker Carlson wrote:
> > The error code right now is the assignor error; 2 is coded for shutdown, but it could be expanded to encode the causes or other errors that need to be communicated. For example, we can add error code 3 to close the thread but leave the client in an error state if we choose to do so in the future.
> >
> > On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> >
> >> Thanks for the KIP Walker.
> >>
> >> In the KIP we mentioned "In order to communicate the shutdown request from one client to the others we propose to update the SubscriptionInfoData to include a short field which will encode an error code." Is there a dedicated error code that we should define here, or is it case-by-case?
> >>
> >> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io> wrote:
> >>
> >>> I am changing the name to "Add method to Shutdown entire Streams Application". Since we are no longer using an Exception, it seems more appropriate.
> >>>
> >>> Also, it looks like the discussion is pretty much finished, so I will be calling it to a vote.
> >>>
> >>> Thanks,
> >>> Walker
> >>>
> >>> On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Sounds good to me. I also feel that this call should be non-blocking, but I guess I got the impression from the discussion thread that the API is designed in a blocking fashion, which contradicts my perspective, and hence I asked for clarification :)
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io> wrote:
> >>>>
> >>>>> Hello Guozhang,
> >>>>>
> >>>>> As for the logging, I plan on having three logs: first, the client logs that it is requesting an application shutdown; second, the leader logs the processId of the invoker; third, the StreamRebalanceListener logs that it is closing because of a `stream.appShutdown`. Hopefully this will be enough to make the cause of the close clear.
> >>>>>
> >>>>> I see what you mean about the name being dependent on the behavior of the method, so I will try to clarify. This is how I currently envision the call working.
> >>>>>
> >>>>> It is not an option to directly initiate a shutdown through a StreamThread object from a KafkaStreams object because "KafkaConsumer is not safe for multi-threaded access". So the method in KafkaStreams finds the first alive thread and sets a flag in the StreamThread. The StreamThread will see the flag in its run loop, then set the error code and trigger a rebalance; afterwards it will stop processing. After KafkaStreams has set the flag, it will return true and continue running.
> >>>>> If there are no alive threads, the shutdown will fail and return false.
> >>>>>
> >>>>> What do you think the blocking behavior should be? I think that the StreamThread should definitely stop, to prevent any of the corruption we are trying to avoid by shutting down, but I don't see any advantage in the KafkaStreams call blocking.
> >>>>>
> >>>>> You are correct to be concerned about the uncaught exception handler. If there are no live StreamThreads, the rebalance will not be started at all, and this would be a problem. However, the user should be aware of this because of the return value of false and can react appropriately. This would also be fixed if we implemented our own handler so we can rebalance before the StreamThread closes.
> >>>>>
> >>>>> With that in mind, I believe that `initiateClosingAllClients` would be an appropriate name. WDYT?
> >>>>>
> >>>>> Walker
> >>>>>
> >>>>> On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello Walker,
> >>>>>>
> >>>>>> Thanks for the updated KIP. Previously I was also a bit hesitant about the newly added public exception to communicate a user-requested whole-app shutdown, but the reason I did not bring this up is that I feel there's still a need, from an operational perspective, to differentiate the scenarios where an instance is closed because of a) a local `streams.close()` being triggered, or b) a remote instance's `stream.shutdownApp` being triggered. So if we are going to remove that exception (which I'm also in favor of), we should at least differentiate via the log4j levels.
> >>>>>>
> >>>>>> Regarding the semantics that "It should wait to receive the shutdown request in the rebalance it triggers."
> >>>>>> I'm not sure I fully understand, since this may be triggered from the stream thread's uncaught exception handler; if that thread is already dead, then maybe a rebalance listener would not even be fired at all. Although I know these are implementation details that you probably abstract away from the proposal, I'd like to make sure that we are on the same page regarding its blocking behavior, since it is quite crucial to users as well. Could you elaborate a bit more?
> >>>>>>
> >>>>>> Regarding the function name, I guess my personal preference would depend on its actual blocking behavior as above :)
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Hello all again,
> >>>>>>>
> >>>>>>> I have updated the KIP to no longer use an exception and instead add a method to the KafkaStreams class. This seems to satisfy everyone's concerns about how and when the functionality will be invoked.
> >>>>>>>
> >>>>>>> There is still a question over the name. We must decide between "shutdownApplication", "initiateCloseAll", "closeAllInstances", or some variation.
> >>>>>>>
> >>>>>>> I am rather indifferent to the name. I think that they all get the point across. The clearest to me would be shutdownApplication or closeAllInstances, but WDYT?
> >>>>>>>
> >>>>>>> Walker
> >>>>>>>
> >>>>>>> On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hello Guozhang and Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback.
> >>>>>>>>
> >>>>>>>> I will respond in two parts, but I would like to clarify that I am not tied down to any of these names. Since we are still deciding whether we want to have an exception or not, I would rather not get tripped up on choosing a name just yet.
> >>>>>>>>
> >>>>>>>> Guozhang:
> >>>>>>>> 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am not planning on changing the behavior of handling source topic deletion. That is being done in KIP-662 by Bruno. He is enabling the user to create their own handler, and shutdownApplication is giving them the option to shut down.
> >>>>>>>>
> >>>>>>>> 2) It seems that we will remove the Exception entirely, so this won't matter (see below).
> >>>>>>>>
> >>>>>>>> 3) It should wait to receive the shutdown request in the rebalance it triggers. That might be a better name. I am torn between using "application" or "all Instances" in a couple of places. I think we should pick one and be consistent, but I am unsure which is more descriptive.
> >>>>>>>>
> >>>>>>>> Bruno:
> >>>>>>>> I agree that in principle Exceptions should be used in exceptional cases, and I have added a method in KafkaStreams to handle cases where an Exception would not be appropriate. I guess you think that users should never throw a Streams Exception; then they could always throw and catch their own exception and call shutdownApplication from there. This would allow them to exit a processor if they wanted to shut down from there. I will update the KIP to remove the exception.
> >>>>>>>>
> >>>>>>>> I would like to add that when trying to shut down from the uncaught exception handler, we need at least one StreamThread to be alive. So having our own handler, instead of using the default one after the thread has died, would let us always close the application.
> >>>>>>>>
> >>>>>>>> Walker
> >>>>>>>>
> >>>>>>>> On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Walker,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP!
> >>>>>>>>>
> >>>>>>>>> I like the motivation of the KIP and the method to request a shutdown of all Kafka Streams clients of a Kafka Streams application. I think we really need such functionality to react to errors. However, I am not convinced that throwing an exception to shut down all clients is a good idea.
> >>>>>>>>>
> >>>>>>>>> An exception signals an exceptional situation to which we can react in multiple ways depending on the context. The exception that you propose seems to me more like a well-defined user command than an exceptional situation. IMO, we should not use exceptions to control program flow because it mixes cause and effect. Hence, I would propose an invariant for public exceptions in Kafka Streams: the public exceptions in Kafka Streams should be caught by users, not thrown. But maybe I am missing the big advantage of using an exception here.
> >>>>>>>>>
> >>>>>>>>> I echo Guozhang's third point about clarifying the behavior of the method and the naming.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>> On 16.09.20 06:28, Guozhang Wang wrote:
> >>>>>>>>>> Hello Walker,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for proposing the KIP! I have a couple more comments:
> >>>>>>>>>>
> >>>>>>>>>> 1. ShutdownRequestedException: my understanding is that this exception is only used if the application shutdown was initiated by the user-triggered "shutdownApplication()". Otherwise, e.g. if it is due to a source topic not being found and the Streams library decides to close the whole application automatically, we would still throw the original exception, a.k.a. MissingSourceTopicException, to the uncaught exception handler. Is that the case? Also, which package are you proposing to add this exception to?
> >>>>>>>>>>
> >>>>>>>>>> 2. ShutdownRequestedException: for its constructor, I'm wondering what Throwable "root cause" it could ever have. I'm guessing here that we would still just use a single error code in the protocol to tell other instances to shut down, and that error code would not allow us to encode any more information like root causes at all, so it seems that parameter would always be null.
> >>>>>>>>>>
> >>>>>>>>>> 3. shutdownApplication: again I'd like to clarify, would this function block on the local instance to complete shutting down all its threads, like `close()` does, or would it just initiate the shutdown and not wait for local threads at all?
> >>>>>>>>>> Also, a nit suggestion regarding the name: if it is only for initiating the shutdown, maybe naming it "initiateCloseAll" would be more specific?
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Matthias and Sophie,
> >>>>>>>>>>>
> >>>>>>>>>>> You both make good points. I will respond to them separately below.
> >>>>>>>>>>>
> >>>>>>>>>>> Matthias:
> >>>>>>>>>>> That is a fair point. KIP-662 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>, which is accepted, will make it so that source topic deletion makes it to the uncaught exception handler. Shutdown can be initiated from there. However, this would mean that the stream thread is already dead. So I would have to rethink the exception for this use case; perhaps it would be needed in the KafkaStreams object. But this still leaves the case where there is only one stream thread. I will think about it.
> >>>>>>>>>>>
> >>>>>>>>>>> Maybe the source topics are a bad example, as it makes this KIP dependent on KIP-662 getting implemented in a certain way.
> >>>>>>>>>>> However, this is not the only reason this could be useful: here <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket asking for the same functionality. I have added a few other use cases to the KIP, although I will still be rethinking where I want to add this functionality and whether it should be an exception or not.
> >>>>>>>>>>>
> >>>>>>>>>>> Sophie:
> >>>>>>>>>>> I agree that shutting down an instance could also be useful. There was some discussion about this on KIP-663. It seems that we came to the conclusion that close(Duration.ZERO) would be sufficient. Link to thread: <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
> >>>>>>>>>>>
> >>>>>>>>>>> Also, I am not set on the name ShutdownRequested. If we decide to keep it as an exception, your idea is probably a better name.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback,
> >>>>>>>>>>> Walker
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> It seems that the new exception would need to be thrown by user code? However, in the motivation you mention the scenario of a missing source topic that a user cannot detect, but that the KafkaStreams runtime would be responsible for handling.
> >>>>>>>>>>>>
> >>>>>>>>>>>> How do both things go together?
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> >>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have created KIP-671 to give the option to shut down a Streams application in response to an error.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is because of the Jira ticket <https://issues.apache.org/jira/browse/KAFKA-9331>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Please give it a look and let me know if you have any feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Walker
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang
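
For anyone reading the thread later: the non-blocking hand-off described in the quoted Sep 16, 12:32 PM message (the KafkaStreams method sets a flag on the first alive StreamThread; the thread sees the flag in its run loop, sets the error code, triggers a rebalance, and stops processing) might look roughly like the sketch below. This is only an illustration under assumptions, not the KIP's implementation. The names `ShutdownSketch`, `Worker`, `Client`, `requestShutdown`, and `runOnce` are hypothetical and are not Kafka Streams APIs, and the rebalance is reduced to a boolean field. The value 2 is the shutdown error code mentioned earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch only; none of these names are Kafka Streams APIs. */
class ShutdownSketch {

    /** The thread says 2 is the error code used for shutdown. */
    static final short SHUTDOWN_REQUESTED = 2;

    /** Stand-in for a StreamThread; its run loop is reduced to runOnce(). */
    static class Worker {
        final AtomicBoolean shutdownFlag = new AtomicBoolean(false);
        boolean alive = true;
        boolean processing = true;
        short errorCode = 0;                // would be sent with the subscription
        boolean rebalanceTriggered = false; // stand-in for rejoining the group

        /** One loop iteration; meant to be called only by the worker itself. */
        void runOnce() {
            if (!alive) {
                return; // a dead thread never sees the flag
            }
            if (shutdownFlag.get()) {
                errorCode = SHUTDOWN_REQUESTED; // encode the request
                rebalanceTriggered = true;      // propagate it via a rebalance
                processing = false;             // then stop processing records
            }
        }
    }

    /** Stand-in for the KafkaStreams-side method. */
    static class Client {
        final List<Worker> workers = new ArrayList<>();

        Client(int threadCount) {
            for (int i = 0; i < threadCount; i++) {
                workers.add(new Worker());
            }
        }

        /** Non-blocking: flags the first alive worker and returns at once. */
        boolean requestShutdown() {
            for (Worker w : workers) {
                if (w.alive) {
                    w.shutdownFlag.set(true);
                    return true;
                }
            }
            return false; // no alive thread, so no rebalance can be triggered
        }
    }

    public static void main(String[] args) {
        Client client = new Client(2);
        client.workers.get(0).alive = false;       // first thread already died
        boolean accepted = client.requestShutdown();
        client.workers.get(1).runOnce();           // worker picks up the flag
        System.out.println("accepted=" + accepted
            + " errorCode=" + client.workers.get(1).errorCode);
    }
}
```

The caller never touches the worker's consumer-side state directly; it only sets a flag that the worker acts on in its own loop, which is one way to respect the "KafkaConsumer is not safe for multi-threaded access" constraint quoted above. The `false` return models the case raised about the uncaught exception handler, where no thread is left alive to trigger the rebalance.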