Sophie,

Regarding item "3" (my last paragraph from the previous email), perhaps I should give a more general example now that I've had more time to clarify my thoughts:
In some stateful applications, certain keys have to be findable without any information about when the relevant data was created. For example, if I'm running a word-count app and I want to use Interactive Queries to find the count for "foo", I would need to know whether "foo" first arrived before or after time T before I could find the correct partition to look up the data. In this case, I don't think static partitioning is possible. Is this use-case a non-goal of the KIP, or am I missing something?

Colt McNealy
*Founder, LittleHorse.io*

On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> Thanks for the responses guys! I'll get the easy stuff out of the way first:
>
> 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> 2) I totally agree with you Colt, the record value might have valuable (no pun)
> information in it that is needed to compute the partition without breaking the
> static constraint. As in my own example earlier, maybe the userId is a field in
> the value and not the key itself. Actually it was that exact thought that made
> me do a U-turn on this but I forgot to update the thread
> 3) Colt, I'm not sure I follow what you're trying to say in that last paragraph,
> can you expand?
> 4) Lucas, it's a good question as to what kind of guard-rails we could put up to
> enforce or even detect a violation of static partitioning. Most likely Streams
> would need to track every key to partition mapping in an internal state store,
> but we have no guarantee the key space is bounded and the store wouldn't grow
> out of control. Mostly however I imagine users would be frustrated to find out
> there's a secret, extra state store taking up space when you enable autoscaling,
> and it's not even to provide functionality but just to make sure users aren't
> doing something wrong.
>
> I wish I had a better idea, but sadly I think the only practical solution here
> is to try and make this condition as clear and obvious and easy to understand as
> possible, perhaps by providing an example of what does and does not satisfy the
> constraint in the javadocs. I'll work on that
> 5) I covered a bit above the impracticality of storing a potentially unbounded
> keyspace, which as you mention would need to be shared by all partitioners as
> well, so I would agree that this feels insurmountable. I'm leaning towards only
> enabling this feature for the static partitioning case at least in the first
> iteration, and we can see how things go from there -- for example, are people
> generally able to implement it correctly? If we find that the feature is working
> well and users are hungry for more, then it would be relatively straightforward
> to open things up to stateless applications, or even stateful applications which
> can withstand some "blips" in the logic/correctness.
>
> That said, *technically* the feature would be able to be turned on for any such
> case as it is, since as discussed above it's difficult to place true guardrails
> around the feature that can enforce static partitioning. Perhaps we could put a
> short note in the StaticStreamPartitioner docs that explains how and when it's
> safe to break the static requirement, but that we recommend against doing so.
>
> Thoughts?
>
> -Sophie
>
> On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:
>
> > Sophie,
> >
> > Thank you for your detailed response. That makes sense (one partition per
> > user seems like a lot of extra metadata if you've got millions of users,
> > but I'm guessing that was just for illustrative purposes).
> >
> > In this case I'd like to question one small detail in your kip.
> > The StaticPartitioner takes in just the key and not the value...in an
> > application I've been working on, the "value" is a long-lived entity
> > (spanning hundreds of records over several days) that has timestamp
> > information about the creation of the entity inside of it. The ID itself is
> > provided by the end-user of the system and as such isn't guaranteed to have
> > timestamp info.
> >
> > This is quite a corner case, but if the StaticStreamPartitioner interface
> > were allowed to peek at the record value, it would be trivial to implement
> > logic as follows:
> > ```
> > entity = deserialize(record.value())
> >
> > if entity.created_before(T):
> >     return hash(key) % old_partitions
> > else:
> >     return hash(key) % new_partitions
> > ```
> >
> > That said, you're a rockstar architect and have seen a lot more system
> > design than I have (I'm 23 and only 3 years out of school...you implemented
> > cooperative rebalancing 😀). So don't make that decision unless you can see
> > other use-cases where it is appropriate.
> >
> > Additionally, for my own use-case I'm not sure if static partitioning alone
> > (as opposed to re-partitioning and re-playing the changelogs into new
> > stores) would enable auto-scaleout because my system uses Kafka Streams as
> > the data store *and* a secondary index...for example, when a user wants to
> > look up all entities where the variable `user_email==f...@bar.com`, we have
> > an index store that has keys partitioned by and prefixed with
> > `user_email==f...@bar.com`. Entities with that email (for example) could come
> > before or after time T.
> >
> > Anyways, that's just my twopence, if I were a voting committer I'd vote for
> > this KIP as-is.
> >
> > Cheers,
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> > On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Thanks for your questions, I would say that your understanding sounds
> > > correct based on what you described but I'll try to add some clarity. The
> > > basic idea is that, as you said, any keys that are processed before time T
> > > will go to partition 1. All of those keys should then continue to be routed
> > > to partition 1 for the remainder of the app's lifetime, if you care about
> > > maintaining correct history/"state" for that key (I'll come back to this in
> > > the next paragraph). After the time T, new keys that weren't processed prior
> > > to T may be routed to either partition, provided they are similarly mapped
> > > to the same partition forever after. It's up to the user to enforce this,
> > > perhaps by trying to keep track of all keys but that is likely to be
> > > impractical. This feature is generally more targeted at cases where the
> > > partition mapping is "obvious" enough to compute without needing to maintain
> > > a history of all keys and their original partition: for example, imagine an
> > > application that processes user account information. You can scale out to a
> > > partition per user, and add a new partition each time someone opens a new
> > > account. When they open that account they get a userID number, starting with
> > > #0 and counting up from there. In that case, the partition for any records
> > > pertaining to a given account would just be its userID.
> > >
> > > I hope that clears up the kind of intended use case we're targeting with
> > > this feature. That said, another important and equally viable use case that
> > > I neglected to mention in the KIP is fully stateless applications.
> > > Technically this feature can produce correct results for applications that
> > > are at least one of (a) statically partitioned, or (b) completely stateless.
> > > However, the stateless case is a bit stickier since even if the Streams
> > > application itself doesn't care about maintaining the same mapping of key to
> > > partition, it could for example be feeding into a downstream application
> > > which *does* need to maintain state, and which would wind up "losing" the
> > > history for any keys that changed partition.
> > >
> > > I kind of felt like opening this feature up to stateless applications would
> > > be asking for trouble and make it too easy for people to shoot themselves in
> > > the foot. That said, I'm open to discussion on this point if you feel like
> > > the benefits here outweigh the risks. I'm also happy to consider modifying
> > > the API so that it could naturally be expanded to include stateless
> > > applications in the future, even if we decide against allowing that use case
> > > in the first iteration of the feature.
> > >
> > > Thoughts?
> > >
> > > Sophie
> > >
> > > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > > Sophie,
> > > >
> > > > Thank you for the KIP! Choosing the number of partitions in a Streams app
> > > > is a tricky task because of how difficult it is to re-partition; I'm glad
> > > > you're working on an improvement. I've got two questions:
> > > >
> > > > First, `StaticStreamsPartitioner` is an interface that we (Streams users)
> > > > must implement, I'm trying to understand how it would work. For example,
> > > > let's say there's some point in time 'T' before which we have 1 partition.
> > > > Then we decide to increase the partition count to 2 at time T.
> > > > From my understanding, all keys that had passed through the Streams app
> > > > before time T must end up on partition 1 if they appear again in the input
> > > > topics; but any new keys are allowed to be sent to partition 2. Is that
> > > > correct? And (pardon the naive question) how is this achieved without
> > > > keeping track of all keys that have been seen at any point?
> > > >
> > > > Secondly, will this feature work with applications that use interactive
> > > > queries?
> > > >
> > > > Thank you very much,
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > > > applications which can follow the constraint of static partitioning. For
> > > > > further details please refer to the KIP document:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > > >
> > > > > This feature will be targeted for 3.4 but may not be fully implemented
> > > > > until the following release, 3.5.
> > > > >
> > > > > Please give this a read and let me know what you think!
> > > > >
> > > > > Cheers,
> > > > > Sophie
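
For what it's worth, here is a rough Python sketch of the word-count concern raised at the top of this message. Everything in it is an assumption made for illustration: the cut-over time `T`, the partition counts (1 before T and 2 after, as in the example earlier in the thread, but 0-indexed here), and the helper names `stable_hash`, `partition_for`, and `candidate_partitions`. None of it is Kafka Streams API or anything proposed in the KIP.

```python
import zlib

# All names and values below are illustrative assumptions, not Kafka Streams API.
T = 1_000             # hypothetical cut-over timestamp
OLD_PARTITIONS = 1    # partition count before T
NEW_PARTITIONS = 2    # partition count from T onward


def stable_hash(key):
    # crc32 is deterministic across processes, unlike Python's built-in hash() on str.
    return zlib.crc32(key.encode("utf-8"))


def partition_for(key, first_seen):
    """Static mapping that needs the time the key first arrived (0-indexed partitions)."""
    count = OLD_PARTITIONS if first_seen < T else NEW_PARTITIONS
    return stable_hash(key) % count


def candidate_partitions(key):
    """Partitions a lookup would have to check when first_seen is unknown."""
    return sorted({stable_hash(key) % OLD_PARTITIONS, stable_hash(key) % NEW_PARTITIONS})


# Writing side: the arrival time is known, so the mapping is unambiguous.
print(partition_for("foo", first_seen=500))
print(partition_for("foo", first_seen=1_500))

# Query side: only the word is known, so more than one partition may need checking.
print(candidate_partitions("foo"))
```

Under these assumptions, an interactive query for "foo" would either need the first-arrival time from somewhere else or would have to fan out to every candidate partition, which is the part that seems hard to square with a purely key-based lookup.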