Hi Sophie,

Thank you for the feedback! I replied inline.

Best,
Bruno

On 16.09.20 19:19, Sophie Blee-Goldman wrote:

We guarantee that the metadata of the dead stream threads  will be
returned by KafkaStreams#localThreadsMetadata() at least until the next
call to KafkaStreams#addStreamThread() or
KafkaStreams#removeStreamThread() after the stream thread transited to
DEAD


This seems kind of tricky...personally I would find it pretty odd if I
queried the
local thread metadata and found two threads, A (alive) and B (dead), and
then
called removeStreamThread() and now suddenly I have zero. Or if I call
addStreamThread and now I still have two threads.


The behavior might be unusual, but it is well defined and not random by any means.

Both of those results seem to indicate that only live threads "count" and
are returned
by localThreadsMetadata(). But in reality we do temporarily keep the dead
thread,
but only for the arbitrary amount of time until the next time you want to
add or
remove some other stream thread? That seems like a weird side effect of the
add/removeStreamThread APIs.


This is not a side effect that just happens to occur. This is a guarantee that users get. It gives users the possibility to retrieve the metadata of the dead stream threads since the last call to add/removeStreamThread. Admittedly, this guarantee overlap with the current/planned implementation. But that is more a coincidence.

I would be more concerned about when add/removeStreamThread is called from different threads which could happen if an uncaught exception handler is called that wants to replace a stream thread and a thread that is responsible for automated scaling up is running.

If we really think users might want to log the metadata of dead threads,
then
let's just do that for them or give them a way to do exactly that.


Logging the metatdata of dead stream threads for the user is a valid alternative. Giving users the way to do exactly that is hard because the StreamThread class is not part of the public API. They would always need to call a method on the KafkaStreams object where we already have localThreadsMetadata().

I'm not that concerned about the backwards compatibility of removing dead
threads from the localThreadsMetadata, because I find it hard to believe
that
users do anything other than just skip over them in the list (set?) that
gets
returned. But maybe someone can chime in with an example use case.


I am also not too much concerned about backwards compatibility. That would indeed be a side effect of the current proposal.

I'm actually even a little skeptical that any users might want to log the
metadata of a
dead thread, since all of the metadata is only useful for IQ on live
threads or
already covered by other easily discoverable logging elsewhere, or both.


Said all of the above, I actually agree with you that there is not that much information in the metadata of a dead stream thread that is interesting. The name of the stream thread is known in the uncaught exception handler. The names of the clients, like consumer etc., used by the stream thread can be derived from the name of the stream thread. Finally, the sets of active and standby tasks should be empty for a dead stream thread.

Hence, I backpedal and propose to filter out dead stream threads from localThreadsMetadata(). WDYT?

On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi again,

I just realized that if we filter out DEAD stream threads in
localThreadsMetadata(), users cannot log the metadata of dying stream
threads in the uncaught exception handler.

I realized this thanks to the example Guozhang requested in the KIP.
Thank you for that, Guozhang!

Hence, I adapted the KIP as follows:

- We do not filter out DEAD stream threads in
KafkaStreams#localThreadsMetadata()

- We guarantee that the metadata of the dead stream threads  will be
returned by KafkaStreams#localThreadsMetadata() at least until the next
call to KafkaStreams#addStreamThread() or
KafkaStreams#removeStreamThread() after the stream thread transited to
DEAD. Besides giving users the opportunity to log the metadata of a
dying stream thread in its uncaught exception handler, this guarantee
makes KafkaStreams#localThreadsMetadata() completely backward compatible
to the current behavior, because if KafkaStreams#addStreamThread() and
KafkaStreams#removeStreamThread() are never called,
KafkaStreams#localThreadsMetadata() will also return the metadata of all
streams threads that have ever died which corresponds to the current
behavior.

- We guarantee that dead stream threads are removed from a Kafka Streams
client at latest after the next call to KafkaStreams#addStreamThread()
or KafkaStreams#removeStreamThread() following the transition of the
stream thread to DEAD. This guarantees that the number of maintained
stream threads does not grow indefinitely.


Best,
Bruno



On 16.09.20 09:23, Bruno Cadonna wrote:
Hi Guozhang,

Good point! I would propose to filter out DEAD stream threads in
localThreadsMetadata() to get consistent results that do not depend on
timing. I will update the KIP accordingly.

Best,
Bruno

On 16.09.20 06:02, Guozhang Wang wrote:
Thanks Bruno, your replies make sense to me. As for
localThreadsMetadata() itself,
I'd like to clarify if it would return any still-bookkept threads, or
would
it specifically filter out those DEAD threads even if they are not yet
removed.

Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna <br...@confluent.io>
wrote:

Hi Guozhang,

Thank you for your feedback. I replied inline.

Best,
Bruno

On 09.09.20 23:43, Guozhang Wang wrote:
Hello Bruno,

Finally got some time to review your KIP and the discussion thread
now..
a
few comments below:

1) I'm with Matthias about the newly added numberOfAliveStreamThreads
v.s.
existing localThreadsMetadata: to me it seems we can always achieve
the
first based on the second. It seems not worthy to provide some "syntax
sugar" to the API but just let users do the filtering themselves.

I am not married to that method. I removed it.

Furthermore, I'm wondering what's the rationale behind removing the
DEAD
threads from localThreadsMetadata()? Personally I feel returning all
threads, including those who are ever closed, either due to
exception or
due to removeStreamThread, would be beneficial for debugging
purposes, as
long as within the lifetime of an instance we expect the amount of
such
dead threads will not increase linearly --- and if we agree with that,
maybe we can rename "removeStreamThread" to sth. like
"terminateStreamThread" indicating it is only terminated but not
removed
--- and of course if users do not want to see those DEAD threads
they can
always filter them out. I'm just proposing that we should still
leave the
door open for those who want to check those ever terminated threads.


I actually think the number of dead stream threads might increase
linearly. Assume users have a systematic error that continuously kills
a
stream thread and they blindly start a new stream thread in the
uncaught
exception handler. This scenario might be a mistake but if the
systematic error does not occur at a high rate, it could also be a
strategy to keep the application running during the investigation of
the
systematic error.

IMO, removing dead stream threads makes Kafka Streams more robust
because it prevent a possibly unbounded increase of memory usage. If
users want to debug the dead stream threads they can monitor the number
of dead threads with the metric proposed in the KIP and they could
additionally log the metadata of the dying stream thread in the
uncaught
exception handler. I do not think that there is need to keep dead
stream
threads around.

2) I think it would help to write down some example user code in
exception
handler e.g. to illustrate how this would be implemented -- e.g. we
know
that practically the handler need to maintain a "this" reference of
the
instance anyways in order to shutdown the whole instance or,
add/terminate
threads dynamically, but I want to see if we have listed all possible
call
paths like: a) a thread's handler logic to terminate another thread,
b) a
thread handler to add new threads, etc are all appropriately supported
without deadlocks.


I added an example for an uncaught exception handler that adds a stream
thread to the KIP. Removing a stream thread in an uncaught exception
handler doesn't seem a common use case to me. Nevertheless, we need to
make sure that we do not run in a deadlock in that case. I will
consider
that during the implementation and write tests to check for deadlocks.

Shutting down the Kafka Streams client from inside an uncaught
exception
handler is outside the scope of this KIP. In the beginning it was part
of the KIP, but during the discussion it turned out that we can fix our
existing close() method to accomplish the shutdown from inside an
uncaught exception handler. But I completely agree with you that we
need
to ensure that we do not run into a deadlock in this case.



Guozhang


On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax <mj...@apache.org>
wrote:

I would prefer to not add a new method. It seems unnecessary.
`localThreadMetadata` does return all threads in all states(*) and
thus
provides full insight.

(*) A thread in state DEAD could be returned as long as it's not
removed
yet.

I don't see any advantage to pre-filter threads and to exclude
threads
in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not
started yet, it is still "alive" in a broader sense. For example, if
a
user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in
state CREATED, a user won't need to add 2 more threads -- there are
already 10 threads.

For PENDING_SHUTDOWN and scale in it would be different I guess, as
the
proposal would be to filter them out right away. However, filtering
them
seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
might still process data and it's thus still "alive".

If there is still a need later to add a new method about "alive
thread"
we can always add as a follow up -- removing things is much harder.

I also don't think that there is value in returning names of dead
threads, as we recycle names.


-Matthias


On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
I agree that the current behavior of localThreadsMetadata() does not
seem
to match, but it seems like we will be forced to change it to only
return
currently-alive threads. For one thing, we plan to recycle old
thread
names.
It would be pretty confusing for a user to get two (or more)
ThreadMetadata
objects returned with the same name, since AFAICT this is the only
distinguishing identifier of stream threads. I think we should
enforce
that
only live threads are returned by localThreadsMetadata(). Plus, as
Matthias
pointed out, we plan to remove dead threads from the KafkaStreams
client,
so still returning them in the metadata would be extremely odd.

If we think that there might be some use case that requires knowing
which
threads have died, we could consider adding a method that returns
the
names of dead threads. But the only use case I can imagine would
probably
be better served by a callback that gets invoked when the thread
dies,
which
we already have.

On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna <br...@confluent.io>
wrote:

Hi Matthias and Sophie,

I agree that localThreadsMetadata() can be used here. However,
localThreadsMetadata() returns all stream threads irrespectively of
their states. Alive stream threads are specified as being in one of
the
following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
PARTITIONS_ASSIGNED. Hence, users would need to filter the result
of
localThreadsMetadata(). I thought, it would be neat to have a
method
that hides this filtering and returns the number of alive stream
threads, because that is the most basic information you might
need to
decide about adding or removing stream threads. For all more
advanced
use cases users should use localThreadsMetadata(). I am also happy
with
removing the method. WDYT?

Best,
Bruno

On 09.09.20 03:51, Matthias J. Sax wrote:
Currently we, don't cleanup dead threads, but the KIP proposes to
change
this:

Stream threads that are in state DEAD will be removed from the
stream
threads of a Kafka Streams client.


-Matthias

On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
Ah, I forgot about localThreadsMetadata(). In that. case I agree,
there's
no reason
to introduce a new method when we can get both the names and
number
of
all
running threads from this.

I assume that we would update localThreadsMetadata to only return
currently
alive threads as part of this KIP -- at a quick glance, it seems
like
we
don't do
any pruning of dead threads at the moment

On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax <mj...@apache.org

wrote:

I am not sure if we need a new method? There is already
`localThreadsMetadata()`. What do we gain by adding a new one?

Returning the thread's name (as `Optional<String>`) for both
add()
and
remove() is fine with me.


-Matthias

On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
Sorry Bruno, I think I missed the end of your message with the
numberOfAliveStreamThreads()
proposal. I agree, that would be better than the alternatives I
listed.
That said:

They rather suggest that the method returns a list of
handles to
the
stream threads.

I hadn't thought of that originally, but now that you mention
it,
this
might be a good idea.
I don't think we should return actual handles on the threads,
but
maybe a
list of the thread
names rather than a single number of currently alive threads.

Since we seem to think it would be difficult if not
impossible to
keep
track of the number
of running stream threads, we should apply the same reasoning
to
the
names
and not
assume the user can/will keep track of every thread returned by
addStreamThread() or
removeStreamThread(). Users should generally take any required
action
immediately
after adding/removing the thread -- eg deregistering the thread
metrics
--
but it might
still be useful to provide a convenience method listing all
of the
current
threads

And of course you could still get the number of threads
easily by
invoking
size() on the
returned list (or ordered set?).

On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna
<br...@confluent.io

wrote:

Thank you again for the feedback Sophie!

As I tried to point out in my previous e-mail, removing a
stream
thread
from a Kafka Streams client that does not have alive stream
threads
is
nothing exceptional for the client per se. However, it can
become
exceptional within the context of the user. For example, if
users
want
to remove a stream thread from a client without alive stream
threads
because one if their metrics say so, then this is
exceptional in
the
context of that user metric not in the context of the Kafka
Streams
client. In that case, users should throw an exception and
handle
it.

Regarding returning null, I do not like to return null because
from a
development point of view there is no distinction between
returning
null
because we have a bug in the code or returning null because
there
are no
alive stream threads. Additionally, Optional<String> makes it
more
explicit that the result could also be empty.

Thank you for the alternative method names! However, with the
names
you
propose it is not immediately clear that the method returns an
amount of
stream threads. They rather suggest that the method returns a
list
of
handles to the stream threads. I chose to use
"aliveStreamThreads"
to be
consistent with the client-level metric "alive-stream-threads"
which
reports the same number of stream threads that
numberOfAliveStreamThreads() should report. If others also
think
that
the proposed name in the KIP is too clumsy, I am open to
rename
it,
though.

Best,
Bruno


On 08.09.20 20:12, Sophie Blee-Goldman wrote:
it's never a good sign when the discussion moves into the
vote
thread

Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS]
threads
strikes
again.
Thanks for redirecting me Bruno

I suppose it's unfair to expect the callers to keep perfect
track
of
the
current
      number of stream threads, but it also seems like you
shouldn't
be
calling
removeStreamThread() when there are no threads left. Either
you're
just
haphazardly removing threads and could unintentionally slip
into a
state
of
no
running threads without realizing it, or more realistically,
you're
carefully
removing threads based on some metric(s) that convey
whether the
system
is
over or under-provisioned. If your metrics say you're
over-provisioned
but
there's
not one thread running, well, that certainly sounds
exceptional
to
me.
Or
you might
be right in that the cluster is over-provisioned but have
just
been
directing the
removeStreamThread() and addStreamThread() calls to
instances at
random,
and
end up with one massive instance and one with no threads at
all.
Again,
this
probably merits some human intervention (or system redesign)

That said, I don't think there's any real harm to just
returning
null
in
this case, but I hope
that users would pay attention to this since it seems
likely to
indicate
something has gone
seriously wrong. I suppose Optional<String> would be a
reasonable
compromise.

As for the method name, what about activeStreamThreads() or
liveStreamThreads() ?

On Mon, Sep 7, 2020 at 1:45 AM Bruno Cadonna <
br...@confluent.io>
wrote:

Hi John,

I agree with you except for checking null. I would rather
prefer
to
use
Optional<String> as the return type to both methods.

I changed the subject from [VOTE] to [DISCUSS] so that we
can
follow
up
in the discussion thread.

Best,
Bruno

On 04.09.20 23:12, John Roesler wrote:
Hi Sophie,

Uh, oh, it's never a good sign when the discussion moves
into the vote thread :)

I agree with you, it seems like a good touch for
removeStreamThread() to return the name of the thread that
got removed, rather than a boolean flag. Maybe the return
value would be `null` if there is no thread to remove.

If we go that way, I'd suggest that addStreamThread() also
return the name of the newly created thread, or null if no
thread can be created right now.

I'm not completely sure if I think that callers of this
method would know exactly how many threads there are. Sure,
if a human being is sitting there looking at the metrics or
logs and decides to call the method, it would work out, but
I'd expect this kind of method to find its way into
automated tooling that reacts to things like current system
load or resource saturation. Those kinds of toolchains
often
are part of a distributed system, and it's probably not
that
easy to guarantee that the thread count they observe is
fully consistent with the number of threads that are
actually running. Therefore, an in-situ `int
numStreamThreads()` method might not be a bad idea. Then
again, it seems sort of optional. A caller can catch an
exception or react to a `null` return value just the same
either way. Having both add/remove methods behave similarly
is probably more valuable.

Thanks,
-John


On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
wrote:
Hey, sorry for the late reply, I just have one minor
suggestion.
Since
we
don't
make any guarantees about which thread gets removed or
allow
the
user
to
specify, I think we should return either the index or full
name
of
the
thread
that does get removed by removeThread().

I know you just updated the KIP to return true/false if
there
are/aren't any
threads to be removed, but I think this would be more
appropriate as
an
exception than as a return type. I think it's reasonable
to
expect
users to
have some sense to how many threads are remaining, and not
try
to
remove
a thread when there is none left. To me, that indicates
something
wrong
with the user application code and should be treated as an
exceptional
case.
I don't think the same code clarify argument applies
here as
to
the
addStreamThread() case, as there's no reason for an
application
to
be
looping and retrying removeStreamThread()  since if that
fails,
it's
because
there are no threads left and thus it will continue to
always
fail.
And
if
the
user actually wants to shut down all threads, they should
just
close
the
whole application rather than call removeStreamThread()
in a
loop.

While I generally think it should be straightforward for
users
to
track
how
many stream threads they have running, maybe it would be
nice
to
add
a small utility method that does this for them. Something
like

// Returns the number of currently alive threads
boolean runningStreamThreads();

On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax <
mj...@apache.org

wrote:

+1 (binding)

On 9/3/20 6:16 AM, Bruno Cadonna wrote:
Hi,

I would like to start the voting on KIP-663 that
proposes
to
add
methods
to the Kafka Streams client to add and remove stream
threads
during
execution.









https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads


Best,
Bruno



















Reply via email to