Right now the field carries the assignor error code, with 2 coded for shutdown, but it could be expanded to encode the cause of the shutdown or other errors that need to be communicated. For example, we could add error code 3 to close the thread but leave the client in an error state, if we choose to do so in the future.
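
To make the encoding idea concrete, here is a minimal sketch of how the short field could map to named codes. The enum and method names are hypothetical and are not taken from the KIP or the Kafka code base; only the values 2 (shutdown) and 3 (the possible future code) come from this thread, and returning an empty result for unrecognized values is just a choice made for the sketch.

```java
import java.util.Optional;

class ErrorCodeSketch {
    // Hypothetical enum: only the numeric values 2 and 3 come from this thread.
    enum SubscriptionErrorCode {
        SHUTDOWN_REQUESTED((short) 2),
        // Possible future code, not part of the current proposal.
        CLOSE_THREAD_CLIENT_ERROR((short) 3);

        private final short code;

        SubscriptionErrorCode(short code) {
            this.code = code;
        }

        short code() {
            return code;
        }

        // Map a received short back to a known code. Unknown values yield
        // Optional.empty() (a choice made for this sketch).
        static Optional<SubscriptionErrorCode> fromCode(short code) {
            for (SubscriptionErrorCode candidate : values()) {
                if (candidate.code == code) {
                    return Optional.of(candidate);
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        short received = 2; // value read from the (hypothetical) short field
        System.out.println(SubscriptionErrorCode.fromCode(received));
        System.out.println(SubscriptionErrorCode.fromCode((short) 7));
    }
}
```

Adding a new cause later would then only mean adding another constant, while clients that do not know the value can fall through to the empty case.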

On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <[email protected]> wrote:

> Thanks for the KIP Walker.
>
> In the KIP we mentioned "In order to communicate the shutdown request
> from one client to the others we propose to update the
> SubcriptionInfoData to include a short field which will encode an error
> code.", is there a dedicated error code that we should define here, or
> it is case-by-case?
>
> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <[email protected]> wrote:
>
> > I am changing the name to "Add method to Shutdown entire Streams
> > Application" since we are no longer using an Exception, it seems more
> > appropriate.
> >
> > Also it looks like the discussion is pretty much finished so I will be
> > calling it to a vote.
> >
> > thanks,
> > Walker
> >
> > On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <[email protected]> wrote:
> >
> > > Sounds good to me. I also feel that this call should be non-blocking
> > > but I guess I was confused from the discussion thread that the API is
> > > designed in a blocking fashion which contradicts with my perspective
> > > and hence I asked for clarification :)
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <[email protected]> wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > As for the logging I plan on having three logs. First, the client
> > > > log that it is requesting an application shutdown, second, the
> > > > leader log processId of the invoker, third, then the
> > > > StreamRebalanceListener it logs that it is closing because of an
> > > > `stream.appShutdown`. Hopefully this will be enough to make the
> > > > cause of the close clear.
> > > >
> > > > I see what you mean about the name being dependent on the behavior
> > > > of the method so I will try to clarify. This is how I currently
> > > > envision the call working.
> > > >
> > > > It is not an option to directly initiate a shutdown through a
> > > > StreamThread object from a KafkaStreams object because
> > > > "KafkaConsumer is not safe for multi-threaded access". So how it
> > > > works is that the method in KafkaStreams finds the first alive
> > > > thread and sets a flag in the StreamThread. The StreamThread will
> > > > receive the flag in its runloop then set the error code and trigger
> > > > a rebalance, afterwards it will stop processing. After the
> > > > KafkaStreams has set the flag it will return true and continue
> > > > running. If there are no alive threads the shutdown will fail and
> > > > return false.
> > > >
> > > > What do you think the blocking behavior should be? I think that the
> > > > StreamThread should definitely stop to prevent any of the
> > > > corruption we are trying to avoid by shutting down, but I don't see
> > > > any advantage of the KafkaStreams call blocking.
> > > >
> > > > You are correct to be concerned about the uncaught exception
> > > > handler. If there are no live StreamThreads the rebalance will not
> > > > be started at all and this would be a problem. However the user
> > > > should be aware of this because of the return of false and react
> > > > appropriately. This would also be fixed if we implemented our own
> > > > handler so we can rebalance before the StreamThread closes.
> > > >
> > > > With that in mind I believe that `initiateClosingAllClients` would
> > > > be an appropriate name. WDYT?
> > > >
> > > > Walker
> > > >
> > > >
> > > > On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <[email protected]> wrote:
> > > >
> > > > > Hello Walker,
> > > > >
> > > > > Thanks for the updated KIP. Previously I'm also a bit hesitant on
> > > > > the newly added public exception to communicate user-requested
> > > > > whole app shutdown, but the reason I did not bring this up is
> > > > > that I feel there's still a need from operational aspects that we
> > > > > can differentiate the scenario where an instance is closed
> > > > > because of a) local `streams.close()` triggered, or b) a remote
> > > > > instance's `stream.shutdownApp` triggered. So if we are going to
> > > > > remove that exception (which I'm also in favor), we should at
> > > > > least differentiate from the log4j levels.
> > > > >
> > > > > Regarding the semantics that "It should wait to receive the
> > > > > shutdown request in the rebalance it triggers." I'm not sure I
> > > > > fully understand, since this may be triggered from the stream
> > > > > thread's uncaught exception handler, if that thread is already
> > > > > dead then maybe a rebalance listener would not even be fired at
> > > > > all. Although I know this is some implementation details that you
> > > > > probably abstract away from the proposal, I'd like to make sure
> > > > > that we are on the same page regarding its blocking behavior
> > > > > since it is quite crucial to users as well. Could you elaborate a
> > > > > bit more?
> > > > >
> > > > > Regarding the function name, I guess my personal preference would
> > > > > depend on its actual blocking behavior as above :)
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <[email protected]> wrote:
> > > > >
> > > > > > Hello all again,
> > > > > >
> > > > > > I have updated the kip to no longer use an exception and
> > > > > > instead add a method to the KafkaStreams class, this seems to
> > > > > > satisfy everyone's concerns about how and when the
> > > > > > functionality will be invoked.
> > > > > >
> > > > > > There is still a question over the name. We must decide between
> > > > > > "shutdownApplication", "initateCloseAll", "closeAllInstaces" or
> > > > > > some variation.
> > > > > >
> > > > > > I am rather indifferent to the name. I think that they all get
> > > > > > the point across. The most clear to me would be
> > > > > > shutdownApplicaiton or closeAllInstacnes but WDYT?
> > > > > >
> > > > > > Walker
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <[email protected]> wrote:
> > > > > >
> > > > > > > Hello Guozhang and Bruno,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > I will respond in two parts but I would like to clarify that
> > > > > > > I am not tied down to any of these names, but since we are
> > > > > > > still deciding if we want to have an exception or not I would
> > > > > > > rather not get tripped up on choosing a name just yet.
> > > > > > >
> > > > > > > Guozhang:
> > > > > > > 1) you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA
> > > > > > > error. I am not planning on changing the behavior of handling
> > > > > > > source topic deletion. That is being down in kip-662 by
> > > > > > > Bruno. He is enabling the user to create their own handler
> > > > > > > and shutdownApplication is giving them the option to
> > > > > > > shutdown.
> > > > > > >
> > > > > > > 2) It seems that we will remove the Exception entirely so
> > > > > > > this won't matter (below)
> > > > > > >
> > > > > > > 3) It should wait to receive the shutdown request in the
> > > > > > > rebalance it triggers. That might be a better name. I am torn
> > > > > > > between using "application" or "all Instances" in a couple
> > > > > > > places. I think we should pick one and be consistent but I am
> > > > > > > unsure which is more descriptive.
> > > > > > >
> > > > > > > Bruno:
> > > > > > > I agree that in principle Exceptions should be used in
> > > > > > > exception cases. And I have added a method in KafkaStreams to
> > > > > > > handle cases where an Exception would not be appropriate. I
> > > > > > > guess you think that users should never throw a Streams
> > > > > > > Exception then they could always throw and catch their own
> > > > > > > exception and call shutdown Application from there. This
> > > > > > > would allow them to exit a processor if they wanted to
> > > > > > > shutdown from there. I will update the Kip to remove the
> > > > > > > exception.
> > > > > > >
> > > > > > > I would like to add that in the case of trying to shutdown
> > > > > > > from the uncaught exception handler that we need at least one
> > > > > > > StreamThread to be alive. So having our own handler instead
> > > > > > > of using the default one after the thread has died would let
> > > > > > > us always close the application.
> > > > > > >
> > > > > > > Walker
> > > > > > >
> > > > > > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <[email protected]> wrote:
> > > > > > >
> > > > > > >> Hi Walker,
> > > > > > >>
> > > > > > >> Thank you for the KIP!
> > > > > > >>
> > > > > > >> I like the motivation of the KIP and the method to request a
> > > > > > >> shutdown of all Kafka Streams clients of Kafka Streams
> > > > > > >> application. I think we really need such functionality to
> > > > > > >> react on errors. However, I am not convinced that throwing
> > > > > > >> an exception to shutdown all clients is a good idea.
> > > > > > >>
> > > > > > >> An exception signals an exceptional situation to which we
> > > > > > >> can react in multiple ways depending on the context. The
> > > > > > >> exception that you propose seems rather a well defined user
> > > > > > >> command than a exceptional situation to me. IMO, we should
> > > > > > >> not use exceptions to control program flow because it mixes
> > > > > > >> cause and effect. Hence, I would propose an invariant for
> > > > > > >> public exceptions in Kafka Streams. The public exceptions in
> > > > > > >> Kafka Streams should be caught by users, not thrown. But
> > > > > > >> maybe I am missing the big advantage of using an exception
> > > > > > >> here.
> > > > > > >>
> > > > > > >> I echo Guozhang's third point about clarifying the behavior
> > > > > > >> of the method and the naming.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Bruno
> > > > > > >>
> > > > > > >> On 16.09.20 06:28, Guozhang Wang wrote:
> > > > > > >> > Hello Walker,
> > > > > > >> >
> > > > > > >> > Thanks for proposing the KIP! I have a couple more comments:
> > > > > > >> >
> > > > > > >> > 1. ShutdownRequestedException: my understanding is that
> > > > > > >> > this exception is only used if the application-shutdown
> > > > > > >> > was initiated by by the user triggered
> > > > > > >> > "shutdownApplication()", otherwise e.g. if it is due to
> > > > > > >> > source topic not found and Streams library decides to
> > > > > > >> > close the whole application automatically, we would still
> > > > > > >> > throw the original exception a.k.a.
> > > > > > >> > MissingSourceTopicException to the uncaught exception
> > > > > > >> > handling. Is that the case? Also for this exception, which
> > > > > > >> > package are you proposing to add to?
> > > > > > >> >
> > > > > > >> > 2. ShutdownRequestedException: for its constructor, I'm
> > > > > > >> > wondering what Throwable "root cause" could it ever be?
> > > > > > >> > Since I'm guessing here that we would just use a single
> > > > > > >> > error code in the protocol still to tell other instances
> > > > > > >> > to shutdown, and that error code would not allow us to
> > > > > > >> > encode any more information like root causes at all, it
> > > > > > >> > seems that parameter would always be null.
> > > > > > >> >
> > > > > > >> > 3. shutdownApplication: again I'd like to clarify, would
> > > > > > >> > this function block on the local instance to complete
> > > > > > >> > shutting down all its threads like `close()` as well, or
> > > > > > >> > would it just to initiate the shutdown and not wait for
> > > > > > >> > local threads at all? Also a nit suggestion regarding the
> > > > > > >> > name, if it is only for initiating the shutdown, maybe
> > > > > > >> > naming as "initiateCloseAll" would be more specific?
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <[email protected]> wrote:
> > > > > > >> >
> > > > > > >> >> Hello Matthias and Sophie,
> > > > > > >> >>
> > > > > > >> >> You both make good points. I will respond to the
> > > > > > >> >> separately below.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> Matthias:
> > > > > > >> >> That is a fair point. KIP-662
> > > > > > >> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
> > > > > > >> >> which is accepted, will make it so Source topic deletion
> > > > > > >> >> will make it to the uncaught exception handler. Shutdown
> > > > > > >> >> can be initiated from there. However this would mean that
> > > > > > >> >> the stream thread is already dead. So I would have to
> > > > > > >> >> rethink the exception for this use case, perhaps it would
> > > > > > >> >> be needed in the KakfaStreams object. But this still
> > > > > > >> >> leaves the case where there is only one stream thread. I
> > > > > > >> >> will think about it.
> > > > > > >> >>
> > > > > > >> >> Maybe the source topics are a bad example as it makes
> > > > > > >> >> this kip dependent on Kip-662 getting implemented in a
> > > > > > >> >> certain way. However this is not the only reason this
> > > > > > >> >> could be useful here
> > > > > > >> >> <https://issues.apache.org/jira/browse/KAFKA-4748> is a
> > > > > > >> >> jira ticket asking for the same functionality. I have
> > > > > > >> >> added a few other use cases to the kip. Although I will
> > > > > > >> >> still be rethinking where I want to add this
> > > > > > >> >> functionality and whether it should be an exception or
> > > > > > >> >> not.
> > > > > > >> >>
> > > > > > >> >> Sophie:
> > > > > > >> >> I agree that shutting down an instance could also be
> > > > > > >> >> useful. There was some discussion about this on KIP-663.
> > > > > > >> >> It seems that we came to the conclusion that
> > > > > > >> >> close(Duration.ZERO) would be sufficient. link
> > > > > > >> >> <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%[email protected]%3e>
> > > > > > >> >> to thread
> > > > > > >> >>
> > > > > > >> >> Also I am not set on the name ShutdownRequested. If we
> > > > > > >> >> decide to keep at as an exception your idea is probably a
> > > > > > >> >> better name.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> Thanks for the feedback,
> > > > > > >> >> Walker
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <[email protected]> wrote:
> > > > > > >> >>
> > > > > > >> >>> Thanks for the KIP.
> > > > > > >> >>>
> > > > > > >> >>> It seem that the new exception would need to be thrown
> > > > > > >> >>> by user code? However, in the motivation you mention the
> > > > > > >> >>> scenario of a missing source topic that a user cannot
> > > > > > >> >>> detect, but KafkaStreams runtime would be responsible to
> > > > > > >> >>> handle.
> > > > > > >> >>>
> > > > > > >> >>> How do both things go together?
> > > > > > >> >>>
> > > > > > >> >>>
> > > > > > >> >>> -Matthias
> > > > > > >> >>>
> > > > > > >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > > > > > >> >>>> Hello all,
> > > > > > >> >>>>
> > > > > > >> >>>> I have created KIP-671 to give the option to shutdown a
> > > > > > >> >>>> streams application in response to an error.
> > > > > > >> >>>>
> > > > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > > > >> >>>>
> > > > > > >> >>>> This is because of the Jira ticket
> > > > > > >> >>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> > > > > > >> >>>>
> > > > > > >> >>>> Please give it a look and let me know if you have any
> > > > > > >> >>>> feedback.
> > > > > > >> >>>>
> > > > > > >> >>>> Thanks,
> > > > > > >> >>>> Walker
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang

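
For reference, the non-blocking handoff described in the quoted thread (the client sets a flag on the first alive thread and returns true, the thread picks the flag up in its run loop, sets the error code, triggers a rebalance and stops processing, and the call returns false when no thread is alive) could be sketched roughly as below. `Worker` and `requestShutdown` are stand-ins invented for this sketch, not the real StreamThread or KafkaStreams API; the rebalance is reduced to a boolean, treating 0 as "no error" is an assumption, and the run loop is driven by hand on a single thread to keep the example deterministic.

```java
import java.util.Arrays;
import java.util.List;

class ShutdownHandoffSketch {
    static final short SHUTDOWN_CODE = 2; // value taken from this thread

    // Stand-in for a StreamThread; not the real Kafka Streams class.
    static class Worker {
        volatile boolean alive = true;
        volatile boolean shutdownRequested = false;
        short errorCode = 0;               // 0 as "no error" is an assumption of this sketch
        boolean rebalanceTriggered = false;

        // One pass of the run loop, meant to be executed only by the worker itself.
        void runOnce() {
            if (!alive) {
                return;
            }
            if (shutdownRequested) {
                errorCode = SHUTDOWN_CODE;
                rebalanceTriggered = true; // placeholder for triggering a real rebalance
                alive = false;             // stop processing afterwards
            }
            // Normal record processing would happen here otherwise.
        }
    }

    // Stand-in for the client-side method: sets a flag and returns immediately.
    static boolean requestShutdown(List<Worker> workers) {
        for (Worker worker : workers) {
            if (worker.alive) {
                worker.shutdownRequested = true;
                return true;
            }
        }
        return false; // no alive thread, so nothing can trigger the rebalance
    }

    public static void main(String[] args) {
        Worker dead = new Worker();
        dead.alive = false;
        Worker live = new Worker();
        boolean accepted = requestShutdown(Arrays.asList(dead, live));
        live.runOnce(); // in a real client this would run on the worker's own thread
        System.out.println(accepted + " " + live.errorCode + " " + live.rebalanceTriggered);
    }
}
```

The false return is the only signal the caller gets when every thread has already died, which is the uncaught-exception-handler case raised in the thread.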