Thanks for the KIP, Jim!

This conversation seems to highlight that the KIP needs to specify
some of its behavior as well as its APIs, where the behavior is
observable and significant to users.

For example:

1. Do you plan to have a guarantee that immediately after
calling KafkaStreams.pause(), users should observe that the instance
stops processing new records? Or should they expect that the threads
will continue to process some records and pause asynchronously
(you already answered this in the thread earlier)?

2. Will the threads continue to poll new records until they naturally fill
up the task buffers, or will they immediately pause their Consumers
as well?

3. Will threads continue to call (system time) punctuators, or would
punctuations also be paused?

I realize that some of those questions simply may not have occurred to
you, so this is not a criticism for leaving them off; I'm just pointing out
that although we don't tend to mention implementation details in KIPs,
we also can't be too high level, since there are a lot of operational
details that users rely on to achieve various behaviors in Streams.

A couple more comments:

4. +1 to what Guozhang said. It seems like we should also do a commit
before entering the paused state. That way, any open transactions would
be closed, and we wouldn't have to worry about them timing out. Even
under ALOS, it
seems best to go ahead and complete the processing of in-flight records
by committing. That way, if anything happens to die while it's paused, existing
work won't have to be repeated. Plus, if there are any processors with side
effects, users won't have to tolerate weird edge cases where a pause occurs
after a processor sees a record, but before the result is sent to its outputs.
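
To make point 4 concrete, here is a minimal sketch of a processing loop
that commits in-flight work before it goes idle. It is only an
illustration under stated assumptions: PausableWorker and every method on
it are hypothetical stand-ins, not Kafka Streams APIs, and the "commit" is
just a move between in-memory lists.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a stream thread; none of these names exist in Kafka Streams.
class PausableWorker {
    private final Queue<String> buffered = new ArrayDeque<>();   // polled, not yet processed
    private final List<String> uncommitted = new ArrayList<>();  // processed, not yet committed
    private final List<String> committed = new ArrayList<>();    // processed and committed
    private volatile boolean pauseRequested = false;
    private volatile boolean paused = false;

    void enqueue(String record) { buffered.add(record); }

    // Only sets a flag; the loop observes it on its next iteration (asynchronous pause).
    void requestPause() { pauseRequested = true; }

    void requestResume() { pauseRequested = false; }

    // One iteration of the processing loop.
    void runOnce() {
        if (pauseRequested) {
            if (!paused) {
                commit();       // flush in-flight work before going idle
                paused = true;
            }
            return;             // skip processing while paused
        }
        paused = false;
        String record = buffered.poll();
        if (record != null) {
            uncommitted.add(record);  // "process" the record
        }
    }

    private void commit() {
        committed.addAll(uncommitted);
        uncommitted.clear();
    }

    boolean isPaused() { return paused; }
    int bufferedCount() { return buffered.size(); }
    int uncommittedCount() { return uncommitted.size(); }
    int committedCount() { return committed.size(); }
}
```

In this sketch, calling requestPause() does not stop anything by itself;
the next runOnce() commits whatever was processed and only then reports
isPaused() as true, so nothing processed is left uncommitted while idle.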

5. I noticed that you proposed not to add a PAUSED state, but I didn't follow
the rationale. Adding a state seems beneficial for a number of reasons:
StreamThreads already use the thread state to determine whether to process
or not, so avoiding a new State would just mean adding a separate flag to track
and then checking your new flag in addition to the State in the thread. Also,
operating Streams applications is a non-trivial task, and users rely on the
State (and transitions) to understand Streams's behavior. Adding a PAUSED state
is an elegant way to communicate to operators what is happening with the
application. Note that the person digging through logs and metrics, trying
to understand why the application isn't doing anything, is probably not going
to be the same person who is calling pause() and resume(). Also, if you add
a state, you don't need `isPaused()`.

5b. If you buy the arguments to go ahead and commit as well as the
argument to add a State, then I'd also suggest following the existing pattern
for the shutdown states by also adding PAUSING. That
way, you'll also expose a way to understand that Streams received the signal
to pause, and that it's still processing and committing some records in
preparation to enter a PAUSED state. I'm not sure if a RESUMING state would
also make sense.
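
As a rough sketch of what 5 and 5b could look like, here is a tiny state
machine with PAUSING and PAUSED. Everything in it is hypothetical
(SketchState and SketchStateMachine are illustrative names, not part of
Kafka Streams), and it assumes that resuming goes straight from PAUSED
back to RUNNING, since whether a RESUMING state is needed is still open.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical states; not the real KafkaStreams.State enum.
enum SketchState {
    RUNNING, PAUSING, PAUSED;

    // States reachable from this one. Assumption: no RESUMING state.
    Set<SketchState> validNext() {
        switch (this) {
            case RUNNING: return EnumSet.of(PAUSING);
            case PAUSING: return EnumSet.of(PAUSED);
            case PAUSED:  return EnumSet.of(RUNNING);
            default:      return EnumSet.noneOf(SketchState.class);
        }
    }

    boolean isValidTransition(SketchState next) {
        return validNext().contains(next);
    }
}

// Hypothetical holder that validates transitions and records them for operators.
class SketchStateMachine {
    private SketchState state = SketchState.RUNNING;
    private final List<String> transitionLog = new ArrayList<>();

    // Returns false and leaves the state unchanged if the transition is not allowed.
    synchronized boolean transitionTo(SketchState next) {
        if (!state.isValidTransition(next)) {
            return false;
        }
        transitionLog.add(state + " -> " + next);
        state = next;
        return true;
    }

    synchronized SketchState state() { return state; }

    // With a PAUSED state, a separate flag is unnecessary; this is just a state check.
    synchronized boolean isPaused() { return state == SketchState.PAUSED; }

    synchronized List<String> transitionLog() { return new ArrayList<>(transitionLog); }
}
```

The transition log is the part an operator would care about: it shows
that the pause signal was received (PAUSING) separately from the point
where processing actually stopped (PAUSED).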

And that's all I have to say about that. I hope you don't find my
long message off-putting. I'm fundamentally in favor of your KIP,
and I think with a little more explanation in the KIP, and a few
small tweaks to the proposal, we'll be able to provide good
ergonomics to our users.

Thanks,
-John

On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
> I'm in favor of the "just pausing the instance itself" option as well. As
> for EOS, the point is that when the processing is paused, we would not
> trigger any `producer.send` during the time, and the transaction timeout is
> sort of relying on that behavior, so my point was that it's probably better
> to also commit the processing before we pause it.
>
>
> Guozhang
>
> On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid>
> wrote:
>
>> Hi Matthias,
>>
>> Since the only thing which will be paused is processing the topology, I
>> think we can let commits happen naturally.
>>
>> Good point about getting the paused state to new members; it is seeming
>> like the "building block" approach is a good one to keep things simple at
>> first.
>>
>> Cheers,
>>
>> Jim
>>
>> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>> > I think it's tricky to propagate a pauseAll() via the rebalance
>> > protocol. New members joining the group would need to get paused, too?
>> > Could there be weird race conditions with overlapping pauseAll() and
>> > resumeAll() calls on different instances while there could be errors /
>> > network partitions or similar?
>> >
>> > I would argue that similar to IQ, we provide the basic building blocks,
>> > and leave it to users to implement cross-instance management for a
>> > pauseAll() scenario. -- Also, if there is really demand, we can always
>> > add pauseAll()/resumeAll() as follow up work.
>> >
>> > About named topologies: I agree with Jim to not include them in this KIP
>> > as they are not a public feature yet. If we make named topologies
>> > public, the corresponding KIP should extend the pause/resume feature
>> > (ie, APIs) accordingly. Of course, the code can (and should) already be
>> > set up to support it, to be future-proof.
>> >
>> > Good call out about commit and EOS -- to simplify it, I think it might
>> > be good to commit also for the at-least-once case?
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 5/6/22 1:05 PM, Jim Hughes wrote:
>> > > Hi Bill,
>> > >
>> > > Great questions; I'll do my best to reply inline:
>> > >
>> > > On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
>> > >
>> > >> Hi Jim,
>> > >>
>> > >> Thanks for the KIP.  I have a couple of meta-questions as well:
>> > >>
>> > >> 1) Regarding pausing only a subset of running instances, I'm thinking
>> > >> there may be a use case for pausing all of them.
>> > >>     Would it make sense to also allow for pausing all instances by
>> > >> adding a method `pauseAll()` or something similar?
>> > >>
>> > >
>> > > Honestly, I'm indifferent on this point.  Presently, I think what I
>> > > have proposed is the minimal change to get the ability to pause and
>> > > resume processing.  If adding a 'pauseAll()' is required, I'd be happy
>> > > to do that!
>> > >
>> > > From Guozhang's email, it sounds like this would require using the
>> > > rebalance protocol to trigger the coordination.  Would there be enough
>> > > room in that approach to indicate that a named topology is to be paused
>> > > across all nodes?
>> > >
>> > >
>> > >> 2) Would pausing affect standby tasks?  For example, imagine there
>> > >> are 3 instances A, B, and C.
>> > >>     A user elects to pause instance C only but it hosts the standby
>> > >> tasks for A.
>> > >>     Would the standby tasks on the paused application continue to read
>> > >> from the changelog topic?
>> > >>
>> > >
>> > > Yes, standby tasks would continue reading from the changelog topic.
>> > > All consumers would continue reading to avoid getting dropped from
>> > > their consumer groups.
>> > >
>> > > Cheers,
>> > >
>> > > Jim
>> > >
>> > >
>> > >
>> > >
>> > >> Thanks!
>> > >> Bill
>> > >>
>> > >>
>> > >> On Fri, May 6, 2022 at 2:44 PM Jim Hughes
>> > >> <jhug...@confluent.io.invalid> wrote:
>> > >>
>> > >>> Hi Guozhang,
>> > >>>
>> > >>> Thanks for the feedback; responses inline below:
>> > >>>
>> > >>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>> Hello Jim,
>> > >>>>
>> > >>>> Thanks for the proposed KIP. I have some meta questions about it:
>> > >>>>
>> > >>>> 1) Would an instance always pause/resume all of its currently owned
>> > >>>> topologies (i.e. the named topologies), or are there any scenarios
>> > >>>> where we only want to pause/resume a subset of them?
>> > >>>>
>> > >>>
>> > >>> An instance may wish to pause some of its named topologies.  I was
>> > >>> unsure what to say about named topologies in the KIP since they seem
>> > >>> to be an internal detail at the moment.
>> > >>>
>> > >>> I intend to add to KafkaStreamsNamedTopologyWrapper methods like:
>> > >>>      public void pauseNamedTopology(final String topologyToPause)
>> > >>>      public boolean isNamedTopologyPaused(final String topology)
>> > >>>      public void resumeNamedTopology(final String topologyToResume)
>> > >>>
>> > >>>
>> > >>>
>> > >>>> 2) From a user's perspective, do we want to always issue a
>> > >>>> `pause/resume` to all the instances or not? For example, we can define
>> > >>>> the semantics of the function as "you only need to call this function
>> > >>>> on any of the application's instances, and all instances would then
>> > >>>> pause (via the rebalance error codes)", or as "you would call this
>> > >>>> function for all the instances of an application". Which one are you
>> > >>>> referring to?
>> > >>>>
>> > >>>
>> > >>> My initial intent is that one would call this function on any
>> > >>> instances of the application that one wishes to pause.  This should
>> > >>> allow more control (in case one wanted to pause a portion of the
>> > >>> instances).  On the other hand, this approach would put more work on
>> > >>> the implementer to coordinate calling pause or resume across instances.
>> > >>>
>> > >>> If the other option is more suitable, happy to do that instead.
>> > >>>
>> > >>>
>> > >>>> 3) With EOS, there's a transaction timeout which would determine how
>> > >>>> long a transaction can stay idle before it's force-aborted on the
>> > >>>> broker side. I think when a pause is issued, that means we'd need to
>> > >>>> immediately commit the current transaction for EOS since we do not
>> > >>>> know how long we could pause for. Is that right? If yes could you
>> > >>>> please clarify that in the doc as well.
>> > >>>>
>> > >>>
>> > >>> Good point.  My intent is for pause() to wait for the next iteration
>> > >>> through `runOnce()` and then only skip over the processing for paused
>> > >>> tasks in `taskManager.process(numIterations, time)`.
>> > >>>
>> > >>> Do commits live inside that call or do they live across/outside of
>> > >>> it?  In the former case, I think there shouldn't be any issues with EOS.
>> > >>> Otherwise, we may need to work through some details to get EOS right.
>> > >>>
>> > >>> Once we figure that out, I can update the KIP.
>> > >>>
>> > >>> Thanks,
>> > >>>
>> > >>> Jim
>> > >>>
>> > >>>
>> > >>>
>> > >>>>
>> > >>>>
>> > >>>> Guozhang
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes
>> > >>>> <jhug...@confluent.io.invalid> wrote:
>> > >>>>
>> > >>>>> Hi all,
>> > >>>>>
>> > >>>>> I have written up a KIP for adding the ability to pause and resume
>> > >>>>> the processing of a topology in AK Streams.  The KIP is here:
>> > >>>>>
>> > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
>> > >>>>>
>> > >>>>> Thanks in advance for your feedback!
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>>
>> > >>>>> Jim
>> > >>>>>
>> > >>>>
>> > >>>>
>> > >>>> --
>> > >>>> -- Guozhang
>> > >>>>
>> > >>>
>> > >>
>> > >
>> >
>>
>
>
> -- 
> -- Guozhang
