Hi Matthias,

I replied inline.

Best,
Bruno

On 02.09.20 22:06, Matthias J. Sax wrote:
Thanks for updating the KIP.

Why do you propose to return a `boolean` from addStreamThread() to signal that the
thread could not be started? As an alternative, we could also throw an
exception if the client is not in state RUNNING. I guess both are
valid options; I just want to understand the pros and cons of each
approach.


I prefer to return a boolean because there is nothing exceptional about a stream thread that cannot be added due to an inappropriate state. State changes are expected in Streams. Furthermore, users should not be forced to control their program flow by catching exceptions. Here are two examples, one for each approach:

Returning a boolean:

while (!kafkaStreams.addStreamThread() &&
       kafkaStreams.state() != State.NOT_RUNNING &&
       kafkaStreams.state() != State.ERROR) {
    // retry until the thread is added or the client has shut down or failed
}


Throwing an exception:

boolean added = false;
while (!added &&
       kafkaStreams.state() != State.NOT_RUNNING &&
       kafkaStreams.state() != State.ERROR) {

    try {
        kafkaStreams.addStreamThread();
        added = true;
    } catch (final Exception ex) {
        // the thread could not be added in the current state; try again
    }
}

IMO the first example is more readable than the second.


Btw: should we allow adding a new thread if the state is REBALANCING,
too? I actually don't see a reason why we should not allow this.


I guess you are right. I will update the KIP and include REBALANCING.
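To make the agreed semantics concrete, here is a minimal sketch of a state-gated addStreamThread() that returns false instead of throwing and accepts both RUNNING and REBALANCING. The ClientState enum and the StreamThreadGate class are hypothetical stand-ins for illustration, not the actual KafkaStreams implementation.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the client states discussed in this thread;
// it is not the real KafkaStreams.State enum and it omits transition rules.
enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

class StreamThreadGate {
    // States in which adding a stream thread is allowed:
    // RUNNING from the original proposal, plus REBALANCING as agreed in this thread.
    private static final Set<ClientState> CAN_ADD =
        EnumSet.of(ClientState.RUNNING, ClientState.REBALANCING);

    private ClientState state;
    private int numThreads;

    StreamThreadGate(ClientState initialState, int initialThreads) {
        this.state = initialState;
        this.numThreads = initialThreads;
    }

    void setState(ClientState newState) {
        state = newState;
    }

    int numThreads() {
        return numThreads;
    }

    // Returns false instead of throwing when the current state does not permit adding a thread.
    boolean addStreamThread() {
        if (!CAN_ADD.contains(state)) {
            return false;
        }
        numThreads++;
        return true;
    }
}
```

A caller can then use the boolean-returning retry loop from the first example without any try/catch.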


For removeStreamThread(), might it be worth actually guaranteeing that
the thread with the largest index is stopped instead of leaving it
unspecified? It does not seem to be a big burden on the implementation,
and given that we plan to reuse the indices of dead threads, it might be
nice to have a contract. Or would there be any negative impact if we
guaranteed it?


I left it unspecified which stream thread is removed since I could not find any good reason for a guarantee. From your comment, I also do not see what advantage we would gain if we guaranteed that the stream thread with the largest index is stopped. It would not guarantee that the next added stream thread gets the largest index, because another stream thread with a lower index could have failed in the meantime, and then two indices are up for grabs. Leaving it unspecified which stream thread is removed also gives us the freedom to choose the stream thread to remove based on other criteria, for example, the one with the least local state.
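The scenario can be sketched as follows, assuming (hypothetically) that a newly added stream thread takes the smallest free index. ThreadIndexAllocator is an illustrative name, not Kafka Streams code.

```java
import java.util.TreeSet;

// Hypothetical sketch of index reuse, assuming a new stream thread takes the
// smallest positive index that is not currently in use. Not the actual Kafka Streams code.
class ThreadIndexAllocator {
    private final TreeSet<Integer> inUse = new TreeSet<>();

    // Returns the smallest positive index that is not in use and marks it as used.
    int acquire() {
        int candidate = 1;
        for (int index : inUse) {      // TreeSet iterates in ascending order
            if (index != candidate) {
                break;                 // found a gap below the current maximum
            }
            candidate++;
        }
        inUse.add(candidate);
        return candidate;
    }

    // Frees the index of a thread that was removed or that failed.
    void release(int index) {
        inUse.remove(index);
    }

    // Index that a "stop the largest index" contract would pick; -1 if no thread exists.
    int largestInUse() {
        return inUse.isEmpty() ? -1 : inUse.last();
    }
}
```

For example, with threads 1, 2, and 3, stopping thread 3 and then having thread 1 fail leaves indices 1 and 3 free, so under this assumption the next acquire() returns 1, not 3.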


Another thought: should we add a parameter `numberOfThreads` to each
method to allow users to start/stop multiple threads at once?


I would keep it simple for now and add overloads if users request them.
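Without overloads, users can still add several threads with a small loop. In this sketch, StreamThreadControl is a hypothetical interface mirroring the boolean-returning addStreamThread() proposed in this thread, and addStreamThreads() is an illustrative helper, not part of any API.

```java
// Hypothetical interface mirroring the boolean-returning method proposed in this thread;
// it is not an existing Kafka Streams type.
interface StreamThreadControl {
    boolean addStreamThread();
}

final class StreamThreadHelpers {
    private StreamThreadHelpers() { }

    // Adds up to numberOfThreads threads one at a time and returns how many were added.
    // Stops at the first failure, e.g. because the client is no longer in a state
    // that allows adding threads.
    static int addStreamThreads(StreamThreadControl control, int numberOfThreads) {
        if (numberOfThreads < 0) {
            throw new IllegalArgumentException("numberOfThreads must not be negative");
        }
        int added = 0;
        while (added < numberOfThreads && control.addStreamThread()) {
            added++;
        }
        return added;
    }
}
```

Returning the count lets the caller see a partial success, which a single boolean from an overload would hide.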


What happens if there are zero running threads and one calls
removeStreamThread()? Should we also add a `boolean` return value and return
`false` for this case (or throw an exception)?


Yeah, I think this is a good idea for the programmatic removal of all threads. However, I would not throw an exception, for the reasons I pointed out above.
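A minimal sketch of that contract, using a hypothetical in-memory ThreadCountingClient rather than the real client: removeStreamThread() returns false when no thread is running, so removing all threads needs no exception handling.

```java
// Hypothetical in-memory stand-in for a client, following the semantics discussed in this
// thread: removeStreamThread() returns false instead of throwing when no thread is running.
class ThreadCountingClient {
    private int runningThreads;

    ThreadCountingClient(int initialThreads) {
        this.runningThreads = initialThreads;
    }

    int runningThreads() {
        return runningThreads;
    }

    boolean removeStreamThread() {
        if (runningThreads == 0) {
            return false; // nothing to remove: report it, do not throw
        }
        runningThreads--;
        return true;
    }

    // Removes all stream threads without catching any exception; returns how many were removed.
    int removeAllStreamThreads() {
        int removed = 0;
        while (removeStreamThread()) {
            removed++;
        }
        return removed;
    }
}
```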



For the metric name, I would prefer "failed" over "crashed". Thoughts?


I think I like "failed" more than "crashed", and it is also more consistent with other parts of the code, like ProductionExceptionHandlerResponse.FAIL.



Side remark for the PR: can we make sure to update the description of
`num.stream.threads` to explain that it's the _initial_ number of
threads on startup?


Good point!


-Matthias


On 9/1/20 2:01 PM, Walker Carlson wrote:
Hi Bruno,

I read through your updated KIP and it looks good to me. I agree with
adding the metric to keep track of crashed stream threads instead of a list of
dead stream threads.

Best,
Walker :)

On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna <br...@confluent.io> wrote:

Hi John,

your proposal makes sense! I will update the KIP.

Best,
Bruno

On 01.09.20 17:31, John Roesler wrote:
Hello Bruno,

Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.

"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.

However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, vs. from calling
"removeStreamThread()". What do you think of
"crashed-stream-threads" instead?
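As an illustration of the intended meaning, here is a minimal sketch of a counter that only counts threads that terminated exceptionally. The class, enum, and method names are hypothetical and are not the Kafka Streams metrics API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a counter that is incremented only when a stream thread
// terminates because of an uncaught exception, not when it is stopped through
// removeStreamThread(). Names are illustrative, not the Kafka Streams metrics API.
class FailedStreamThreadsCounter {
    enum Termination { REMOVED, FAILED }

    private final AtomicLong failed = new AtomicLong();

    // Called once per terminated thread with the reason it terminated.
    void onThreadTerminated(Termination reason) {
        if (reason == Termination.FAILED) {
            failed.incrementAndGet();
        }
    }

    long value() {
        return failed.get();
    }
}
```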

Thanks,
-John

On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
Hi,

I updated the KIP with the feedback so far. I removed the API to close
the Kafka Streams client asynchronously, since it should be possible to
avoid the deadlock with the existing method and without a KIP.

Please have a look at the updated KIP and let me know what you think.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

Best,
Bruno

On 26.08.20 16:31, Bruno Cadonna wrote:
Hi,

I would like to propose the following KIP to start and shut down stream
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients


Best,
Bruno
