Hey Sophie,

This looks like a very nice feature. Going through the comments, I agree
with Bill above that there could be a case for key skew, given that the
earlier partitions would keep the data they already had and also receive
some more. Do you think that's a concern/side-effect that this feature
could bring in?

Thanks!
Sagar.

On Wed, Oct 26, 2022 at 2:15 AM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

> Hey Sophie,
>
> Thanks for the KIP. I think this could be useful for a lot of cases. I also
> think that this could cause a lot of confusion.
>
> Just to make sure we are doing our best to prevent people from
> misusing this feature, I wanted to clarify a couple of things.
> 1) There will be only an interface and no "default" implementation that a
> user can plug in for the static partitioner. I am thinking that when it
> comes to testing we want to make sure that we do not make our testing
> implementation available to a user.
> 2)  If a user wanted to use auto scaling for a stateless application it
> should be as easy as implementing the StaticStreamsPartitioner. Their
> implementation could even just wrap the default partitioner if they wanted,
> right?  I can't think of any way we could detect and then warn them about
> the output topic not being partitioned by keys if that were to happen, can
> you?
>
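A minimal sketch of why simply wrapping a hash-modulo partitioner does not keep keys in place once the partition count grows, which is the risk raised in point 2 above. This is plain Python, not the Kafka Streams API: `default_partition` and `moved_keys` are hypothetical names, and CRC32 is only a stand-in for whatever hash the real default partitioner applies.

```python
import zlib


def default_partition(key, num_partitions, hash_fn=None):
    """Hash-modulo partitioning; CRC32 is an assumed stand-in hash."""
    if hash_fn is None:
        hash_fn = lambda k: zlib.crc32(str(k).encode("utf-8"))
    return hash_fn(key) % num_partitions


def moved_keys(keys, old_count, new_count, hash_fn=None):
    """Return the keys whose partition changes when the count grows."""
    return [
        k for k in keys
        if default_partition(k, old_count, hash_fn)
        != default_partition(k, new_count, hash_fn)
    ]
```

Any key returned by `moved_keys` would show up on a different output partition after scaling from `old_count` to `new_count`, so a downstream stateful consumer would see its history split across two partitions.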
> Overall this looks good to me!
>
> Walker
>
> On Tue, Oct 25, 2022 at 12:27 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the KIP! I think this is a worthwhile feature to add.  I have
> > two main questions about how this new feature will work.
> >
> >
> >    1. You mention that for stateless applications auto-scaling is a
> >    stickier situation.  But I was thinking that the auto-scaling would
> >    actually benefit stateless applications the most; let me explain my
> >    thinking.  Let's say you have a stateless Kafka Streams application
> >    with one input topic and 2 partitions, meaning you're limited to at
> >    most 2 stream threads.  In order to increase the throughput, you
> >    increase the number of partitions of the source topic to 4, so you
> >    can run 4 stream threads.  In this case would the auto-scaling
> >    feature automatically increase the number of tasks from 2 to 4?
> >    Since the application is stateless, say using a filter then a map
> >    for example, the partition for the record doesn't matter, so it
> >    seems that stateless applications would stand to gain a great deal.
> >    2. For stateful applications I can see the immediate benefit from
> >    autoscaling and static partitioning.  But again going with a
> >    partition expansion for increased throughput example, what would be
> >    the mitigation strategy for a stateful application that eventually
> >    wants to take advantage of the increased number of partitions?
> >    Otherwise keeping all keys on their original partition means you
> >    could end up with "key skew" due to not allowing keys to distribute
> >    out to the new partitions.
> >
> > One last comment, the KIP states "only the key, rather than the key and
> > value, are passed in to the partitioner", but the interface has it
> > taking a key and a value as parameters.  Based on your comments earlier
> > in this thread I was thinking that the text needs to be updated.
> >
> > Thanks,
> > Bill
> >
> > On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy
> > <lbruts...@confluent.io.invalid> wrote:
> >
> > > Hi all,
> > >
> > > thanks, Sophie, this makes sense. I suppose then the way to help the
> > > user not apply this in the wrong setting is having good documentation
> > > and one or two examples of good use cases.
> > >
> > > I think Colt's time-based partitioning is a good example of how to use
> > > this. It actually doesn't have to be time, the same will work with any
> > > monotonically increasing identifier. I.e. the new partitions will only
> > > get records for users with a "large" user ID greater than some user ID
> > > threshold hardcoded in the static partitioner. At least in this
> > > restricted use-case, lookups by user ID would still be possible.
> > >
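The threshold idea above could be sketched roughly as follows. This is an illustrative Python sketch, not Kafka Streams code: the `EPOCHS` table, its threshold of 1000, and the name `partition_for_user` are all assumptions made up for the example.

```python
import bisect

# Hypothetical expansion history as (first_user_id, partition_count) pairs:
# IDs below 1000 were issued while the topic had 2 partitions,
# IDs from 1000 onward were issued after it grew to 4.
EPOCHS = [(0, 2), (1000, 4)]


def partition_for_user(user_id, epochs=EPOCHS):
    """Map a user ID to a partition that never changes once assigned."""
    if user_id < epochs[0][0]:
        raise ValueError("user_id is below the first known threshold")
    starts = [start for start, _ in epochs]
    # Pick the last epoch whose starting ID is <= user_id.
    idx = bisect.bisect_right(starts, user_id) - 1
    _, count = epochs[idx]
    return user_id % count
```

Because the result depends only on the user ID and the hardcoded thresholds, a lookup by user ID can recompute the partition at any time, and only IDs at or above the threshold can ever land on the newly added partitions.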
> > > Cheers,
> > > Lucas
> > >
> > > On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > > Sophie,
> > > >
> > > > Regarding item "3" (my last paragraph from the previous email),
> > > > perhaps I should give a more general example now that I've had more
> > > > time to clarify my thoughts:
> > > >
> > > > In some stateful applications, certain keys have to be findable
> > > > without any information about when the relevant data was created.
> > > > For example, if I'm running a word-count app and I want to use
> > > > Interactive Queries to find the count for "foo", I would need to
> > > > know whether "foo" first arrived before or after time T before I
> > > > could find the correct partition to look up the data. In this case,
> > > > I don't think static partitioning is possible. Is this use-case a
> > > > non-goal of the KIP, or am I missing something?
> > > >
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > >
> > > > On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Thanks for the responses guys! I'll get the easy stuff out of the
> > > > > way first:
> > > > >
> > > > > 1) Fixed the KIP so that StaticStreamPartitioner extends
> > > > > StreamPartitioner
> > > > > 2) I totally agree with you Colt, the record value might have
> > > > > valuable (no pun) information in it that is needed to compute the
> > > > > partition without breaking the static constraint. As in my own
> > > > > example earlier, maybe the userId is a field in the value and not
> > > > > the key itself. Actually it was that exact thought that made me do
> > > > > a U-turn on this but I forgot to update the thread.
> > > > > 3) Colt, I'm not sure I follow what you're trying to say in that
> > > > > last paragraph, can you expand?
> > > > > 4) Lucas, it's a good question as to what kind of guard-rails we
> > > > > could put up to enforce or even detect a violation of static
> > > > > partitioning. Most likely Streams would need to track every
> > > > > key-to-partition mapping in an internal state store, but we have
> > > > > no guarantee the key space is bounded and the store wouldn't grow
> > > > > out of control. Mostly however I imagine users would be frustrated
> > > > > to find out there's a secret, extra state store taking up space
> > > > > when you enable autoscaling, and it's not even to provide
> > > > > functionality but just to make sure users aren't doing something
> > > > > wrong.
> > > > >
> > > > > I wish I had a better idea, but sadly I think the only practical
> > > > > solution here is to try and make this condition as clear and
> > > > > obvious and easy to understand as possible, perhaps by providing
> > > > > an example of what does and does not satisfy the constraint in the
> > > > > javadocs. I'll work on that.
> > > > > 5) I covered a bit above the impracticality of storing a
> > > > > potentially unbounded keyspace, which as you mention would need to
> > > > > be shared by all partitioners as well, so I would agree that this
> > > > > feels insurmountable. I'm leaning towards only enabling this
> > > > > feature for the static partitioning case at least in the first
> > > > > iteration, and we can see how things go from there -- for example,
> > > > > are people generally able to implement it correctly? If we find
> > > > > that the feature is working well and users are hungry for more,
> > > > > then it would be relatively straightforward to open things up to
> > > > > stateless applications, or even stateful applications which can
> > > > > withstand some "blips" in the logic/correctness.
> > > > >
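The guard-rail discussed in item 4 (remembering every key-to-partition mapping in order to detect a violation) might look roughly like this. It is a hypothetical Python sketch, with an in-memory dict standing in for the internal state store; `StaticPartitionChecker` is an invented name, not part of the KIP.

```python
class StaticPartitionChecker:
    """Remembers the first partition seen for each key (hypothetical helper)."""

    def __init__(self):
        # One entry per distinct key: this is the part that grows without
        # bound when the key space is unbounded.
        self._first_seen = {}

    def check(self, key, partition):
        """Return True if the key is still on its original partition."""
        original = self._first_seen.setdefault(key, partition)
        return original == partition

    def tracked_keys(self):
        """Number of keys currently remembered."""
        return len(self._first_seen)
```

The sketch makes the cost visible: the checker holds one entry for every key ever seen, purely to validate the partitioner rather than to provide any functionality.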
> > > > > That said, *technically* the feature would be able to be turned on
> > > > > for any such case as it is, since as discussed above it's
> > > > > difficult to place true guardrails around the feature that can
> > > > > enforce static partitioning. Perhaps we could put a short note in
> > > > > the StaticStreamPartitioner docs that explains how and when it's
> > > > > safe to break the static requirement, but that we recommend
> > > > > against doing so.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Sophie
> > > > >
> > > > > On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:
> > > > >
> > > > > > Sophie,
> > > > > >
> > > > > > Thank you for your detailed response. That makes sense (one
> > > > > > partition per user seems like a lot of extra metadata if you've
> > > > > > got millions of users, but I'm guessing that was just for
> > > > > > illustrative purposes).
> > > > > >
> > > > > > In this case I'd like to question one small detail in your KIP.
> > > > > > The StaticPartitioner takes in just the key and not the
> > > > > > value...in an application I've been working on, the "value" is a
> > > > > > long-lived entity (spanning hundreds of records over several
> > > > > > days) that has timestamp information about the creation of the
> > > > > > entity inside of it. The ID itself is provided by the end-user
> > > > > > of the system and as such isn't guaranteed to have timestamp
> > > > > > info.
> > > > > >
> > > > > > This is quite a corner case, but if the StaticStreamPartitioner
> > > > > > interface were allowed to peek at the record value, it would be
> > > > > > trivial to implement logic as follows:
> > > > > > ```
> > > > > > entity = deserialize(record.value())
> > > > > >
> > > > > > if entity.created_before(T):
> > > > > >   return hash(key) % old_partitions
> > > > > > else:
> > > > > >   return hash(key) % new_partitions
> > > > > > ```
> > > > > >
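A runnable version of the pseudocode above, as a sketch under stated assumptions: `Entity`, `stable_hash`, and `partition_for` are hypothetical names, the creation timestamp is assumed to be already deserialized from the record value, and CRC32 is used only because Python's built-in `hash()` for strings is not stable across processes.

```python
import zlib
from dataclasses import dataclass


@dataclass
class Entity:
    created_at: int  # creation timestamp carried in the record value


def stable_hash(key):
    """Process-independent hash of a string key (CRC32 as a stand-in)."""
    return zlib.crc32(key.encode("utf-8"))


def partition_for(key, entity, cutoff, old_partitions, new_partitions):
    """Entities created before the cutoff keep the old partition layout."""
    if entity.created_at < cutoff:
        return stable_hash(key) % old_partitions
    return stable_hash(key) % new_partitions
```

Since `created_at` never changes for a given entity, every record for that entity resolves to the same branch and therefore the same partition. The sketch covers a single expansion at `cutoff`; a second expansion would need an additional threshold.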
> > > > > > That said, you're a rockstar architect and have seen a lot more
> > > > > > system design than I have (I'm 23 and only 3 years out of
> > > > > > school...you implemented cooperative rebalancing 😀). So don't
> > > > > > make that decision unless you can see other use-cases where it
> > > > > > is appropriate.
> > > > > >
> > > > > > Additionally, for my own use-case I'm not sure if static
> > > > > > partitioning alone (as opposed to re-partitioning and re-playing
> > > > > > the changelogs into new stores) would enable auto-scaleout
> > > > > > because my system uses Kafka Streams as the data store *and* a
> > > > > > secondary index...for example, when a user wants to look up all
> > > > > > entities where the variable `user_email==f...@bar.com`, we have
> > > > > > an index store that has keys partitioned by and prefixed with
> > > > > > `user_email==f...@bar.com`. Entities with that email (for
> > > > > > example) could come before or after time T.
> > > > > >
> > > > > > Anyways, that's just my twopence, if I were a voting committer
> > > > > > I'd vote for this KIP as-is.
> > > > > >
> > > > > > Cheers,
> > > > > > Colt McNealy
> > > > > > *Founder, LittleHorse.io*
> > > > > >
> > > > > >
> > > > > > On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Thanks for your questions, I would say that your understanding
> > > > > > > sounds correct based on what you described but I'll try to add
> > > > > > > some clarity. The basic idea is that, as you said, any keys
> > > > > > > that are processed before time T will go to partition 1. All
> > > > > > > of those keys should then continue to be routed to partition 1
> > > > > > > for the remainder of the app's lifetime, if you care about
> > > > > > > maintaining correct history/"state" for that key (I'll come
> > > > > > > back to this in the next paragraph). After the time T, new
> > > > > > > keys that weren't processed prior to T may be routed to either
> > > > > > > partition, provided they are similarly mapped to the same
> > > > > > > partition forever after. It's up to the user to enforce this,
> > > > > > > perhaps by trying to keep track of all keys but that is likely
> > > > > > > to be impractical. This feature is generally more targeted at
> > > > > > > cases where the partition mapping is "obvious" enough to
> > > > > > > compute without needing to maintain a history of all keys and
> > > > > > > their original partition: for example, imagine an application
> > > > > > > that processes user account information. You can scale out to
> > > > > > > a partition per user, and add a new partition each time
> > > > > > > someone opens a new account. When they open that account they
> > > > > > > get a userID number, starting with #0 and counting up from
> > > > > > > there. In that case, the partition for any records pertaining
> > > > > > > to a given account would just be its userID.
> > > > > > >
> > > > > > > I hope that clears up the kind of intended use case we're
> > > > > > > targeting with this feature. That said, another important and
> > > > > > > equally viable use case that I neglected to mention in the KIP
> > > > > > > is fully stateless applications. Technically this feature can
> > > > > > > produce correct results for applications that are at least one
> > > > > > > of (a) statically partitioned, or (b) completely stateless.
> > > > > > > However, the stateless case is a bit stickier since even if
> > > > > > > the Streams application itself doesn't care about maintaining
> > > > > > > the same mapping of key to partition, it could for example be
> > > > > > > feeding into a downstream application which *does* need to
> > > > > > > maintain state, and which would wind up "losing" the history
> > > > > > > for any keys that changed partition.
> > > > > > >
> > > > > > > I kind of felt like opening this feature up to stateless
> > > > > > > applications would be asking for trouble and make it too easy
> > > > > > > for people to shoot themselves in the foot. That said, I'm
> > > > > > > open to discussion on this point if you feel like the benefits
> > > > > > > here outweigh the risks. I'm also happy to consider modifying
> > > > > > > the API so that it could naturally be expanded to include
> > > > > > > stateless applications in the future, even if we decide
> > > > > > > against allowing that use case in the first iteration of the
> > > > > > > feature.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io> wrote:
> > > > > > >
> > > > > > > > Sophie,
> > > > > > > >
> > > > > > > > Thank you for the KIP! Choosing the number of partitions in
> > > > > > > > a Streams app is a tricky task because of how difficult it
> > > > > > > > is to re-partition; I'm glad you're working on an
> > > > > > > > improvement. I've got two questions:
> > > > > > > >
> > > > > > > > First, `StaticStreamsPartitioner` is an interface that we
> > > > > > > > (Streams users) must implement, I'm trying to understand how
> > > > > > > > it would work. For example, let's say there's some point in
> > > > > > > > time 'T' before which we have 1 partition. Then we decide to
> > > > > > > > increase the partition count to 2 at time T. From my
> > > > > > > > understanding, all keys that had passed through the Streams
> > > > > > > > app before time T must end up on partition 1 if they appear
> > > > > > > > again in the input topics; but any new keys are allowed to
> > > > > > > > be sent to partition 2. Is that correct? And (pardon the
> > > > > > > > naive question) how is this achieved without keeping track
> > > > > > > > of all keys that have been seen at any point?
> > > > > > > >
> > > > > > > > Secondly, will this feature work with applications that use
> > > > > > > > interactive queries?
> > > > > > > >
> > > > > > > > Thank you very much,
> > > > > > > > Colt McNealy
> > > > > > > > *Founder, LittleHorse.io*
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hey all,
> > > > > > > > >
> > > > > > > > > I'd like to propose a new autoscaling feature for Kafka
> > > > > > > > > Streams applications which can follow the constraint of
> > > > > > > > > static partitioning. For further details please refer to
> > > > > > > > > the KIP document:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > > > > > > >
> > > > > > > > > This feature will be targeted for 3.4 but may not be fully
> > > > > > > > > implemented until the following release, 3.5.
> > > > > > > > >
> > > > > > > > > Please give this a read and let me know what you think!
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Sophie
