Thanks for the responses guys! I'll get the easy stuff out of the way first:

1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
2) I totally agree with you Colt, the record value might have valuable (no
pun intended) information in it that is needed to compute the partition
without breaking the static constraint. As in my own example earlier, maybe
the userId is a field in the value and not the key itself. It was actually
that exact thought that made me do a U-turn on this, but I forgot to update
the thread.
3) Colt, I'm not sure I follow what you're trying to say in that last
paragraph -- can you expand?
4) Lucas, it's a good question as to what kind of guardrails we could put
up to enforce, or even detect, a violation of static partitioning. Most
likely Streams would need to track every key-to-partition mapping in an
internal state store, but we have no guarantee the key space is bounded, so
the store could grow out of control. More importantly, I imagine users would
be frustrated to find out there's a secret, extra state store taking up
space when they enable autoscaling -- and not even to provide functionality,
just to make sure they aren't doing something wrong.

I wish I had a better idea, but sadly I think the only practical solution
here is to make this condition as clear, obvious, and easy to understand as
possible, perhaps by providing an example of what does and does not satisfy
the constraint in the javadocs. I'll work on that; a rough sketch of the
kind of example I have in mind is below, after point 5.
5) I touched above on the impracticality of storing a potentially unbounded
keyspace, which as you mention would need to be shared by all partitioners
as well, so I would agree that this feels insurmountable. I'm leaning
towards only enabling this feature for the static partitioning case, at
least in the first iteration, and we can see how things go from there -- for
example, are people generally able to implement it correctly? If we find
that the feature is working well and users are hungry for more, then it
would be relatively straightforward to open things up to stateless
applications, or even stateful applications that can withstand some "blips"
in their logic/correctness.
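
To make that concrete, here's roughly what I'm picturing for the javadoc
example. This is just a sketch: I'm assuming the interface keeps the same
partition(topic, key, value, numPartitions) signature it inherits from
StreamPartitioner, and the UserAccount type and getUserId() getter are
made up for the sake of the example (they're the userID case from my
earlier reply).

```
// OK: userIDs are assigned by counting up from 0 and a new partition is
// added for each new account, so a given user's records land on the same
// partition no matter how many partitions exist later.
class UserIdPartitioner implements StaticStreamPartitioner<String, UserAccount> {
    @Override
    public Integer partition(String topic, String key, UserAccount value, int numPartitions) {
        return value.getUserId();
    }
}

// NOT OK: the result changes for existing keys whenever numPartitions
// changes, so old keys would silently "move" to different partitions
// after scaling out.
class HashModPartitioner implements StaticStreamPartitioner<String, UserAccount> {
    @Override
    public Integer partition(String topic, String key, UserAccount value, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }
}
```

The second one is exactly the kind of thing that looks reasonable but
quietly breaks history once the partition count changes, which is why I want
the javadocs to spell this out.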

That said, *technically* the feature could be turned on for any such case as
it stands, since as discussed above it's difficult to place true guardrails
around it that can enforce static partitioning. Perhaps we could put a short
note in the StaticStreamPartitioner docs explaining how and when it's safe
to break the static requirement, while recommending against doing so.

Thoughts?

-Sophie

On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:

> Sophie,
>
> Thank you for your detailed response. That makes sense (one partition per
> user seems like a lot of extra metadata if you've got millions of users,
> but I'm guessing that was just for illustrative purposes).
>
> In this case I'd like to question one small detail in your kip. The
> StaticPartitioner takes in just the key and not the value...in an
> application I've been working on, the "value" is a long-lived entity
> (spanning hundreds of records over several days) that has timestamp
> information about the creation of the entity inside of it. The ID itself is
> provided by the end-user of the system and as such isn't guaranteed to have
> timestamp info.
>
> This is quite a corner case, but if the StaticStreamPartitioner interface
> were allowed to peek at the record value, it would be trivial to implement
> logic as follows:
> ```
> entity = deserialize(record.value())
>
> if entity.created_before(T):
>   return hash(key) % old_partitions
> else:
>   return hash(key) % new_partitions
> ```
>
> That said, you're a rockstar architect and have seen a lot more system
> design than I have (I'm 23 and only 3 years out of school...you implemented
> cooperative rebalancing 😀). So don't make that decision unless you can see
> other use-cases where it is appropriate.
>
> Additionally, for my own use-case I'm not sure if static partitioning alone
> (as opposed to re-partitioning and re-playing the changelogs into new
> stores) would enable auto-scaleout because my system uses Kafka Streams as
> the data store *and* a secondary index...for example, when a user wants to
> look up all entities where the variable `user_email==f...@bar.com`, we have
> an index store that has keys partitioned by and prefixed with `user_email==
> f...@bar.com`. Entities with that email (for example) could come before or
> after time T.
>
> Anyways, that's just my twopence, if I were a voting committer I'd vote for
> this KIP as-is.
>
> Cheers,
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > Thanks for your questions, I would say that your understanding sounds
> > correct based on what you described but I'll try to add some clarity.
> > The basic idea is that, as you said, any keys that are processed before
> > time T will go to partition 1. All of those keys should then continue to
> > be routed to partition 1 for the remainder of the app's lifetime, if you
> > care about maintaining correct history/"state" for that key (I'll come
> > back to this in the next paragraph). After the time T, new keys that
> > weren't processed prior to T may be routed to either partition, provided
> > they are similarly mapped to the same partition forever after. It's up
> > to the user to enforce this, perhaps by trying to keep track of all keys
> > but that is likely to be impractical. This feature is generally more
> > targeted at cases where the partition mapping is "obvious" enough to
> > compute without needing to maintain a history of all keys and their
> > original partition: for example, imagine an application that processes
> > user account information. You can scale out to a partition per user, and
> > add a new partition each time someone opens a new account. When they
> > open that account they get a userID number, starting with #0 and
> > counting up from there. In that case, the partition for any records
> > pertaining to a given account would just be its userID.
> >
> > I hope that clears up the kind of intended use case we're targeting with
> > this feature. That said, another important and equally viable use case
> > that I neglected to mention in the KIP is fully stateless applications.
> > Technically this feature can produce correct results for applications
> > that are at least one of (a) statically partitioned, or (b) completely
> > stateless. However, the stateless case is a bit stickier since even if
> > the Streams application itself doesn't care about maintaining the same
> > mapping of key to partition, it could for example be feeding into a
> > downstream application which *does* need to maintain state, and which
> > would wind up "losing" the history for any keys that changed partition.
> >
> > I kind of felt like opening this feature up to stateless applications
> > would be asking for trouble and make it too easy for people to shoot
> > themselves in the foot. That said, I'm open to discussion on this point
> > if you feel like the benefits here outweigh the risks. I'm also happy to
> > consider modifying the API so that it could naturally be expanded to
> > include stateless applications in the future, even if we decide against
> > allowing that use case in the first iteration of the feature.
> >
> > Thoughts?
> >
> > Sophie
> >
> > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io>
> > wrote:
> >
> > > Sophie,
> > >
> > > Thank you for the KIP! Choosing the number of partitions in a Streams
> > > app is a tricky task because of how difficult it is to re-partition;
> > > I'm glad you're working on an improvement. I've got two questions:
> > >
> > > First, `StaticStreamsPartitioner` is an interface that we (Streams
> > > users) must implement, I'm trying to understand how it would work. For
> > > example, let's say there's some point in time 'T' before which we have
> > > 1 partition. Then we decide to increase the partition count to 2 at
> > > time T. From my understanding, all keys that had passed through the
> > > Streams app before time T must end up on partition 1 if they appear
> > > again in the input topics; but any new keys are allowed to be sent to
> > > partition 2. Is that correct? And (pardon the naive question) how is
> > > this achieved without keeping track of all keys that have been seen at
> > > any point?
> > >
> > > Secondly, will this feature work with applications that use
> > > interactive queries?
> > >
> > > Thank you very much,
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > <sop...@confluent.io.invalid> wrote:
> > >
> > > > Hey all,
> > > >
> > > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > > applications which can follow the constraint of static partitioning.
> > > > For further details please refer to the KIP document:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > >
> > > > This feature will be targeted for 3.4 but may not be fully
> > > > implemented until the following release, 3.5.
> > > >
> > > > Please give this a read and let me know what you think!
> > > >
> > > > Cheers,
> > > > Sophie
> > > >
> > >
> >
>
