Hi John,

I totally agree with you and Walker. I also think that we should leave this as a problem for the future and that we should document this limitation.

Best,
Bruno

On 24.09.20 16:51, John Roesler wrote:
Hello all,

Thanks for bringing this up, Bruno. It’s a really good point that a 
disconnected node would miss the signal and then resurrect a single-node 
“zombie cluster” when it reconnects.
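To make the failure mode concrete, here is a toy simulation in Java. It is a minimal sketch, not Kafka Streams code: `GroupMember`, `BestEffortShutdownSketch`, and `rebalance` are hypothetical names. It assumes, as described in this thread, that the shutdown signal reaches only the members that take part in the rebalance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Kafka Streams client in the group.
final class GroupMember {
    final String id;
    boolean connected = true; // false while cut off by a network partition
    boolean running = true;

    GroupMember(String id) {
        this.id = id;
    }
}

final class BestEffortShutdownSketch {
    // Code value taken from this thread: 2 = shut down the application.
    static final short SHUTDOWN_APPLICATION = 2;

    // Simulates one rebalance that carries errorCode to the group.
    // Only connected members take part, so only they see the code.
    // Returns the ids of members that missed it and are still running.
    static List<String> rebalance(List<GroupMember> members, short errorCode) {
        List<String> missed = new ArrayList<>();
        for (GroupMember m : members) {
            if (!m.connected) {
                if (m.running) {
                    missed.add(m.id);
                }
                continue;
            }
            if (errorCode == SHUTDOWN_APPLICATION) {
                m.running = false;
            }
        }
        return missed;
    }
}
```

A member that was partitioned during that rebalance keeps running. When it reconnects, it forms a group on its own, which is the single-node "zombie cluster" case.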

Offhand, I can’t think of a simple and reliable way to distinguish this case 
from one in which an operator starts a node manually after a prior shutdown 
signal. Can you? Right now, I’m inclined to agree with Walker that we should 
leave this as a problem for the future.

It should certainly be mentioned in the KIP, and it also deserves special 
mention in our javadoc and html docs for this feature.

Thanks!
John

On Wed, Sep 23, 2020, at 17:49, Walker Carlson wrote:
Bruno,

I think that we can't guarantee that the message will be propagated in every case (say, under a network partition), though it will work in many cases. So I would say it's best effort, and I will mention that in the KIP.

As for when to use it, I think we can discuss whether this will be sufficient when we come to it, as long as we document its capabilities.

I hope this answers your question,

Walker

On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna <br...@confluent.io> wrote:

Walker,

I am sorry, but I still have a comment on the KIP although you have
already started voting.

What happens when a consumer of the group skips the rebalance that propagates the shutdown request? Do you guarantee that all Kafka Streams clients are shut down, or is it best effort? If it is best effort, I guess the proposed method might not be suitable for critical cases where stopping record consumption may prevent or limit damage. I am not saying that it must be a guarantee, but this question should be answered in the KIP, IMO.

Best,
Bruno

On 22.09.20 01:14, Walker Carlson wrote:
The error code right now is the assignor error code: 2 is the code for shutdown, but it could be expanded to encode the causes, or to cover other errors that need to be communicated. For example, we could add error code 3 to close the thread but leave the client in an error state, if we choose to do so in the future.
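A minimal Java sketch of how such a short error field could be decoded. `ShutdownErrorCode` is a hypothetical enum, not the actual Kafka Streams assignor error type. Only the values 2 (shutdown) and 3 (possible future close-thread code) come from this message; 0 for "no error" is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical error codes carried in a short field of the subscription metadata.
enum ShutdownErrorCode {
    NONE((short) 0),                      // assumption: no error
    SHUTDOWN_APPLICATION((short) 2),      // from the thread: shut down all clients
    CLOSE_THREAD_ERROR_STATE((short) 3);  // from the thread: possible future code

    final short code;

    private static final Map<Short, ShutdownErrorCode> BY_CODE = new HashMap<>();

    static {
        for (ShutdownErrorCode c : values()) {
            BY_CODE.put(c.code, c);
        }
    }

    ShutdownErrorCode(short code) {
        this.code = code;
    }

    // Decodes the wire value; an unrecognized code is rejected rather than ignored.
    static ShutdownErrorCode fromCode(short code) {
        ShutdownErrorCode c = BY_CODE.get(code);
        if (c == null) {
            throw new IllegalArgumentException("Unknown error code: " + code);
        }
        return c;
    }
}
```

Because the field is a single short, it can say which action to take, but it has no room for a root-cause message or exception. Rejecting unknown codes is one possible choice for an older client that receives a code added later.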

On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks for the KIP Walker.

In the KIP we mentioned "In order to communicate the shutdown request from one client to the others we propose to update the SubscriptionInfoData to include a short field which will encode an error code." Is there a dedicated error code that we should define here, or is it case-by-case?

On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io> wrote:

I am changing the name to "Add method to Shutdown entire Streams Application", since we are no longer using an exception and it seems more appropriate.

Also, it looks like the discussion is pretty much finished, so I will be calling it to a vote.

thanks,
Walker

On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com> wrote:

Sounds good to me. I also feel that this call should be non-blocking, but I was confused by the discussion thread into thinking that the API was designed in a blocking fashion, which contradicts my perspective, and hence I asked for clarification :)

Guozhang


On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Guozhang,

As for the logging, I plan on having three log messages. First, the client logs that it is requesting an application shutdown; second, the leader logs the processId of the invoker; third, the StreamsRebalanceListener logs that it is closing because of a `stream.appShutdown`. Hopefully this will be enough to make the cause of the close clear.

I see what you mean about the name depending on the behavior of the method, so I will try to clarify. This is how I currently envision the call working.

It is not an option to directly initiate a shutdown through a StreamThread object from a KafkaStreams object, because "KafkaConsumer is not safe for multi-threaded access". So the method in KafkaStreams finds the first alive thread and sets a flag in that StreamThread. The StreamThread picks up the flag in its run loop, then sets the error code and triggers a rebalance; afterwards it stops processing. After KafkaStreams has set the flag, the method returns true and the client continues running. If there are no alive threads, the shutdown fails and the method returns false.
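The hand-off could look roughly like the following Java sketch. `WorkerThreadSketch` and `ShutdownRequestSketch` are hypothetical stand-ins for StreamThread and KafkaStreams, and the flag is an assumed `AtomicBoolean`. The point is that the calling thread only sets a flag, while the thread that owns the consumer acts on it in its own run loop.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a StreamThread; not the real Kafka Streams class.
final class WorkerThreadSketch {
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    boolean isAlive() { return alive.get(); }

    void die() { alive.set(false); }

    // Called from another thread: only sets a flag, never touches the consumer.
    void requestApplicationShutdown() { shutdownRequested.set(true); }

    // One iteration of the run loop, executed on the owning thread.
    // Returns true if it acted on the flag.
    boolean runOnce() {
        if (shutdownRequested.get()) {
            // In the design described in the thread, the owning thread would set
            // the error code and trigger a rebalance here; this sketch only stops.
            alive.set(false);
            return true;
        }
        // ... poll and process records ...
        return false;
    }
}

// Hypothetical stand-in for the KafkaStreams-side method.
final class ShutdownRequestSketch {
    // Non-blocking: flags the first alive thread and returns immediately.
    // Returns false when no thread is alive to carry the request.
    static boolean initiateClosingAllClients(List<WorkerThreadSketch> threads) {
        for (WorkerThreadSketch t : threads) {
            if (t.isAlive()) {
                t.requestApplicationShutdown();
                return true;
            }
        }
        return false;
    }
}
```

The call returns without waiting for any thread to stop. A false return tells the caller that no thread was alive to carry the request, so the caller has to react itself, for example by closing the local instance.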

What do you think the blocking behavior should be? I think that the StreamThread should definitely stop, to prevent any of the corruption we are trying to avoid by shutting down, but I don't see any advantage in the KafkaStreams call blocking.

You are correct to be concerned about the uncaught exception handler. If there are no live StreamThreads, the rebalance will not be started at all, and this would be a problem. However, the user should be aware of this from the return value of false and can react appropriately. This would also be fixed if we implemented our own handler, so that we can rebalance before the StreamThread closes.

With that in mind, I believe that `initiateClosingAllClients` would be an appropriate name. WDYT?

Walker


On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Walker,

Thanks for the updated KIP. Previously I was also a bit hesitant about the newly added public exception to communicate a user-requested whole-app shutdown, but the reason I did not bring this up is that I feel there's still a need, from an operational aspect, to differentiate the scenario where an instance is closed because of a) a local `streams.close()` being triggered, or b) a remote instance's `stream.shutdownApp` being triggered. So if we are going to remove that exception (which I'm also in favor of), we should at least differentiate the two via the log4j levels.

Regarding the semantics that "It should wait to receive the shutdown request in the rebalance it triggers.", I'm not sure I fully understand, since this may be triggered from the stream thread's uncaught exception handler; if that thread is already dead, then maybe a rebalance listener would not even be fired at all. Although I know these are implementation details that you probably abstract away from the proposal, I'd like to make sure that we are on the same page regarding its blocking behavior, since it is quite crucial to users as well. Could you elaborate a bit more?

Regarding the function name, I guess my personal preference would depend on its actual blocking behavior as above :)


Guozhang




On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello all again,

I have updated the KIP to no longer use an exception and instead add a method to the KafkaStreams class; this seems to satisfy everyone's concerns about how and when the functionality will be invoked.

There is still a question over the name. We must decide between "shutdownApplication", "initiateCloseAll", "closeAllInstances" or some variation.

I am rather indifferent to the name. I think that they all get the point across. The clearest to me would be shutdownApplication or closeAllInstances, but WDYT?

Walker



On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Guozhang and Bruno,

Thanks for the feedback.

I will respond in two parts, but I would like to clarify that I am not tied to any of these names. Since we are still deciding whether we want to have an exception or not, I would rather not get tripped up on choosing a name just yet.

Guozhang:
1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am not planning on changing the behavior of handling source topic deletion. That is being done in KIP-662 by Bruno. He is enabling the user to create their own handler, and shutdownApplication gives them the option to shut down.

2) It seems that we will remove the exception entirely, so this won't matter (see below).

3) It should wait to receive the shutdown request in the rebalance it triggers. That might be a better name. I am torn between using "application" or "all instances" in a couple of places. I think we should pick one and be consistent, but I am unsure which is more descriptive.

Bruno:
I agree that in principle exceptions should be used in exceptional cases, and I have added a method in KafkaStreams to handle cases where an exception would not be appropriate. I guess you think that users should never throw a Streams exception; instead, they could always throw and catch their own exception and call shutdownApplication from there. This would allow them to exit a processor if they wanted to shut down from there. I will update the KIP to remove the exception.

I would like to add that, when trying to shut down from the uncaught exception handler, we need at least one StreamThread to be alive. So having our own handler, instead of using the default one after the thread has died, would let us always close the application.

Walker

On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Walker,

Thank you for the KIP!

I like the motivation of the KIP and the method to request a shutdown of all Kafka Streams clients of a Kafka Streams application. I think we really need such functionality to react to errors. However, I am not convinced that throwing an exception to shut down all clients is a good idea.

An exception signals an exceptional situation to which we can react in multiple ways depending on the context. The exception that you propose seems to me more like a well-defined user command than an exceptional situation. IMO, we should not use exceptions to control program flow, because it mixes cause and effect. Hence, I would propose an invariant for public exceptions in Kafka Streams: the public exceptions in Kafka Streams should be caught by users, not thrown. But maybe I am missing the big advantage of using an exception here.

I echo Guozhang's third point about clarifying the behavior of the method and the naming.

Best,
Bruno

On 16.09.20 06:28, Guozhang Wang wrote:
Hello Walker,

Thanks for proposing the KIP! I have a couple more comments:

1. ShutdownRequestedException: my understanding is that this exception is only used if the application shutdown was initiated by the user calling "shutdownApplication()"; otherwise, e.g. if it is due to a source topic not being found and the Streams library decides to close the whole application automatically, we would still throw the original exception, a.k.a. MissingSourceTopicException, to the uncaught exception handler. Is that the case? Also, which package are you proposing to add this exception to?

2. ShutdownRequestedException: for its constructor, I'm wondering what Throwable "root cause" it could ever be. I'm guessing that we would still just use a single error code in the protocol to tell other instances to shut down, and since that error code would not allow us to encode any more information, such as root causes, it seems that parameter would always be null.

3. shutdownApplication: again I'd like to clarify, would this function block on the local instance to complete shutting down all its threads, like `close()` does, or would it just initiate the shutdown and not wait for local threads at all? Also a nit suggestion regarding the name: if it is only for initiating the shutdown, maybe naming it "initiateCloseAll" would be more specific?


Guozhang




On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Matthias and Sophie,

You both make good points. I will respond to each of you separately below.


Matthias:
That is a fair point. KIP-662 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>, which is accepted, will make it so that source topic deletion reaches the uncaught exception handler. Shutdown can be initiated from there. However, this would mean that the stream thread is already dead, so I would have to rethink the exception for this use case; perhaps it would be needed in the KafkaStreams object. But this still leaves the case where there is only one stream thread. I will think about it.

Maybe the source topics are a bad example, as they make this KIP dependent on KIP-662 getting implemented in a certain way. However, this is not the only reason this could be useful: KAFKA-4748 <https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket asking for the same functionality. I have added a few other use cases to the KIP, although I will still be rethinking where I want to add this functionality and whether it should be an exception or not.

Sophie:
I agree that shutting down an instance could also be useful. There was some discussion about this on KIP-663, and it seems that we came to the conclusion that close(Duration.ZERO) would be sufficient. Link to the thread:
<https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>

Also, I am not set on the name ShutdownRequested. If we decide to keep it as an exception, your idea is probably a better name.


Thanks for the feedback,
Walker


On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

It seems that the new exception would need to be thrown by user code? However, in the motivation you mention the scenario of a missing source topic, which a user cannot detect and which the KafkaStreams runtime would be responsible for handling.

How do both things go together?


-Matthias

On 9/11/20 10:31 AM, Walker Carlson wrote:
Hello all,

I have created KIP-671 to give the option to shut down a Streams application in response to an error.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown

This is because of the Jira ticket
<https://issues.apache.org/jira/browse/KAFKA-9331>

Please give it a look and let me know if you have any
feedback.

Thanks,
Walker

--
-- Guozhang