Sophie,

Regarding item "3" (my last paragraph from the previous email), perhaps I
should give a more general example now that I've had more time to clarify
my thoughts:

In some stateful applications, certain keys have to be findable without any
information about when the relevant data was created. For example, if I'm
running a word-count app and I want to use Interactive Queries to find the
count for "foo", a partitioner that routes keys by arrival time would force
me to know whether "foo" first arrived before or after time T before I could
find the correct partition to look up the data. In this case, I don't think
static partitioning is possible. Is this use-case a non-goal of the KIP, or
am I missing something?
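
To make the concern concrete, here is a minimal Python sketch. It is not
Kafka Streams code: `partition_for`, `candidate_partitions`, the partition
counts, and the in-memory `stores` are all hypothetical names I made up for
illustration, and partitions are numbered from 0 here. It assumes a
partitioner that routes by whether a key first arrived before T, and shows
that a key-only lookup can only narrow the search to a set of candidate
partitions:

```python
import zlib

OLD_PARTITIONS = 1  # assumed partition count before time T
NEW_PARTITIONS = 2  # assumed partition count after time T


def stable_hash(key):
    """Deterministic hash; Python's built-in hash() is salted per process."""
    return zlib.crc32(key.encode("utf-8"))


def partition_for(key, created_before_t):
    """Static mapping that depends on when the key was first seen."""
    count = OLD_PARTITIONS if created_before_t else NEW_PARTITIONS
    return stable_hash(key) % count


def candidate_partitions(key):
    """Partitions a key-only lookup must probe, because the caller does
    not know whether the key first arrived before or after T."""
    return {partition_for(key, True), partition_for(key, False)}


# Simulated per-partition word-count stores (stand-ins for state stores).
stores = {p: {} for p in range(NEW_PARTITIONS)}


def count_word(word, first_seen_before_t):
    """Increment the count in the partition chosen by the partitioner."""
    store = stores[partition_for(word, first_seen_before_t)]
    store[word] = store.get(word, 0) + 1


def lookup(word):
    """Key-only lookup: probe every candidate partition in turn."""
    for p in sorted(candidate_partitions(word)):
        if word in stores[p]:
            return stores[p][word]
    return None


count_word("foo", first_seen_before_t=True)
count_word("foo", first_seen_before_t=True)
print(lookup("foo"))  # 2
```

With only one scale-out event the lookup probes at most two partitions, but
under this scheme each further change to the partition count would add
another candidate, which is why I doubt a key-only query can stay cheap.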

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Thanks for the responses guys! I'll get the easy stuff out of the way
> first:
>
> 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> 2) I totally agree with you Colt, the record value might have valuable (no
> pun) information
> in it that is needed to compute the partition without breaking the static
> constraint. As in my
> own example earlier, maybe the userId is a field in the value and not the
> key itself. Actually
> it was that exact thought that made me do a U-turn on this but I forgot to
> update the thread
> 3) Colt, I'm not sure I follow what you're trying to say in that last
> paragraph, can you expand?
> 4) Lucas, it's a good question as to what kind of guard-rails we could put
> up to enforce or even
> detect a violation of static partitioning. Most likely Streams would need
> to track every key to
> partition mapping in an internal state store, but we have no guarantee the
> key space is bounded
> and the store wouldn't grow out of control. Mostly however I imagine users
> would be frustrated
> to find out there's a secret, extra state store taking up space when you
> enable autoscaling, and
> it's not even to provide functionality but just to make sure users aren't
> doing something wrong.
>
> I wish I had a better idea, but sadly I think the only practical solution
> here is to try and make this
> condition as clear and obvious and easy to understand as possible, perhaps
> by providing an
> example of what does and does not satisfy the constraint in the javadocs.
> I'll work on that
> 5) I covered a bit above the impracticality of storing a potentially
> unbounded keyspace, which
> as you mention would need to be shared by all partitioners as well, so I
> would agree that this
> feels insurmountable. I'm leaning towards only enabling this feature for
> the static partitioning
> case at least in the first iteration, and we can see how things go from
> there -- for example, are
> people generally able to implement it correctly? If we find that the
> feature is working well and
> users are hungry for more, then it would be relatively straightforward to
> open things up to
> stateless applications, or even stateful applications which can withstand
> some "blips" in the
> logic/correctness.
>
> That said, *technically* the feature would be able to be turned on for any
> such case as it is, since
> as discussed above it's difficult to place true guardrails around the
> feature that can enforce
> static partitioning. Perhaps we could put a short note in the
> StaticStreamPartitioner docs that
> explains how and when it's safe to break the static requirement, but that we
> recommend against
> doing so.
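
For what it's worth, the guard-rail described in point 4 might look roughly
like the sketch below. This is plain Python rather than Streams code, and
`StaticPartitionGuard` is a hypothetical name, not anything in the KIP. It
mainly illustrates the cost: the map holds one entry per distinct key, so it
is only as bounded as the key space.

```python
class StaticPartitionGuard:
    """Hypothetical guard-rail: remembers the first partition seen per key."""

    def __init__(self):
        # key -> partition; grows by one entry for every distinct key seen
        self._first_partition = {}

    def check(self, key, partition):
        """Return True if this mapping matches the first one recorded."""
        previous = self._first_partition.setdefault(key, partition)
        return previous == partition

    def tracked_keys(self):
        """Number of keys the guard currently has to remember."""
        return len(self._first_partition)


guard = StaticPartitionGuard()
print(guard.check("user-1", 0))  # True: first time this key is seen
print(guard.check("user-1", 0))  # True: same partition as before
print(guard.check("user-1", 1))  # False: the static constraint is violated
```
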
>
> Thoughts?
>
> -Sophie
>
> On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:
>
> > Sophie,
> >
> > Thank you for your detailed response. That makes sense (one partition per
> > user seems like a lot of extra metadata if you've got millions of users,
> > but I'm guessing that was just for illustrative purposes).
> >
> > In this case I'd like to question one small detail in your KIP. The
> > StaticStreamPartitioner takes in just the key and not the value...in an
> > application I've been working on, the "value" is a long-lived entity
> > (spanning hundreds of records over several days) that has timestamp
> > information about the creation of the entity inside of it. The ID itself is
> > provided by the end-user of the system and as such isn't guaranteed to have
> > timestamp info.
> >
> > This is quite a corner case, but if the StaticStreamPartitioner interface
> > were allowed to peek at the record value, it would be trivial to implement
> > logic as follows:
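> > ```
> > # Pseudocode: deserialize() and T (when the partition count changed) are assumed.
> > def partition(key, record):
> >   entity = deserialize(record.value())
> >
> >   # Entities created before T stay on the old partition layout.
> >   if entity.created_before(T):
> >     return hash(key) % old_partitions
> >   else:
> >     return hash(key) % new_partitions
> > ```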
> >
> > That said, you're a rockstar architect and have seen a lot more system
> > design than I have (I'm 23 and only 3 years out of school...you implemented
> > cooperative rebalancing 😀). So don't make that decision unless you can see
> > other use-cases where it is appropriate.
> >
> > Additionally, for my own use-case I'm not sure if static partitioning alone
> > (as opposed to re-partitioning and re-playing the changelogs into new
> > stores) would enable auto-scaleout because my system uses Kafka Streams as
> > the data store *and* a secondary index...for example, when a user wants to
> > look up all entities where the variable `user_email==f...@bar.com`, we have
> > an index store that has keys partitioned by and prefixed with
> > `user_email==f...@bar.com`. Entities with that email (for example) could
> > come before or after time T.
> >
> > Anyways, that's just my twopence, if I were a voting committer I'd vote for
> > this KIP as-is.
> >
> > Cheers,
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Thanks for your questions, I would say that your understanding sounds
> > > correct based on what you described but I'll try to add some clarity.
> > > The basic idea is that, as you said, any keys that are processed before
> > > time T will go to partition 1. All of those keys should then continue to
> > > be routed to partition 1 for the remainder of the app's lifetime, if you
> > > care about maintaining correct history/"state" for that key (I'll come
> > > back to this in the next paragraph). After the time T, new keys that
> > > weren't processed prior to T may be routed to either partition, provided
> > > they are similarly mapped to the same partition forever after. It's up to
> > > the user to enforce this, perhaps by trying to keep track of all keys but
> > > that is likely to be impractical. This feature is generally more targeted
> > > at cases where the partition mapping is "obvious" enough to compute
> > > without needing to maintain a history of all keys and their original
> > > partition: for example, imagine an application that processes user
> > > account information. You can scale out to a partition per user, and add a
> > > new partition each time someone opens a new account. When they open that
> > > account they get a userID number, starting with #0 and counting up from
> > > there. In that case, the partition for any records pertaining to a given
> > > account would just be its userID.
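
As a rough illustration of why the userID mapping counts as "static" while
a hash-modulo mapping does not, here is a small Python sketch. The function
names and the partition counts 4 and 5 are hypothetical, and `zlib.crc32`
merely stands in for whatever hash a real partitioner would use:

```python
import zlib


def hash_mod_partition(key, num_partitions):
    """Hash-modulo partitioning: the result depends on the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def user_id_partition(user_id, num_partitions):
    """Static partitioning from the example above: partition == userID."""
    if user_id >= num_partitions:
        raise ValueError("the partition for this userID does not exist yet")
    return user_id


keys = ["user-%d" % i for i in range(100)]

# Keys whose partition changes when scaling from 4 to 5 partitions.
moved = [k for k in keys
         if hash_mod_partition(k, 4) != hash_mod_partition(k, 5)]
print("keys remapped by hash-modulo:", len(moved))

# The userID mapping ignores the partition count, so nothing is remapped.
print(user_id_partition(3, 4) == user_id_partition(3, 5))  # True
```

Any key in `moved` would lose access to its earlier state after the
scale-out, which is the failure mode that static partitioning is meant to
rule out.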
> > >
> > > I hope that clears up the kind of intended use case we're targeting with
> > > this feature. That said, another important and equally viable use case
> > > that I neglected to mention in the KIP is fully stateless applications.
> > > Technically this feature can produce correct results for applications
> > > that are at least one of (a) statically partitioned, or (b) completely
> > > stateless. However, the stateless case is a bit stickier since even if
> > > the Streams application itself doesn't care about maintaining the same
> > > mapping of key to partition, it could for example be feeding into a
> > > downstream application which *does* need to maintain state, and which
> > > would wind up "losing" the history for any keys that changed partition.
> > >
> > > I kind of felt like opening this feature up to stateless applications
> > > would be asking for trouble and make it too easy for people to shoot
> > > themselves in the foot. That said, I'm open to discussion on this point
> > > if you feel like the benefits here outweigh the risks. I'm also happy to
> > > consider modifying the API so that it could naturally be expanded to
> > > include stateless applications in the future, even if we decide against
> > > allowing that use case in the first iteration of the feature.
> > >
> > > Thoughts?
> > >
> > > Sophie
> > >
> > > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io>
> > > wrote:
> > >
> > > > Sophie,
> > > >
> > > > Thank you for the KIP! Choosing the number of partitions in a Streams
> > > > app is a tricky task because of how difficult it is to re-partition;
> > > > I'm glad you're working on an improvement. I've got two questions:
> > > >
> > > > First, `StaticStreamPartitioner` is an interface that we (Streams
> > > > users) must implement; I'm trying to understand how it would work. For
> > > > example, let's say there's some point in time 'T' before which we have
> > > > 1 partition. Then we decide to increase the partition count to 2 at
> > > > time T. From my understanding, all keys that had passed through the
> > > > Streams app before time T must end up on partition 1 if they appear
> > > > again in the input topics; but any new keys are allowed to be sent to
> > > > partition 2. Is that correct? And (pardon the naive question) how is
> > > > this achieved without keeping track of all keys that have been seen at
> > > > any point?
> > > >
> > > > Secondly, will this feature work with applications that use interactive
> > > > queries?
> > > >
> > > > Thank you very much,
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > >
> > > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > > > applications which can follow the constraint of static partitioning.
> > > > > For further details please refer to the KIP document:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > > >
> > > > > This feature will be targeted for 3.4 but may not be fully implemented
> > > > > until the following release, 3.5.
> > > > >
> > > > > Please give this a read and let me know what you think!
> > > > >
> > > > > Cheers,
> > > > > Sophie
> > > > >
> > > >
> > >
> >
>
