Hi Walker,

Thanks for updating the KIP!

1. I would add the response REPLACE_STREAM_THREAD to the StreamsUncaughtExceptionHandlerResponse enum to start a new stream thread that replaces the failed one. I suspect you did not add it because it depends on KIP-663. A dependency on another unfinished KIP should not stop you from adding this response.

2. Why does the Kafka Streams client transition to NOT_RUNNING when it is shut down due to SHUTDOWN_KAFKA_STREAMS_CLIENT and SHUTDOWN_KAFKA_STREAMS_APPLICATION? I would rather expect it to transition to ERROR, since we are exclusively talking about error cases now. I would also not emulate the current behavior of close(), since close() is not intended to be used in the error case due to deadlocks you could run into.

3. Since the motivation of the KIP changed quite a lot, I think you should remove KAFKA-4748 from the motivation or make it clear that this KIP only covers the shutdown of the Kafka Streams application in the error case.

4. I would just overload the method setUncaughtExceptionHandler() and not introduce a method with a new name.

5. I agree with Guozhang that we should deprecate the overload with the Java-specific handler. I am sure you wanted to deprecate the method and just forgot about it.

6. I agree with Guozhang that the RocksDB metrics recording thread should also be shut down. To be fair, when Walker asked me about it, I thought it was not strictly necessary to shut it down, but thinking about it again, it also does not make a lot of sense to keep it running, because the RocksDB metrics would all have been removed at that point.

7. I think we should provide a default implementation of the handler. However, the default implementation should just return SHUTDOWN_STREAM_THREAD, which corresponds to the current behavior. If we want to provide a more elaborate default handler, I would propose to discuss that in a separate KIP so as not to block this KIP on that discussion.
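
To make points 1 and 7 concrete, here is a minimal, self-contained Java sketch of what the response enum and a default handler could look like. The enum constants are the names used in this thread; the interface shape, the method name `handle`, and the class `DefaultStreamsUncaughtExceptionHandler` are assumptions made for illustration and are not taken from the KIP or from the Kafka code base.

```java
// Hypothetical response enum, using the constant names discussed in this thread.
enum StreamsUncaughtExceptionHandlerResponse {
    REPLACE_STREAM_THREAD,              // would depend on KIP-663
    SHUTDOWN_STREAM_THREAD,             // current behavior: only the failed thread stops
    SHUTDOWN_KAFKA_STREAMS_CLIENT,
    SHUTDOWN_KAFKA_STREAMS_APPLICATION
}

// Hypothetical handler interface; the method name `handle` is an assumption.
@FunctionalInterface
interface StreamsUncaughtExceptionHandler {
    StreamsUncaughtExceptionHandlerResponse handle(Throwable exception);
}

// Hypothetical default handler that keeps today's behavior (point 7).
final class DefaultStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamsUncaughtExceptionHandlerResponse handle(Throwable exception) {
        return StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
    }
}

final class HandlerSketch {
    public static void main(String[] args) {
        StreamsUncaughtExceptionHandler handler = new DefaultStreamsUncaughtExceptionHandler();
        // The default ignores the exception type and always picks the same response.
        System.out.println(handler.handle(new IllegalStateException("simulated failure")));
    }
}
```

A user-supplied handler could inspect the exception and pick a different response; which exceptions map to which response is exactly the discussion proposed for a separate KIP.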

Best,
Bruno

On 29.09.20 05:35, Guozhang Wang wrote:
Hello Walker,

Thanks for the updated KIP proposal. A few more comments below:

1. "The RocksDB metrics recording thread is not shutdown." Why should it
not be shut down in either the client or the application shutdown case?

2. Should we deprecate the existing overloaded function with the java
UncaughtExceptionHandler?

3. Should we consider providing a default implementation of this handler
interface which is automatically set if not overridden by users, e.g. one
that would choose SHUTDOWN_KAFKA_STREAMS_APPLICATION upon
MissingSourceTopicException in KIP-662?


Guozhang


On Mon, Sep 28, 2020 at 3:57 PM Walker Carlson <wcarl...@confluent.io>
wrote:

I think that Guozhang and Matthias make good points.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler

I have updated the KIP to include a StreamsUncaughtExceptionHandler.



On Sun, Sep 27, 2020 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think single-threaded clients may be common in practice, and what
Matthias raised is a valid concern.

We had a related discussion in KIP-663: maybe we can tweak the
`UncaughtExceptionHandler` a bit such that, instead of just registering
the user's function on the individual threads, we trigger it BEFORE the
thread dies, in the "catch (Exception)" block. It was originally proposed
to make sure that if a user calls localThreadMetadata() in the function,
the dying / throwing thread would still be included, but maybe we could
consider this reason as well.
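
The idea of invoking the handler inside the "catch (Exception)" block, while the failing thread is still alive, can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `SketchThread`, the `Response` enum, and `runFailingThread` are hypothetical names, and nothing here reflects the real StreamThread implementation.

```java
import java.util.function.Function;

final class DyingThreadSketch {
    // Hypothetical subset of the responses discussed in this thread.
    enum Response { SHUTDOWN_STREAM_THREAD, SHUTDOWN_KAFKA_STREAMS_APPLICATION }

    // Hypothetical stand-in for a stream thread.
    static final class SketchThread extends Thread {
        private final Runnable work;
        private final Function<Throwable, Response> handler;
        volatile boolean aliveWhenHandled;
        volatile Response response;

        SketchThread(Runnable work, Function<Throwable, Response> handler) {
            this.work = work;
            this.handler = handler;
        }

        @Override
        public void run() {
            try {
                work.run();
            } catch (Exception e) {
                // The handler runs here, before run() returns, so the thread
                // still counts as alive while the user code executes.
                aliveWhenHandled = isAlive();
                response = handler.apply(e);
            }
        }
    }

    // Starts a thread whose work always fails and waits for it to finish.
    static SketchThread runFailingThread() {
        SketchThread thread = new SketchThread(
            () -> { throw new IllegalStateException("simulated failure"); },
            e -> Response.SHUTDOWN_KAFKA_STREAMS_APPLICATION);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return thread;
    }

    public static void main(String[] args) {
        SketchThread thread = runFailingThread();
        System.out.println(thread.aliveWhenHandled + " " + thread.response);
    }
}
```

With a plain java.lang.Thread.UncaughtExceptionHandler, by contrast, the handler is only invoked once the thread is already terminating, which is the limitation for single-threaded clients discussed in this thread.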


Guozhang


On Fri, Sep 25, 2020 at 3:20 PM Matthias J. Sax <mj...@apache.org>
wrote:

I am wondering about the usage pattern of this new method.

As already discussed, the method only works if there is at least one
running thread... Do we have any sense of how many apps actually run
multi-threaded vs single-threaded? It seems that the feature might be
quite limited without having a handler that is called _before_ the
thread dies? However, for this case, I am wondering if it might be
easier to just return an enum type from such a handler instead of
calling `KafkaStreams#initiateClosingAllClients()`?

In general, it seems that there is some gap between the case of stopping
all instances from "outside" (as proposed in the KIP), vs from "inside"
(what I thought was the original line of thinking for this KIP?).

For the network partitioning case, should we at least shut down all local
threads? It might be sufficient that only one thread sends the "shutdown
signal" while all others just shut down directly? Why should the other
threads wait for the shutdown signal from a rebalance? Or should we
recommend calling `initiateClosingAllClients()` followed by `close()` to
make sure that at least the local threads stop (which might be a little
bit odd)?

-Matthias

On 9/24/20 7:51 AM, John Roesler wrote:
Hello all,

Thanks for bringing this up, Bruno. It’s a really good point that a
disconnected node would miss the signal and then resurrect a single-node
“zombie cluster” when it reconnects.

Offhand, I can’t think of a simple and reliable way to distinguish this
case from one in which an operator starts a node manually after a prior
shutdown signal. Can you? Right now, I’m inclined to agree with Walker
that we should leave this as a problem for the future.

It should certainly be mentioned in the kip, and it also deserves
special mention in our javadoc and html docs for this feature.

Thanks!
John

On Wed, Sep 23, 2020, at 17:49, Walker Carlson wrote:
Bruno,

I think that we can't guarantee that the message will get propagated
perfectly in every case of, say, network partitioning, though it will
work for many cases. So I would say it's best effort and I will mention
it in the KIP.

As for when to use it I think we can discuss if this will be sufficient
when we come to it, as long as we document its capabilities.

I hope this answers your question,

Walker

On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna <br...@confluent.io>
wrote:

Walker,

I am sorry, but I still have a comment on the KIP although you have
already started voting.

What happens when a consumer of the group skips the rebalancing that
propagates the shutdown request? Do you give a guarantee that all Kafka
Streams clients are shut down or is it best effort? If it is best effort,
I guess the proposed method might not be used in critical cases where
stopping record consumption may prevent or limit damage. I am not saying
that it must be a guarantee, but this question should be answered in the
KIP, IMO.

Best,
Bruno

On 22.09.20 01:14, Walker Carlson wrote:
The error code right now is the assignor error; 2 is coded for shutdown,
but it could be expanded to encode the causes or other errors that need
to be communicated. For example, we can add error code 3 to close the
thread but leave the client in an error state if we choose to do so in
the future.
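
As a rough illustration of such a short error code, here is a minimal Java sketch. The value 2 for shutdown is the only detail taken from this message; the enum name, the NONE constant with value 0, and the `fromCode` helper are assumptions for illustration and do not describe the actual assignor error enum in Kafka Streams.

```java
import java.util.Optional;

final class AssignorErrorSketch {
    // Hypothetical error codes carried in a short field of the subscription.
    enum ErrorCode {
        NONE((short) 0),                // assumption: 0 means "no error"
        SHUTDOWN_REQUESTED((short) 2);  // from the thread: 2 is coded for shutdown

        private final short code;

        ErrorCode(short code) {
            this.code = code;
        }

        short code() {
            return code;
        }

        // Unknown codes map to Optional.empty() so that codes added later
        // (for example a hypothetical 3) do not break older decoders.
        static Optional<ErrorCode> fromCode(short code) {
            for (ErrorCode error : values()) {
                if (error.code == code) {
                    return Optional.of(error);
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(ErrorCode.fromCode((short) 2));
        System.out.println(ErrorCode.fromCode((short) 3));
    }
}
```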

On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks for the KIP Walker.

In the KIP we mentioned "In order to communicate the shutdown request
from one client to the others we propose to update the
SubscriptionInfoData to include a short field which will encode an error
code." Is there a dedicated error code that we should define here, or is
it case-by-case?

On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io> wrote:

I am changing the name to "Add method to Shutdown entire Streams
Application" since we are no longer using an Exception, it seems more
appropriate.

Also it looks like the discussion is pretty much finished so I will be
calling it to a vote.

thanks,
Walker

On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com> wrote:

Sounds good to me. I also feel that this call should be non-blocking but
I guess I was confused from the discussion thread that the API is
designed in a blocking fashion which contradicts with my perspective and
hence I asked for clarification :)

Guozhang


On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Guozhang,

As for the logging I plan on having three logs. First, the client logs
that it is requesting an application shutdown; second, the leader logs
the processId of the invoker; third, the StreamRebalanceListener logs
that it is closing because of a `stream.appShutdown`. Hopefully this will
be enough to make the cause of the close clear.

I see what you mean about the name being dependent on the behavior of the
method so I will try to clarify. This is how I currently envision the
call working.

It is not an option to directly initiate a shutdown through a
StreamThread object from a KafkaStreams object because "KafkaConsumer is
not safe for multi-threaded access". So how it works is that the method
in KafkaStreams finds the first alive thread and sets a flag in the
StreamThread. The StreamThread will receive the flag in its runloop then
set the error code and trigger a rebalance, afterwards it will stop
processing. After the KafkaStreams has set the flag it will return true
and continue running. If there are no alive threads the shutdown will
fail and return false.
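
The flag-based flow described above can be sketched in plain Java as follows. This is a single-threaded simulation under stated assumptions: `SketchStreamThread`, `runOnce`, and the field names are hypothetical, the error code value 2 is the one mentioned elsewhere in this thread, and the real StreamThread and KafkaStreams classes are not used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownFlagSketch {
    static final short SHUTDOWN_REQUESTED = 2; // value mentioned in this thread

    // Hypothetical stand-in for a stream thread.
    static final class SketchStreamThread {
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private final boolean alive;
        private short errorCode = 0;
        private boolean rebalanceTriggered = false;

        SketchStreamThread(boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }

        // Called from the client's thread: only sets a flag, never touches the consumer.
        void requestShutdown() {
            shutdownRequested.set(true);
        }

        short errorCode() {
            return errorCode;
        }

        boolean rebalanceTriggered() {
            return rebalanceTriggered;
        }

        // One iteration of the run loop; returns false once processing should stop.
        boolean runOnce() {
            if (shutdownRequested.get()) {
                errorCode = SHUTDOWN_REQUESTED; // would be encoded in the subscription
                rebalanceTriggered = true;      // stands in for triggering a rebalance
                return false;
            }
            return true; // normal record processing would happen here
        }
    }

    // Non-blocking: flags the first alive thread and returns immediately.
    static boolean initiateClosingAllClients(List<SketchStreamThread> threads) {
        for (SketchStreamThread thread : threads) {
            if (thread.isAlive()) {
                thread.requestShutdown();
                return true;
            }
        }
        return false; // no alive thread, so no rebalance can be triggered
    }

    public static void main(String[] args) {
        List<SketchStreamThread> threads =
            Arrays.asList(new SketchStreamThread(false), new SketchStreamThread(true));
        System.out.println(initiateClosingAllClients(threads));
        System.out.println(threads.get(1).runOnce());
    }
}
```

Keeping the consumer interaction inside the run loop is what respects the "KafkaConsumer is not safe for multi-threaded access" constraint: the calling thread only flips an atomic flag.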

What do you think the blocking behavior should be? I think that the
StreamThread should definitely stop to prevent any of the corruption we
are trying to avoid by shutting down, but I don't see any advantage of
the KafkaStreams call blocking.

You are correct to be concerned about the uncaught exception handler. If
there are no live StreamThreads the rebalance will not be started at all
and this would be a problem. However the user should be aware of this
because of the return of false and react appropriately. This would also
be fixed if we implemented our own handler so we can rebalance before the
StreamThread closes.

With that in mind I believe that `initiateClosingAllClients` would be an
appropriate name. WDYT?

Walker


On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Walker,

Thanks for the updated KIP. Previously I was also a bit hesitant about
the newly added public exception to communicate user-requested whole app
shutdown, but the reason I did not bring this up is that I feel there's
still a need from operational aspects that we can differentiate the
scenario where an instance is closed because of a) local
`streams.close()` triggered, or b) a remote instance's
`stream.shutdownApp` triggered. So if we are going to remove that
exception (which I'm also in favor of), we should at least differentiate
from the log4j levels.

Regarding the semantics that "It should wait to receive the shutdown
request in the rebalance it triggers." I'm not sure I fully understand,
since this may be triggered from the stream thread's uncaught exception
handler, if that thread is already dead then maybe a rebalance listener
would not even be fired at all. Although I know this is some
implementation details that you probably abstract away from the proposal,
I'd like to make sure that we are on the same page regarding its blocking
behavior since it is quite crucial to users as well. Could you elaborate
a bit more?

Regarding the function name, I guess my personal preference would depend
on its actual blocking behavior as above :)


Guozhang




On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello all again,

I have updated the kip to no longer use an exception and instead add a
method to the KafkaStreams class, this seems to satisfy everyone's
concerns about how and when the functionality will be invoked.

There is still a question over the name. We must decide between
"shutdownApplication", "initiateCloseAll", "closeAllInstances" or some
variation.

I am rather indifferent to the name. I think that they all get the point
across. The most clear to me would be shutdownApplication or
closeAllInstances but WDYT?

Walker



On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Guozhang and Bruno,

Thanks for the feedback.

I will respond in two parts but I would like to clarify that I am not
tied down to any of these names, but since we are still deciding if we
want to have an exception or not I would rather not get tripped up on
choosing a name just yet.

Guozhang:
1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am
not planning on changing the behavior of handling source topic deletion.
That is being done in KIP-662 by Bruno. He is enabling the user to create
their own handler and shutdownApplication is giving them the option to
shutdown.

2) It seems that we will remove the Exception entirely so this won't
matter (below)

3) It should wait to receive the shutdown request in the rebalance it
triggers. That might be a better name. I am torn between using
"application" or "all Instances" in a couple places. I think we should
pick one and be consistent but I am unsure which is more descriptive.

Bruno:
I agree that in principle Exceptions should be used in exception cases.
And I have added a method in KafkaStreams to handle cases where an
Exception would not be appropriate. I guess you think that users should
never throw a Streams Exception; then they could always throw and catch
their own exception and call shutdownApplication from there. This would
allow them to exit a processor if they wanted to shutdown from there. I
will update the Kip to remove the exception.

I would like to add that in the case of trying to shutdown from the
uncaught exception handler that we need at least one StreamThread to be
alive. So having our own handler instead of using the default one after
the thread has died would let us always close the application.

Walker

On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Walker,

Thank you for the KIP!

I like the motivation of the KIP and the method to request a shutdown of
all Kafka Streams clients of a Kafka Streams application. I think we
really need such functionality to react to errors. However, I am not
convinced that throwing an exception to shut down all clients is a good
idea.

An exception signals an exceptional situation to which we can react in
multiple ways depending on the context. The exception that you propose
seems to me to be a well-defined user command rather than an exceptional
situation. IMO, we should not use exceptions to control program flow
because it mixes cause and effect. Hence, I would propose an invariant
for public exceptions in Kafka Streams. The public exceptions in Kafka
Streams should be caught by users, not thrown. But maybe I am missing the
big advantage of using an exception here.

I echo Guozhang's third point about clarifying the behavior of the method
and the naming.

Best,
Bruno

On 16.09.20 06:28, Guozhang Wang wrote:
Hello Walker,

Thanks for proposing the KIP! I have a couple more
comments:

1. ShutdownRequestedException: my understanding is that this exception is
only used if the application shutdown was initiated by the user-triggered
"shutdownApplication()"; otherwise, e.g. if it is due to a source topic
not being found and the Streams library decides to close the whole
application automatically, we would still throw the original exception,
a.k.a. MissingSourceTopicException, to the uncaught exception handler. Is
that the case? Also, for this exception, which package are you proposing
to add it to?

2. ShutdownRequestedException: for its constructor, I'm wondering what
Throwable "root cause" could it ever be? Since I'm guessing here that we
would just use a single error code in the protocol still to tell other
instances to shutdown, and that error code would not allow us to encode
any more information like root causes at all, it seems that parameter
would always be null.

3. shutdownApplication: again I'd like to clarify, would this function
block on the local instance to complete shutting down all its threads
like `close()` as well, or would it just initiate the shutdown and not
wait for local threads at all? Also a nit suggestion regarding the name:
if it is only for initiating the shutdown, maybe naming it
"initiateCloseAll" would be more specific?


Guozhang




On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hello Matthias and Sophie,

You both make good points. I will respond to them separately below.


Matthias:
That is a fair point. KIP-662
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
which is accepted, will make it so that source topic deletion makes it to
the uncaught exception handler. Shutdown can be initiated from there.
However, this would mean that the stream thread is already dead. So I
would have to rethink the exception for this use case; perhaps it would
be needed in the KafkaStreams object. But this still leaves the case
where there is only one stream thread. I will think about it.

Maybe the source topics are a bad example as it makes this KIP dependent
on KIP-662 getting implemented in a certain way. However, this is not the
only reason this could be useful: here
<https://issues.apache.org/jira/browse/KAFKA-4748> is a Jira ticket
asking for the same functionality. I have added a few other use cases to
the KIP, although I will still be rethinking where I want to add this
functionality and whether it should be an exception or not.

Sophie:
I agree that shutting down an instance could also be useful. There was
some discussion about this on KIP-663. It seems that we came to the
conclusion that close(Duration.ZERO) would be sufficient. link
<https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
to thread

Also I am not set on the name ShutdownRequested. If we decide to keep it
as an exception your idea is probably a better name.


Thanks for the feedback,
Walker


On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

It seems that the new exception would need to be thrown by user code?
However, in the motivation you mention the scenario of a missing source
topic that a user cannot detect, but that the KafkaStreams runtime would
be responsible for handling.

How do both things go together?


-Matthias

On 9/11/20 10:31 AM, Walker Carlson wrote:
Hello all,

I have created KIP-671 to give the option to shutdown a streams
application in response to an error.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown

This is because of the Jira ticket
<https://issues.apache.org/jira/browse/KAFKA-9331>

Please give it a look and let me know if you have any
feedback.

Thanks,
Walker