Regarding the second API and the `TaskStatus` class: I'd suggest we
consolidate on the existing `TaskMetadata`, since we have already
accumulated a bunch of such classes, and it's better to keep them small as
public APIs. See https://issues.apache.org/jira/browse/KAFKA-12370
for reference and a proposal.

On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thanks for the updates Walker. Some replies and follow-up questions:
>
> 1. I agree one task could have multiple partitions, but when we hit a delay
> in offset progress, do we have a convenient way to reverse-map a
> TopicPartition to the problematic task? In production, I believe it would
> be much quicker to identify the problem using task.id instead of the topic
> partition, especially when it points to an internal topic. I think having
> the task id as part of the entry value seems useful, which means returning
> something like Map<TopicPartition, TaskProgress>, where TaskProgress
> contains both the committed offset and the task id.
>
> 2. The task idling API is still confusing. I don't think we care about the
> exact state when making a tasksIdling() query; instead, we care more about
> how long a task has been idle as of the time of the call, which reflects
> whether it is a normal idling period. So I feel it might be helpful to
> track that duration and report it in the TaskStatus struct.
>
> 3. The reason I want a global mapping of either TopicPartition or TaskId
> is that, without one, a health check service cannot report the failure of
> a task that doesn't emit any metrics. As long as we have a global topic
> partition API, the health check could always be aware of any
> task/partition that is not reporting its progress. Does that make sense?
> If you feel there is a better way to achieve this, such as querying all
> the input/intermediate topic metadata directly from Kafka as the baseline,
> I think that would be fine as well and worth mentioning in the KIP.
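
For illustration, the reverse mapping in item 1 could be consumed roughly as follows. This is a minimal sketch, assuming a hypothetical `TaskProgress` value holding the committed offset and the task id; `Partition` is a stand-in for Kafka's `TopicPartition`, and task ids are plain strings such as "0_1", so the example runs without Kafka on the classpath. None of these names come from the KIP.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class PartitionToTask {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption, keeps the sketch self-contained).
    record Partition(String topic, int partition) {}

    // Hypothetical entry value from item 1: the committed offset plus the owning task id.
    record TaskProgress(String taskId, long committedOffset) {}

    // Reverse-maps partitions whose progress has stalled to the ids of the tasks owning them.
    // Partitions absent from the progress map are skipped rather than guessed.
    static List<String> tasksFor(Map<Partition, TaskProgress> progress, List<Partition> stalled) {
        return stalled.stream()
                .map(progress::get)
                .filter(Objects::nonNull)
                .map(TaskProgress::taskId)
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Partition, TaskProgress> progress = Map.of(
                new Partition("orders", 0), new TaskProgress("0_0", 120L),
                new Partition("app-agg-repartition", 0), new TaskProgress("1_0", 57L));
        // A stalled (hypothetical) internal repartition topic resolves directly to task 1_0.
        System.out.println(tasksFor(progress, List.of(new Partition("app-agg-repartition", 0)))); // [1_0]
    }
}
```

Keeping the task id in the entry value turns a stalled internal-topic partition straight into a task id, without a separate lookup.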
>
> Also, it seems that the KIP doesn't yet reflect what you proposed for the
> task idling status.
>
> Best,
> Boyang
>
>
> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > Thank you for the comments everyone!
> >
> > I think there are a few things I can clear up in general; then I will
> > respond to each question specifically.
> >
> > First, when I say "idling" I am referring to task idling, where the
> > stream is intentionally not making progress (see
> > https://issues.apache.org/jira/browse/KAFKA-10091 for an example). This
> > becomes relevant when a task is waiting on one partition that has no
> > data, which holds up another partition that does have data. Someone
> > looking only at committed offset changes would then believe the task has
> > a problem when it is working as intended.
> >
> > In light of this confusion, I plan to change tasksIdling() to
> > `Map<TaskId, TaskStatus> getTasksStatus()`, which should make it clearer
> > what is being exposed.
> >
> > TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology,
> > Idling, and State.
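
A minimal sketch of how a monitor might use the proposed `Map<TaskId, TaskStatus>`, assuming `TaskStatus` is a plain value object with the five fields listed. The field types are guesses (task ids and partitions as strings such as "0_0" and "left-0", the topology as a description string, the state as a string), and `partitionsOfIdlingTasks` is a hypothetical helper, not part of the KIP.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TaskStatusSketch {
    // Hypothetical shape of the proposed TaskStatus; all field types are assumptions.
    record TaskStatus(String taskId, Set<String> topicPartitions, String processorTopology,
                      boolean idling, String state) {}

    // Collects the partitions owned by intentionally idling tasks. A monitor could skip
    // "committed offset not advancing" alerts for these, since idling is expected there.
    static Set<String> partitionsOfIdlingTasks(Map<String, TaskStatus> tasksStatus) {
        Set<String> result = new TreeSet<>();
        for (TaskStatus status : tasksStatus.values()) {
            if (status.idling()) {
                result.addAll(status.topicPartitions());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, TaskStatus> tasksStatus = Map.of(
                "0_0", new TaskStatus("0_0", Set.of("left-0", "right-0"), "sub-topology 0", true, "RUNNING"),
                "0_1", new TaskStatus("0_1", Set.of("left-1", "right-1"), "sub-topology 0", false, "RUNNING"));
        System.out.println(partitionsOfIdlingTasks(tasksStatus)); // [left-0, right-0]
    }
}
```

The idea is that partitions owned by an intentionally idling task are expected to show no committed-offset progress, so an alerting rule can exclude them.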
> >
> > Boyang:
> >
> > 2) I think each task should report whatever TopicPartitions it holds.
> > This means a TopicPartition might get reported twice, but the user can
> > roll those up and use the larger offset when looking at the whole app.
> >
> > 4) If the user collects the committed offsets across all the running
> > clients, there shouldn't be any tasks missing, correct?
> >
> > 6) Because there is not a 1:1 mapping between tasks and TopicPartitions,
> > I think it is cleaner to report them separately.
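
Points 2) and 4) amount to a client-side roll-up. A minimal sketch, assuming each client hands back a map from partition name (e.g. "orders-1") to committed offset; how those maps are gathered from the clients is left out, and `rollUp` is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OffsetRollup {
    // Merges per-client committed-offset maps into one application-wide view.
    // If the same partition is reported more than once (by two tasks or two clients),
    // the larger offset is kept, as suggested in 2).
    static Map<String, Long> rollUp(List<Map<String, Long>> perClient) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> client : perClient) {
            client.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> clientA = Map.of("orders-0", 100L, "orders-1", 40L);
        Map<String, Long> clientB = Map.of("orders-1", 55L, "orders-2", 7L);
        System.out.println(rollUp(List.of(clientA, clientB)).get("orders-1")); // 55
    }
}
```

Taking the maximum follows the suggestion in 2): every report for a given partition is a position in the same log, so the larger value is the furthest committed position.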
> >
> > Guozhang:
> >
> > 1) Yes, that was my original plan, but it made more sense to mirror how
> > the consumer exposes the committed offset.
> >
> > 3) That is a good point. I think we should include internal topics as
> > well. If the topology were to evolve, there should be fair warning
> > anyway. Could you clarify what would be limited by exposing the internal
> > topics here? I thought a user could find them in other ways. If the
> > concern is the names, we could anonymize them before exposing them.
> >
> > Thank you all for your comments. If I did not respond directly to one of
> > your questions, I have updated the KIP to include the details it asked
> > for. I did not include the changes proposed above, as I would first like
> > to get some feedback on what to include in TaskStatus and in general.
> >
> > best,
> > Walker
> >
> > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Walker, thanks for the KIP. A few thoughts:
> > >
> > > 1) Have you considered just relying on `KafkaStreams#metrics()`, which
> > > includes the embedded consumer metrics that have the committed offsets,
> > > instead of adding a new API? I'm not advocating that this is a better
> > > approach, but I want to make sure we have considered all options before
> > > we come to the "last resort" of adding new public interfaces.
> > >
> > > 2) The javadoc mentions "tasks assigned to this client", but the
> > > returned map is keyed on partitions. I think we should make the javadoc
> > > and the return types consistent, using either tasks or topic partitions.
> > >
> > > 3) In addition, if for 2) above we end up with topic partitions, would
> > > they include only external source topics, or also internal
> > > repartition/changelog topics? I think including only external source
> > > topic partitions is not sufficient for your goal of tracking progress,
> > > but exposing internal topic names is also a big commitment with respect
> > > to future topology evolution.
> > >
> > > 4) For "tasksIdling", I'm wondering if we can make it more general, so
> > > that the returned value is not just a boolean but a TaskState, which
> > > could be an enum of "created, restoring, running, idle, closing". This
> > > could help us in the future to track other things like restoration
> > > efficiency and rebalance efficiency.
> > >
> > > 5) We need to clarify how "idling" is defined here. For example, we
> > > could clearly state that a task is considered idle only if 1) lag is
> > > increasing, indicating that new records have indeed arrived at the
> > > source, while the committed offset is not advancing, AND 2) the produced
> > > offset is not advancing either (imagine we have punctuations that
> > > generate new data to the output topic even if there's no input for a
> > > while).
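
Item 5's definition can be written as a small predicate over two consecutive observations of a task. This is a sketch under assumptions: `Snapshot` and its fields (`endOffset`, `committedOffset`, `producedOffset`) are hypothetical, and how those numbers would be sampled is not specified in the thread.

```java
final class IdleCheck {
    // Hypothetical observation of one task at a point in time; names are assumptions.
    record Snapshot(long endOffset, long committedOffset, long producedOffset) {
        long lag() {
            return endOffset - committedOffset;
        }
    }

    // Idle only if (1) lag grew, meaning new records reached the source, while the
    // committed offset did not advance, AND (2) the produced offset (e.g. output from
    // punctuations) did not advance either.
    static boolean isIdle(Snapshot before, Snapshot after) {
        boolean lagIncreasing = after.lag() > before.lag();
        boolean committedStuck = after.committedOffset() <= before.committedOffset();
        boolean producedStuck = after.producedOffset() <= before.producedOffset();
        return lagIncreasing && committedStuck && producedStuck;
    }

    public static void main(String[] args) {
        Snapshot t0 = new Snapshot(100, 90, 50);
        Snapshot t1 = new Snapshot(130, 90, 50); // new input arrived, nothing committed or produced
        System.out.println(isIdle(t0, t1)); // true
    }
}
```

Requiring all three conditions avoids flagging a task that simply has no new input, or one whose punctuations are still producing output.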
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Walker for the proposed KIP! This should definitely empower
> > > > KStream users with better visibility.
> > > >
> > > > Meanwhile, I have a couple of questions/suggestions:
> > > >
> > > >
> > > > 1. typo "repost/report" in the motivation section.
> > > >
> > > > 2. What offsets do we report when the task is under restoration or
> > > > rebalancing?
> > > >
> > > > 3. IIUC, we should clearly state that our reported metrics are based
> > > > on locally assigned tasks for each instance.
> > > >
> > > > 4. In the meantime, what’s our strategy for reporting tasks that are
> > > > not local to the instance? Users would normally try to monitor all the
> > > > possible tasks, and it’s unfortunate that we couldn’t determine
> > > > whether we have lost tasks. One idea I had: would it make sense for the
> > > > leader instance to report the task progress as -1 for all “supposed to
> > > > be running” tasks, so that the metrics collector side could catch any
> > > > missing tasks?
> > > >
> > > > 5. It is not clear how users should use `isTaskIdling`. Why not report
> > > > a map/set of idling tasks, just as we did for committed offsets?
> > > >
> > > > 6. Why do we use TopicPartition instead of TaskId as the key in the
> > > > returned map?
> > > >
> > > > 7. Could we include some details on where we get the committed offsets
> > > > for each task? Is it through a consumer offset fetch, or based on the
> > > > stream processing progress of the records fetched?
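
Item 4's -1 idea can be sketched from the collector's side. Assumptions: the leader provides the set of task ids it expects to be running, each instance reports a map from task id to committed offset, and `missingTasks` is a hypothetical helper rather than anything proposed in the KIP.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class MissingTaskCheck {
    // Placeholder progress the leader would report for every expected task.
    static final long NOT_REPORTED = -1L;

    // Seeds every task the leader expects with -1, then overlays the progress the
    // instances actually reported; any task still at -1 was never reported by anyone.
    static Set<String> missingTasks(Set<String> expectedByLeader,
                                    List<Map<String, Long>> reportedByInstances) {
        Map<String, Long> progress = new HashMap<>();
        for (String taskId : expectedByLeader) {
            progress.put(taskId, NOT_REPORTED);
        }
        for (Map<String, Long> report : reportedByInstances) {
            report.forEach((taskId, offset) -> progress.merge(taskId, offset, Long::max));
        }
        Set<String> missing = new TreeSet<>();
        progress.forEach((taskId, offset) -> {
            if (offset == NOT_REPORTED) {
                missing.add(taskId);
            }
        });
        return missing;
    }

    public static void main(String[] args) {
        Set<String> expected = Set.of("0_0", "0_1", "1_0");
        List<Map<String, Long>> reports = List.of(Map.of("0_0", 12L), Map.of("1_0", 3L));
        System.out.println(missingTasks(expected, reports)); // [0_1]
    }
}
```

Any task the leader expects but no instance reports keeps the -1 placeholder, which is the signal a health check would alert on.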
> > > >
> > > >
> > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > I would like to start a discussion on KIP-715. This KIP aims to make
> > > > > it easier to monitor Kafka Streams progress by exposing the committed
> > > > > offset in a similar way to the consumer client.
> > > > >
> > > > > Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
> > > > >
> > > > > Best,
> > > > > Walker
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


-- 
-- Guozhang
