Thanks Walker. Some minor comments:

1. Could you add a reference to the `localThreadMetadata` method in the KIP?
2. Could you format the code block as a Java template, so that
TaskMetadata.java can be the template title? It would also be good to
add some comments describing the newly added functions.
3. Could you write more details about the rejected alternatives, such as why
we chose not to expose this as metrics, and why a new method on KStream is
not favorable? These would be valuable when we look back on our design
decisions.
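
For point 2, here is a rough sketch of the kind of template I have in mind.
It is only an illustration: the class name TaskMetadataSketch, the accessor
names, and the use of plain strings in place of TaskId and TopicPartition are
all assumptions on my part, not the KIP's final API. The -1 convention follows
the idle-time idea discussed earlier in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * TaskMetadata.java (sketch only; names are hypothetical, not the final API).
 */
final class TaskMetadataSketch {
    private final String taskId;                      // e.g. "0_1"
    private final Map<String, Long> committedOffsets; // keyed by "topic-partition"
    private final long idlingStartedMs;               // -1 when the task is not idling

    TaskMetadataSketch(String taskId, Map<String, Long> committedOffsets, long idlingStartedMs) {
        this.taskId = taskId;
        // Defensive copy so callers cannot mutate the metadata afterwards.
        this.committedOffsets = Collections.unmodifiableMap(new HashMap<>(committedOffsets));
        this.idlingStartedMs = idlingStartedMs;
    }

    /** Id of the task this metadata describes. */
    String taskId() { return taskId; }

    /** Newly added (assumed name): last committed offset per input partition. */
    Map<String, Long> committedOffsets() { return committedOffsets; }

    /** Newly added (assumed name): time in ms when idling began, or -1 if not idling. */
    long idlingStartedMs() { return idlingStartedMs; }

    /** Helper for a health check: how long the task has been idle, 0 if not idling. */
    long idleDurationMs(long nowMs) {
        return idlingStartedMs < 0 ? 0L : Math.max(0L, nowMs - idlingStartedMs);
    }
}
```

A health check could then call idleDurationMs(System.currentTimeMillis()) on
each task and only alert when the value exceeds whatever threshold it treats
as a normal idling period.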

On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io>
wrote:

> I understand now. I think that is a valid concern, but I think it is best
> solved by having an external service verify through streams. As this KIP
> is now just adding fields to TaskMetadata to be returned in the
> threadMetadata, I am going to say that is out of scope.
>
> That seems to be the last concern. If there are no others I will put this
> up for a vote soon.
>
> walker
>
> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > For the 3rd point, yes, what I'm proposing is an edge case. For example,
> > suppose we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in the rebalancing
> > logic causes no one to get 1_1 assigned. Then the health check service will
> > only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> > paying attention to 1_1. What I want to expose is a "logical global" view
> > of all the tasks through the stream instance, since each instance gets the
> > assigned topology and should be able to infer exactly which tasks should
> > be up and running when the service is healthy.
> >
> > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io>
> > wrote:
> >
> > > Thanks for the follow up Boyang and Guozhang,
> > >
> > > I have updated the kip to include these ideas.
> > >
> > > Guozhang, that is a good idea about using the TaskMetadata. We can get it
> > > through the ThreadMetadata with a minor change to `localThreadMetadata` in
> > > KafkaStreams. This means that we will only need to update TaskMetadata and
> > > add no other APIs.
> > >
> > > Boyang, since each TaskMetadata contains the TaskId and TopicPartitions, I
> > > don't believe mapping either way will be a problem. Also, I think we can
> > > record the time the task started idling and, when it stops idling, reset
> > > it to -1. I think that should clear up the first two points.
> > >
> > > As for your third point, I am not sure I 100% understand. The ThreadMetadata
> > > will contain a set of all tasks assigned to that thread. Any health check
> > > service will just need to query all clients and aggregate their responses
> > > to get a complete picture of all tasks, correct?
> > >
> > > walker
> > >
> > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > consolidate on the existing `TaskMetadata` since we have already
> > > > accumulated a bunch of such classes, and it's better to keep them small as
> > > > public APIs. You can see https://issues.apache.org/jira/browse/KAFKA-12370
> > > > for a reference and a proposal.
> > > >
> > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for the updates Walker. Some replies and follow-up
> questions:
> > > > >
> > > > > 1. I agree one task could have multiple partitions, but when we hit a delay
> > > > > in terms of offset progress, do we have a convenient way to reverse-map a
> > > > > TopicPartition to the problematic task? In production, I believe it would
> > > > > be much quicker to identify the problem using task.id instead of the topic
> > > > > partition, especially when it points to an internal topic. I think having
> > > > > the task id as part of the entry value seems useful, which means getting
> > > > > something like Map<TopicPartition, TaskProgress> where TaskProgress
> > > > > contains both committed offsets & task id.
> > > > >
> > > > > 2. The task idling API is still confusing. I don't think we care about the
> > > > > exact state when making a tasksIdling() query; instead we care more about
> > > > > how long a task has been in the idle state at the time of the call, which
> > > > > reflects whether it is a normal idling period. So I feel it might be
> > > > > helpful to track that time difference and report it in the TaskStatus
> > > > > struct.
> > > > >
> > > > > 3. What I want to achieve with some global mapping of either
> > > > > TopicPartition or TaskId is this: it is not possible for a health check
> > > > > service to report the failure of a task that doesn't emit any metrics. So
> > > > > as long as we have a global topic partition API, the health check could
> > > > > always be aware of any task/partition not reporting its progress. Does
> > > > > that make sense? If you feel we have a better way to achieve this, such
> > > > > as querying all the input/intermediate topic metadata directly from Kafka
> > > > > for the baseline, I think that should be good as well and worth
> > > > > mentioning in the KIP.
> > > > >
> > > > > Also it seems that the KIP hasn't reflected what you proposed for the
> > > > > task idling status.
> > > > >
> > > > > Best,
> > > > > Boyang
> > > > >
> > > > >
> > > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thank you for the comments everyone!
> > > > > >
> > > > > > I think there are a few things I can clear up in general, then I will
> > > > > > specifically respond to each question.
> > > > > >
> > > > > > First, when I say "idling" I refer to task idling, where the stream is
> > > > > > intentionally not making progress
> > > > > > (https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This
> > > > > > becomes relevant if a task is waiting on one partition with no data but
> > > > > > that is holding up a partition with data. That would cause someone just
> > > > > > looking at the committed offset changes to believe the task has a
> > > > > > problem when it is working as intended.
> > > > > >
> > > > > > In light of this confusion, I plan to change tasksIdling() to
> > > > > > `Map<TaskId, TaskStatus> getTasksStatus()`. This should hopefully make
> > > > > > it clearer what is being exposed.
> > > > > >
> > > > > > TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology,
> > > > > > Idling, and State.
> > > > > >
> > > > > > Boyang:
> > > > > >
> > > > > > 2) I think that each task should report on whatever TopicPartitions it
> > > > > > holds. This means a TopicPartition might get reported twice, but the user
> > > > > > can roll those up and use the larger one when looking at the whole app.
> > > > > >
> > > > > > 4) If the user collects the committed offsets across all the running
> > > > > > clients there shouldn't be any tasks missing, correct?
> > > > > >
> > > > > > 6) Because there is not a 1:1 mapping between Tasks and TopicPartitions, I
> > > > > > think it is cleaner to report them separately.
> > > > > >
> > > > > > Guozhang:
> > > > > >
> > > > > > 1) Yes, that was my original plan but it made more sense to mirror how
> > > > > > the consumer exposes the committed offset.
> > > > > >
> > > > > > 3) That is a good point. I think that we should include internal topics
> > > > > > as well. I think that if the topology were to evolve there should be fair
> > > > > > warning anyway. Maybe you can clarify what would be limited by exposing
> > > > > > the internal topics here? I thought a user could find them in other ways.
> > > > > > If it is the name, we could anonymize them before exposing them.
> > > > > >
> > > > > > Thank you all for your comments. If I did not respond directly to one of
> > > > > > your questions, I updated the KIP to include the details it was
> > > > > > requesting. I did not include my proposed changes mentioned earlier as I
> > > > > > would like to get some feedback about what to include in TaskStatus and
> > > > > > in general.
> > > > > >
> > > > > > best,
> > > > > > Walker
> > > > > >
> > > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > > > >
> > > > > > > 1) Have you considered just relying on the `KafkaStreams#metrics()`
> > > > > > > that includes embedded consumer metrics that have the committed offsets
> > > > > > > instead of adding a new API? Not advocating that this is a better
> > > > > > > approach but want to make sure we considered all options before we come
> > > > > > > to the "last resort" of adding new public interfaces.
> > > > > > >
> > > > > > > 2) The javadoc mentions "tasks assigned to this client", but the
> > > > > > > returned map is on partitions. I think we should make the javadoc and
> > > > > > > the return types consistent, either tasks or topic partitions.
> > > > > > >
> > > > > > > 3) In addition, if for 2) above we ended up with topic partitions, then
> > > > > > > would they include only external source topics, or also include
> > > > > > > internal repartition / changelog topics? I think including only external
> > > > > > > source topic partitions is not sufficient for your goal of tracking
> > > > > > > progress, but exposing internal topic names is also a big commitment
> > > > > > > here for future topology evolution.
> > > > > > >
> > > > > > > 4) For "tasksIdling", I'm wondering if we can make it more general, so
> > > > > > > that the returned value is not just a boolean, but a TaskState that can
> > > > > > > be an enum of "created, restoring, running, idle, closing". This could
> > > > > > > help us in the future to track other things like restoration efficiency
> > > > > > > and rebalance efficiency etc.
> > > > > > >
> > > > > > > 5) We need to clarify how "idling" is being defined here: e.g. we can
> > > > > > > clearly state that a task is considered idle only if 1) lag is
> > > > > > > increasing, indicating that there are indeed new records arriving at
> > > > > > > the source, while the committed offset is not advancing, AND 2) the
> > > > > > > produced offset (imagine we may have punctuations that generate new
> > > > > > > data to the output topic even if there's no input for a while) is not
> > > > > > > advancing either.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Walker for the proposed KIP! This should definitely empower
> > > > > > > > KStream users with better visibility.
> > > > > > > >
> > > > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > > > >
> > > > > > > >
> > > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > > >
> > > > > > > > 2. What offsets do we report when the task is under restoration or
> > > > > > > > rebalancing?
> > > > > > > >
> > > > > > > > 3. IIUC, we should clearly state that our reported metrics are based
> > > > > > > > off locally assigned tasks for each instance.
> > > > > > > >
> > > > > > > > 4. In the meantime, what’s our strategy to report tasks that are not
> > > > > > > > local to the instance? Users would normally try to monitor all the
> > > > > > > > possible tasks, and it’s unfortunate we couldn’t determine whether we
> > > > > > > > have lost tasks. My brainstorming was whether it makes sense for the
> > > > > > > > leader instance to report the task progress as -1 for all “supposed to
> > > > > > > > be running” tasks, so that on the metrics collector side it could catch
> > > > > > > > any missing tasks.
> > > > > > > >
> > > > > > > > 5. It is not clear how users should use `isTaskIdling`. Why not report
> > > > > > > > a map/set of idling tasks, just as we did for committed offsets?
> > > > > > > >
> > > > > > > > 6. Why do we use TopicPartition instead of TaskId as the key in the
> > > > > > > > returned map?
> > > > > > > > 7. Could we include some details on where we get the committed offsets
> > > > > > > > for each task? Is it through consumer offset fetch, or the stream
> > > > > > > > processing progress based on the records fetched?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I would like to start discussion on KIP-715. This KIP aims to make it
> > > > > > > > > easier to monitor Kafka Streams progress by exposing the committed
> > > > > > > > > offset in a similar way as the consumer client does.
> > > > > > > > >
> > > > > > > > > Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Walker
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
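
For reference, the missing-task edge case from the Feb 25 reply quoted above
(four expected tasks, 1_1 never assigned) amounts to a set difference between
the tasks the topology should produce and the tasks the clients actually
report. The sketch below is only an illustration: MissingTaskCheck and
missingTasks are hypothetical names, task ids are plain strings, and it
assumes the caller already knows the expected set from somewhere.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

final class MissingTaskCheck {
    /**
     * Returns the expected task ids that no client reported.
     *
     * @param expected          all task ids the topology should produce
     *                          (assumed to be known by the caller)
     * @param reportedPerClient task ids each queried client reported as assigned
     */
    static Set<String> missingTasks(Set<String> expected,
                                    Collection<Set<String>> reportedPerClient) {
        // Start from everything we expect, then strike out what was reported.
        Set<String> missing = new TreeSet<>(expected);
        for (Set<String> reported : reportedPerClient) {
            missing.removeAll(reported);
        }
        return missing;
    }
}
```

With expected tasks [0_0, 0_1, 1_0, 1_1] and clients reporting [0_0, 0_1] and
[1_0], the result contains only 1_1, which is the task a purely per-client
view would never notice.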
