Thanks Sophie.

Good catch on the default partitioner issue!

I missed the default config values as they were put into comments...

About the default timeout: what is the follow-up rebalance cadence (I thought it would be 10 minutes)? For this case, a default timeout of 15 minutes would imply that we only allow a single retry before we hit the timeout. Would this be sufficient (sounds rather aggressive to me)?
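
To make the arithmetic concrete, here is a minimal sketch. It assumes the timeout starts at the first failure (as the KIP text quoted further down says), that follow-up rebalances are evenly spaced, and that a retry landing exactly on the deadline still counts. The function name and the 10-minute cadence are assumptions for illustration, not values from the KIP.

```python
def retries_before_timeout(timeout_min: int, cadence_min: int) -> int:
    """Number of follow-up rebalances that fit between the first failure and the timeout."""
    if cadence_min <= 0:
        raise ValueError("cadence must be positive")
    # Retries happen at cadence, 2 * cadence, ... minutes after the first failure.
    return timeout_min // cadence_min


# With an assumed 10-minute cadence, a 15-minute timeout leaves room for one retry.
single_retry = retries_before_timeout(15, 10)
```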


-Matthias

On 12/2/22 8:00 AM, Sophie Blee-Goldman wrote:
Thanks again for the responses -- just want to say up front that I realized
the concept of a
default partitioner is actually substantially more complicated than I first
assumed due to
key/value typing, so I pulled it from this KIP and filed a ticket for it
for now.

Bruno,

What is exactly the motivation behind metric num-autoscaling-failures?
Actually, to realise that autoscaling did not work, we only need to
monitor subtopology-parallelism over partition.autoscaling.timeout.ms
time, right?

That is exactly the motivation -- I imagine some users may want to retry
indefinitely, and it would not be practical (or very nice) to require users
to monitor for up to *partition.autoscaling.timeout.ms* when that's been
configured to MAX_VALUE

Is num-autoscaling-failures a way to verify that Streams went through
enough autoscaling attempts during partition.autoscaling.timeout.ms?
Could you maybe add one or two sentences on how users should use
num-autoscaling-failures?

Not really, for the reason outlined above -- I just figured users might be
monitoring how often the autoscaling is failing and alert past some
threshold
since this implies something funny is going on. This is more of a "health
check"
kind of metric than a "scaling completed" status gauge. At the very least,
users will want to know when a failure has occurred, even if it's a single
failure,
no?

Hopefully that makes more sense now, but I suppose I can write something
like that in
the KIP too


Matthias -- answers inline below:

On Thu, Dec 1, 2022 at 10:44 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for updating the KIP Sophie.

I have the same question as Bruno. How can the user use the failure
metric and what actions can be taken to react if the metric increases?


I guess this depends on how important the autoscaling is, but presumably in
most cases
if you see things failing you probably want to at least look into the logs
to figure out why
(for example quota violation), and at the most stop your application while
investigating?


Plus a few more:

(1) Do we assume that users can reason about the `subtopology-parallelism`
metric to figure out if auto-scaling is finished? Given that a topology
might be complex and the rules to determine the partition count of
internal topics are not easy, it might be hard to use?

Even if the feature is for advanced users, I don't think we should push
the burden to understand the partition count details onto them.

We could add a second `target-subtopology-parallelism` metric (or
`expected-subtopology-parallelism` or some other name)? This way, users
can compare "target/expected" and "actual" value and easily figure out
if some sub-topologies are not expanded yet.

Thoughts?


Makes sense to me -- will add an `expected-subtopology-parallelism` metric
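
As a rough illustration of how a user might consume the two metrics, the sketch below compares expected and actual parallelism per sub-topology. How the metric values are collected (JMX, a metrics reporter, etc.) is left out, and the function name and dictionary shape are assumptions, not part of the KIP.

```python
from typing import Dict, List


def pending_subtopologies(expected: Dict[str, int], actual: Dict[str, int]) -> List[str]:
    """Return the sub-topologies whose actual parallelism is still below the expected value."""
    return sorted(
        name
        for name, target in expected.items()
        # A sub-topology missing from `actual` is treated as not yet expanded.
        if actual.get(name, 0) < target
    )
```

An empty result would mean every sub-topology has reached its expected parallelism, which is the signal upstream producers are waiting for.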


(2) What are the default values for the newly added configs? It's
obvious that `partition.autoscaling.enabled == false` by default, but
what timeout would we use?


This is in the KIP already -- look at the config definition


Also, what's the `default.stream.partitioner.class`? Should it be
`DefaultStreamPartitioner.class`?

Would we fail if auto-scaling is enabled and the default partitioner is
not changed (of course only for the case it's used; and if there is
state)? -- Not sure what the best behavior is, but the KIP (and docs?)
should explain it.


N/A since the default partitioner config was removed

(3)

This will be configurable for users via the new
partition.autoscaling.timeout.ms config, which will start counting after
the first failure (rather than when the autoscaling attempt began).

If we have interleaved failures and partial success (ie, progress to
scale out some topic), would the timeout be reset on each success? I
think resetting would be good, ie, we only time out if there is no
progress at all for the configured timeout period.


Yes, that's what I had in mind -- will add a note to clarify this in the
doc
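
The reset-on-progress semantics discussed here can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, timestamps are passed in explicitly, and treating "exactly at the timeout" as not yet timed out is an arbitrary choice.

```python
from typing import Optional


class AutoscalingTimeout:
    """Tracks how long autoscaling has gone without any progress."""

    def __init__(self, timeout_ms: int) -> None:
        self.timeout_ms = timeout_ms
        self.first_failure_ms: Optional[int] = None

    def on_failure(self, now_ms: int) -> None:
        # The clock starts at the first failure, not when the attempt began.
        if self.first_failure_ms is None:
            self.first_failure_ms = now_ms

    def on_progress(self) -> None:
        # Any successful expansion of a topic resets the clock.
        self.first_failure_ms = None

    def timed_out(self, now_ms: int) -> bool:
        if self.first_failure_ms is None:
            return False
        return now_ms - self.first_failure_ms > self.timeout_ms
```

With this shape, a long series of failures only times out if no topic was expanded for the whole configured period.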


-Matthias


On 11/28/22 12:25 AM, Bruno Cadonna wrote:
Hi Sophie,

Thanks for the updates!

I also feel the KIP is much cleaner now.

I have one question:
What is exactly the motivation behind metric num-autoscaling-failures?
Actually, to realise that autoscaling did not work, we only need to
monitor subtopology-parallelism over partition.autoscaling.timeout.ms
time, right?
Is num-autoscaling-failures a way to verify that Streams went through
enough autoscaling attempts during partition.autoscaling.timeout.ms?
Could you maybe add one or two sentences on how users should use
num-autoscaling-failures?

Apart from that, the KIP LGTM!

Best,
Bruno

On 19.11.22 20:33, Sophie Blee-Goldman wrote:
Thanks for the feedback everyone. I went back to the drawing board with
a
different guiding
philosophy: that the users of this feature will generally be fairly
advanced, and we should
give them full flexibility to implement whatever they need while
trusting
them to know
what they are doing.

With this in mind, a lot of my original proposal has been replaced and
the KIP document
has been updated with the new details. Rather than addressing each of
the
last questions,
I'll refer everyone to read the new proposal and just call out some of
the
high-level changes.


The primary difference is in how we'll expose this feature to users.
I've
opted to remove the
guardrails and end the discussion on what kinds of applications we
should
allow by introducing
a feature flag that will be available for everyone. This also has the
advantage of letting users
turn the feature on and off.

Another big question was how we can enable users to monitor when Streams
has finished
autoscaling its internal topics. This was the point of the callback on
the
new partitioner
interface in the original proposal, but this is too limiting as
highlighted
by some of the above
examples. Since the point is to let the upstream pipeline logic know
when
it's safe to start
producing to the new partitions, we should provide external monitoring
for
this such as metrics.

The last important question was how to handle failures. This is
covered in
more details in the
KIP, but after thinking the scenario through more carefully I've
proposed
to let Streams retry
via followup rebalances up until a configurable maximum amount of time.

Please call out anything you think I missed addressing either in this
email
or the updated KIP.
Thanks to everyone who helped me refine the design of this feature; it
feels much cleaner now.

Give it a read and let me know what you think!

On Mon, Nov 7, 2022 at 5:45 PM Matthias J. Sax <mj...@apache.org>
wrote:

Thanks for the KIP Sophie. Seems there is a lively discussion going on.
I tried to read up on the history and I hope I don't repeat what was
already discussed.

And sorry for the quite long email...


(1) Stateless vs Stateful

I agree that stateless apps should be supported, even if I am not sure
how many stateless apps will benefit from it. If an app is stateless,
why
would one need to repartition to begin with? Stateless apps might most
likely be apps with a single sub-topology and thus don't need this
feature to handle input topic scale out. Of course, there could be some
apps with more than one sub-topology and I don't see any reason why we
should not support scaling out those?

However, the point is that this feature is mainly useful for
stateful apps from my understanding.


(2) Config

I am not sure if using `static.partitioner.class` is a good choice and
I
would personally opt for a boolean config. The reason is (as already
mentioned by Bruno) that (stateful) apps might have a single
sub-topology: for this case, the static partitioning must be enforced
upstream already, and Kafka Streams must "just" add a new partition to
the state changelog topics to scale out. It seems odd to force users to
pass in a partitioner that might not be used by the runtime (the only
exception might be IQ which might not be used).

I also don't understand why we would need to enforce that downstream
output topics are using the same static partitioning as the input or
any repartition topics? We don't know anything about the potential
chaining of apps, and it's also not clear to me, why the output topic
would need to be scaled as claimed (it's a possibility, but I am sure
there are many cases for which the output topic is not touched and
standard hash/range/random partitioning is used and just fine)? In the
end, it's the user's responsibility and we should not enforce artificial
limitations (cf (4) below).

I agree that we might want to add a new `default.partitioner` config
though to make it simpler for users to change the partitioner globally
instead of one-by-one method overwrites, for the case users need it.


(3) StaticPartitioner

Do we really need this new interface? The only benefit I see is the
added callback `onPartitionExpansion(...)` (but we can add this to
existing `StreamPartitioner` interface, too). In particular, I don't
see
any benefit in adding `staticPartition(...)` method -- if we say it's
the user's responsibility to implement a static partitioning strategy,
they can just implement the existing `partition(...)` method IMHO. I
don't see what we gain by the new interface?


(3a) About `onPartitionExpansion()`: why do we need to pass in old/new
partition count?


(3b) Why should users throw a `TaskMigratedException` if they want to
put a record into a non-existing partition? The name seems
inappropriate
to me.
    -> I am also not sure, how this could happen, except for a user
error,
ie, when the user writes new keys into the input topic before the
expansion operation is finished; and for this case it seems ok to just
crash (maybe the user did not even enable the feature or did not intend
to scale the app at all and wrote a "bad key" into the input topic; for
the latter case, we might end up in an infinite rebalance as the input
topic was not scaled to begin with). -- Again, it seems we cannot (and
should not try to) guard the user for this case?



(4) User Responsibility

Using the feature is for advanced users only and they have a lot of
responsibility to use it correctly. For stateful single sub-topology
cases, their responsibility starts upstream by ensuring that the input
topic is partitioned statically.

Thus, I don't understand why we want to disallow any overwrite of the
partitioner in the code (and enforce a single partitioner
implementation)? Similar to anything else, it's the user's
responsibility
to do the correct thing, and it feels like artificial safe-guards to me
to disallow it. I would prefer full flexibility, because if there are
100 ways users can misuse this feature, it does not buy us much to limit
it to 99 ways by those restrictions and it will make the implementation
(for the feature) much simpler if we don't have restrictions but put
the
burden onto the user.


(5) Runtime

There is a larger section about runtime handling and I am not sure if I
fully understand everything.

For example:

However, it should be noted that you should not change the partitioner
for existing applications and so this feature will generally be
limited to
new applications only.

What do you mean by this and why would we limit the feature to new
apps?
Given the stateful single sub-topology example from above, I don't see
any reason why such an app should not benefit from it (given that the
input topic is already statically partitioned)?


Furthermore, what do you mean by:

No repartitioning of internal topics will be performed until all
external user topics have finished being expanded and stabilized on the
same partition count (excluding any discrepancies due to intentional
differences via an explicit Repartition operation).

I think it would help to add a few concrete examples to the KIP to
illustrate the (easy and problematic) cases you have in mind. If I
interpret the sentence correctly, you are referring to a join use-case
for which both input topics must be expanded (which is not possible
atomically and thus we need to handle this race condition)? -- Would it
be ok (as a first step) to only support topologies with a single input
topic (which should avoid those race conditions)?


I am also wondering, if there could be any cascading issues/cyclic
dependencies to compute the new number of internal topic partitions?
Thoughts?


Lastly, I am not sure if I understand the timeout handling that is
proposed. Can you elaborate? In particular
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG ?



Thanks for reading all this (in case you did)...


-Matthias



On 10/31/22 11:40 PM, Luke Chen wrote:
Hi Sophie,

Thanks for the KIP. A very useful proposal!
Some questions:

1. the staticPartition method in the interface is commented out.

2. For error handling, as you can imagine, there could be errors
happening during partition expansion. That means the operation could
(1) take a long time to complete, or (2) get stuck somewhere with a
fatal error. I'd like to know how we handle these 2 situations. For (1)
I'm thinking we should expose some metrics for monitoring, e.g. state,
topics to be autoscaled, etc. For (2), I'm not sure whether having some
partitions expanded and some not will cause any weird issues. If not,
maybe just expose a metric for autoscaling state, and have a state
called "failed" or something like that.

3. Could this operation get aborted? I don't think so. Maybe there
should
be a note in the KIP

Thank you.
Luke


On Tue, Nov 1, 2022 at 2:15 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Sophie,

Thank you for the KIP!

1.
I do not understand how autoscaling should work with a Streams
topology
with a stateful sub-topology that reads from the input topics. The
simplest example is a topology that consists of only one stateful
sub-topology. As far as I understand the upstream producer would
route
existing keys to different partitions after the partition expansion
than
before the expansion. That means Streams would -- in general -- not
read
the same keys with the same stream thread after the expansion. I
think
you proposed the solution to this in your last e-mail with the
following:

<quote>
Essentially whoever is responsible for calculating how many
partitions
are needed should also be responsible for directing whichever new
keys
are supposed to go into those new partitions, then pass it along to
the
upstream producer to encode in the record itself.
</quote>

But I am not 100% sure if you really meant what I understand. If I
understand it correctly, you propose that the user is responsible to
produce the records with existing keys to the same partitions as
before
the expansion upstream. I think that is important information that
should be pointed out in the KIP.


2.
I would log an error and shutdown the Streams application if a custom
partitioner is used anywhere in the topology. I think that would make
the limitations clearer and would reduce perceived unexpected
behavior
by the users. Are there any specific reasons you propose to ignore it
and log a warning?

Best,
Bruno

On 28.10.22 04:51, Sophie Blee-Goldman wrote:
Thanks all! I'll try to address everything but don't hesitate to
call
me
out if anything is missed

Colt/Lucas:

Thanks for clarifying, I think I understand your example now.
Something I
didn't think to mention
earlier but hopefully clears up how this would be used in practice
is
that
the partitioning decision/
logic doesn't need to -- and perhaps explicitly should not be --
internal
to the StaticStreamPartitioner
interface alone. I would imagine a realistic scenario would have the
partition essentially determined
upstream of the actual application, specifically integrated with
whatever
system (or person) is
making the decision to add new partition(s) in the first place. Then
the
partitioner is just reading out
some field in the record key/value, possibly doing some
translation to
derive the final partition number
from something like a userId if it's not encoded directly, and not
actually
computing anything itself.
Does that make sense? Essentially whoever is responsible for
calculating
how many partitions are
needed should also be responsible for directing whichever new keys
are
supposed to go into those
new partitions, then pass it along to the upstream producer to
encode
in
the record itself.

In sum, I second what Lucas said about your scenario actually being
a
good
example of one way
to approach implementing static partitioning, ie based on time. It's
just
that the semantics/logic to
interpret the target partition based on time would be external to
the
application and not isolated in
the actual StaticStreamPartitioner class. Imo this makes perfect
sense,
as
something like IQ is
also going to be situated outside of the Streams application
itself, so
presumably it can talk to
the system that is responsible for the partitioning logic for any
partition
information it needs.

Bill/Sagar:

I've been going back and forth a lot on whether to open this
feature up
to
stateless applications or
even stateful ones as well, but feel like I've settled on having it
targeted towards both (but only) the
stateless and statically partitioned cases. Bill, my only concern
about
the
stateless apps was the
possibility for trouble when repartitioning a stateless application
that
feeds into a stateful application
downstream. But now that I think about it, users would actually
need to
ensure that any/all apps
in that pipeline could handle partition increases, so it would be
impossible for someone to mess up
something downstream with corrupted partitioning because any
changes to
the
output topics would
of course mean changing the input topics of those downstream apps,
and
they
would just shut down
if not set up to handle this -- that's the whole point of this
KIP. So
I'm +1 on including the stateless folks

As for stateful applications, I feel pretty strongly that we should
discourage users from trying to use
the autoscaling feature when state is involved. However, as I
touch on
again briefly in the API discussion
below, there's no way to truly prevent someone from abusing this
feature
if
they are determined to. So
the idea is really for us to stress and heavily document which
kinds of
applications can and cannot
enable autoscaling and/or be repartitioned without resulting in
significant
corruption of the results.

As for key skew, technically anything is possible -- but (a) we're
entrusting users to make smart choices
throughout this KIP, which includes being careful with the
partitioning
logic, (b) the real-world use cases
I'm aware of that requested this feature were not even susceptible
to
skew
from repartitioning as their
architecture involved giving each key its own partition, and (c)
if key
skew is going to become a problem,
I would consider that a question for the KIP that introduced
partition
increases, not an issue with a KIP
that's just trying to make Streams compatible with this ability :)
But yes, it's always a possibility and nonetheless fair to be
concerned.
It's worth calling out in the docs
somewhere and trying to help users avoid problems with this.

Walker:

Thanks, yes you are right that there will not be a default
implementation
provided, and also right that
this should have been explicitly called out in the KIP. I've added a
note
to address this.

That said, since we're expanding the feature to include/allow
stateless
applications as well, I've
been mulling over a few possible alternatives or modifications to
the
currently proposed APIs.

1. We could expand the scope of the new config to enable setting a
default
partitioner across the application regardless of the static
condition
and
autoscaling feature. But
if the user passes in a custom partitioner that does implement the
new
StaticStreamPartitioner
interface, then autoscaling will be enabled. Some further options
within
this scenario:
      a. Would we still lock down the partitioning and prevent the
static
partitioner from being overridden?
          My personal preference is "yes", though it is a bit
awkward to
have
different semantics depending
          on what kind of partitioner is passed in. Therefore I'd
propose to
always enforce any partitioner
          that's passed in as the default, and not allow
topology-level
overrides. Imo this would also make
          the new config safer from user error due to accidental
discrepancies throughout the topology
      b. How should we expose the feature for stateless apps? We
could
just
offer an OOTB implementation
          for stateless apps, which could implement the
StreamPartitioner
interface directly to circumvent the
          awkwardness of implementing an interface whose condition
(staticness)
it doesn't meet. The downside
          is that some stateless apps may still want customized
partitioning
logic. Of course they can just extend
          the class, but again it just feels slightly awkward due
to the
interface/class asymmetry. Alternatively, the
          StatelessStreamPartitioner could be an interface in
parallel to
the
StaticStreamPartitioner. However, I
          anticipate that the vast majority of stateless apps which
may
want
this feature do not use a custom
          partitioner, and would be annoyed at having to implement
one
just
to
unlock autoscaling. So if we did
          go this route, we'd probably need a default implementation
anyways.
          That last option would probably be the best user
experience,
even
if
slightly more work for us/me to
          add.
2. Another option is to keep the config semantics the same but
change
the
name to something like
'autoscaling.partitioner.class'. Then we can do something similar to
what's
discussed in 1b, with my
preference being to accept either a StaticStreamPartitioner OR
implementation of a StatelessStreamPartitioner
interface, for which an OOTB default partitioner would also be
provided.
3. One last open question here is whether we should try enforcing
the
statelessness of applications that try
to enable autoscaling via whatever API we land on for the stateless
case.
Personally I'm in favor of this, and
users who really want to get around our roadblocks and muck up a
stateful
app could still get through via
the static partitioner. This check would just be an additional
guardrail
from accidental misuses, not intentional ones

What do you all think? Any strong preferences or concerns about
any of
these API options? Should we expand
the config to be useful for any app with custom partitioning, or
keep
it
focused on the autoscaling feature? I do
worry a bit that when some users see a new config about enabling
autoscaling, they'll get excited and blindly plug
in the OOTB assignor to try it out without really understanding its
limitations and intended use. Maybe that's just
paranoid, I certainly hope so. Anyways I look forward to hearing all
your
opinions on the public interface here.

Whew, that was a long one, but thanks again to everyone who's joined
the
discussion so far! You've really helped
me to clarify my thoughts and vision for this feature. Looking
forward
to
your replies

Cheers,
Sophie

On Tue, Oct 25, 2022 at 1:45 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hey Sophie,

Thanks for the KIP. I think this could be useful for a lot of
cases. I
also
think that this could cause a lot of confusion.

Just to make sure we are doing our best to prevent people from
misusing this feature, I wanted to clarify a couple of things.
1) There will be only an interface and no "default" implementation
that
a
user can plug in for the static partitioner. I am considering
when it
comes
to testing we want to make sure that we do not make our testing
implementation available to a user.
2)  If a user wanted to use auto scaling for a stateless
application
it
should be as easy as implementing the StaticStreamPartitioner.
Their
implementation could even just wrap the default partitioner if they
wanted,
right?  I can't think of any way we could detect and then warn them
about
the output topic not being partitioned by keys if that were to
happen,
can
you?

Overall this looks good to me!

Walker

On Tue, Oct 25, 2022 at 12:27 PM Bill Bejeck <bbej...@gmail.com>
wrote:

Hi Sophie,

Thanks for the KIP! I think this is a worthwhile feature to add.
I
have
two main questions about how this new feature will work.


       1. You mention that for stateless applications
auto-scaling is a
stickier
       situation.  But I was thinking that the auto-scaling would
actually
benefit
       stateless applications the most, let me explain my thinking.
Let's
say
you
       have a stateless Kafka Streams application with one input
topic
and 2
       partitions, meaning you're limited to at most 2 stream
threads.  In
order
       to increase the throughput, you increase the number of
partitions
of
the
       source topic to 4, so you can run 4 stream threads.  In this
case
would
the
       auto-scaling feature automatically increase the number of
tasks
from 2
to
       4?  Since the application is stateless, say using a filter
then
a
map
for
       example, the partition for the record doesn't matter, so it
seems
that
       stateless applications would stand to gain a great deal.
       2. For stateful applications I can see the immediate benefit
from
       autoscaling and static partitioning.   But again going with
a
partition
       expansion for increased throughput example, what would be
the
mitigation
       strategy for a stateful application that eventually wants to
take
advantage
       of the increased number of partitions? Otherwise keeping all
keys
on
their
       original partition means you could end up with "key skew"
due to
not
       allowing keys to distribute out to the new partitions.

One last comment, the KIP states "only the key, rather than the
key
and
value, are passed in to the partitioner", but the interface has it
taking a
key and a value as parameters.  Based on your comments earlier in
this
thread I was thinking that the text needs to be updated.

Thanks,
Bill

On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi all,

thanks, Sophie, this makes sense. I suppose then the way to
help the
user
not apply this in the wrong setting is having good documentation
and a
one
or two examples of good use cases.

I think Colt's time-based partitioning is a good example of how
to
use
this. It actually doesn't have to be time, the same will work
with
any
monotonically increasing identifier. I.e. the new partitions will
only
get
records for users with a "large" user ID greater than some user
ID
threshold hardcoded in the static partitioner. At least in this
restricted
use-case, lookups by user ID would still be possible.
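
A minimal sketch of this threshold idea, assuming integer user IDs that only ever increase. The threshold and the partition counts are made-up values for illustration; nothing here is an API from the KIP.

```python
# Hypothetical values: user IDs below the threshold existed before the expansion.
USER_ID_THRESHOLD = 1_000_000
OLD_PARTITIONS = 2
NEW_PARTITIONS = 4


def partition_for_user(user_id: int) -> int:
    """Map a monotonically increasing user ID to a partition that never changes."""
    if user_id < USER_ID_THRESHOLD:
        # Pre-expansion users stay on the original partitions.
        return user_id % OLD_PARTITIONS
    # "Large" user IDs are routed only to the newly added partitions.
    added = NEW_PARTITIONS - OLD_PARTITIONS
    return OLD_PARTITIONS + (user_id - USER_ID_THRESHOLD) % added
```

Because the mapping depends only on the user ID and hardcoded constants, any client (including one doing lookups by user ID) can recompute the partition without tracking history.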

Cheers,
Lucas

On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy <c...@littlehorse.io> wrote:

Sophie,

Regarding item "3" (my last paragraph from the previous email),
perhaps I
should give a more general example now that I've had more time
to
clarify
my thoughts:

In some stateful applications, certain keys have to be findable
without
any
information about when the relevant data was created. For
example,
if
I'm
running a word-count app and I want to use Interactive Queries
to
find
the
count for "foo", I would need to know whether "foo" first
arrived
before
or
after time T before I could find the correct partition to look
up
the
data.
In this case, I don't think static partitioning is possible. Is
this
use-case a non-goal of the KIP, or am I missing something?

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

Thanks for the responses guys! I'll get the easy stuff out of
the
way
first:

1) Fixed the KIP so that StaticStreamPartitioner extends
StreamPartitioner
2) I totally agree with you Colt, the record value might have
valuable
(no
pun) information
in it that is needed to compute the partition without
breaking the
static
constraint. As in my
own example earlier, maybe the userId is a field in the value
and
not
the
key itself. Actually
it was that exact thought that made me do a U-turn on this but
I
forgot
to
update the thread
3) Colt, I'm not sure I follow what you're trying to say in
that
last
paragraph, can you expand?
4) Lucas, it's a good question as to what kind of guard-rails
we
could
put
up to enforce or even
detect a violation of static partitioning. Most likely Streams
would
need
to track every key to
partition mapping in an internal state store, but we have no
guarantee
the
key space is bounded
and the store wouldn't grow out of control. Mostly however I
imagine
users
would be frustrated
to find out there's a secret, extra state store taking up space
when
you
enable autoscaling, and
it's not even to provide functionality but just to make sure
users
aren't
doing something wrong.

I wish I had a better idea, but sadly I think the only
practical
solution
here is to try and make this
condition as clear and obvious and easy to understand as
possible,
perhaps
by providing an
example of what does and does not satisfy the constraint in the
javadocs.
I'll work on that
5) I covered a bit above the impracticality of storing a
potentially
unbounded keyspace, which
as you mention would need to be shared by all partitioners as
well,
so
I
would agree that this
feels insurmountable. I'm leaning towards only enabling this
feature
for
the static partitioning
case at least in the first iteration, and we can see how
things go
from
there -- for example, are
people generally able to implement it correctly? If we find
that
the
feature is working well and
users are hungry for more, then it would be relatively
straightforward
to
open things up to
stateless applications, or even stateful applications which can
withstand
some "blips" in the
logic/correctness.

That said, *technically* the feature would be able to be
turned on
for
any
such case as it is, since
as discussed above it's difficult to place true guardrails
around
the
feature that can enforce
static partitioning. Perhaps we could put a short note in the
StaticStreamPartitioner docs that
explains how and when it's safe to break the static requirement,
but
that
we
recommend against
doing so.

Thoughts?

-Sophie

On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:

Sophie,

Thank you for your detailed response. That makes sense (one
partition
per
user seems like a lot of extra metadata if you've got
millions of
users,
but I'm guessing that was just for illustrative purposes).

In this case I'd like to question one small detail in your
KIP.
The
StaticPartitioner takes in just the key and not the
value...in an
application I've been working on, the "value" is a long-lived
entity
(spanning hundreds of records over several days) that has
timestamp
information about the creation of the entity inside of it.
The ID
itself
is
provided by the end-user of the system and as such isn't
guaranteed
to
have
timestamp info.

This is quite a corner case, but if the
StaticStreamPartitioner
interface
were allowed to peek at the record value, it would be
trivial to
implement
logic as follows:
```
entity = deserialize(record.value())

if entity.created_before(T):
      return hash(key) % old_partitions
else:
      return hash(key) % new_partitions
```
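
A runnable variant of the pseudocode above might look like the following sketch. The `Entity` type, the cutoff `T`, the partition counts, and the use of CRC32 as the hash are all assumptions for illustration. CRC32 is swapped in because Python's built-in `hash()` is salted per process for strings, which would break a mapping that has to stay fixed across restarts.

```python
import zlib
from dataclasses import dataclass

# Hypothetical values: expansion time T (epoch millis) and partition counts.
T = 1_666_000_000_000
OLD_PARTITIONS = 2
NEW_PARTITIONS = 4


@dataclass(frozen=True)
class Entity:
    """Hypothetical deserialized record value carrying its creation time."""
    created_at_ms: int


def stable_hash(key: str) -> int:
    # CRC32 gives the same result in every process, unlike built-in hash().
    return zlib.crc32(key.encode("utf-8"))


def partition(key: str, entity: Entity) -> int:
    # Entities created before T keep the mapping they had before the expansion.
    if entity.created_at_ms < T:
        return stable_hash(key) % OLD_PARTITIONS
    # Entities created at or after T may use the expanded partition count.
    return stable_hash(key) % NEW_PARTITIONS
```

The mapping stays static only because the creation timestamp lives in the record value and never changes for a given entity.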

That said, you're a rockstar architect and have seen a lot
more
system
design than I have (I'm 23 and only 3 years out of
school...you
implemented
cooperative rebalancing 😀). So don't make that decision
unless
you
can
see
other use-cases where it is appropriate.

Additionally, for my own use-case I'm not sure if static partitioning
alone (as opposed to re-partitioning and re-playing the changelogs into
new stores) would enable auto-scaleout because my system uses Kafka
Streams as the data store *and* a secondary index...for example, when a
user wants to look up all entities where the variable
`user_email==f...@bar.com`, we have an index store that has keys
partitioned by and prefixed with `user_email==f...@bar.com`. Entities
with that email (for example) could come before or after time T.

Anyways, that's just my twopence, if I were a voting committer I'd vote
for this KIP as-is.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

Thanks for your questions, I would say that your understanding sounds
correct based on what you described but I'll try to add some clarity.
The basic idea is that, as you said, any keys that are processed before
time T will go to partition 1. All of those keys should then continue to
be routed to partition 1 for the remainder of the app's lifetime, if you
care about maintaining correct history/"state" for that key (I'll come
back to this in the next paragraph). After the time T, new keys that
weren't processed prior to T may be routed to either partition, provided
they are similarly mapped to the same partition forever after. It's up
to the user to enforce this, perhaps by trying to keep track of all keys
but that is likely to be impractical. This feature is generally more
targeted at cases where the partition mapping is "obvious" enough to
compute without needing to maintain a history of all keys and their
original partition: for example, imagine an application that processes
user account information. You can scale out to a partition per user, and
add a new partition each time someone opens a new account. When they
open that account they get a userID number, starting with #0 and
counting up from there. In that case, the partition for any records
pertaining to a given account would just be its userID.
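
The userID example can be sketched in a few lines of Python. This only
illustrates the mapping described above and is not the KIP's Java
interface; the function name and the bounds check are assumptions.

```python
def partition_for_user(user_id: int, num_partitions: int) -> int:
    """Map a userID to a partition, assuming IDs are handed out as 0, 1, 2, ..."""
    if user_id < 0:
        raise ValueError("user_id must be non-negative")
    if user_id >= num_partitions:
        # The topic has not been expanded for this account yet.
        raise ValueError("no partition exists yet for this user_id")
    # The mapping never depends on num_partitions, so it is static.
    return user_id


# Two accounts, two partitions.
before = [partition_for_user(u, 2) for u in (0, 1)]
# A third account opens and a partition is added; existing users stay put.
after = [partition_for_user(u, 3) for u in (0, 1, 2)]
```

Since the result ignores the current partition count, adding partitions
never moves an existing key, which is the property the feature relies on.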

I hope that clears up the kind of intended use case we're targeting with
this feature. That said, another important and equally viable use case
that I neglected to mention in the KIP is fully stateless applications.
Technically this feature can produce correct results for applications
that are at least one of (a) statically partitioned, or (b) completely
stateless. However, the stateless case is a bit stickier since even if
the Streams application itself doesn't care about maintaining the same
mapping of key to partition, it could for example be feeding into a
downstream application which *does* need to maintain state, and which
would wind up "losing" the history for any keys that changed partition.
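
A small Python sketch of how keys can "change partition" when a
hash-mod-N mapping is used and N grows. CRC32 is a stand-in for a stable
hash here (Kafka's default partitioner uses murmur2 for keyed records),
and the key names and partition counts are made up for illustration.

```python
import zlib


def hash_partition(key: str, num_partitions: int) -> int:
    # Stable hash of the key, reduced modulo the current partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user-{i}" for i in range(1000)]

# Keys whose partition differs once the topic grows from 4 to 5 partitions.
moved = [k for k in keys if hash_partition(k, 4) != hash_partition(k, 5)]

print(f"{len(moved)} of {len(keys)} keys changed partition")
```

Any downstream consumer that keeps per-key state by partition would see
the moved keys arrive on a partition that has no history for them.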

I kind of felt like opening this feature up to stateless applications
would be asking for trouble and make it too easy for people to shoot
themselves in the foot. That said, I'm open to discussion on this point
if you feel like the benefits here outweigh the risks. I'm also happy to
consider modifying the API so that it could naturally be expanded to
include stateless applications in the future, even if we decide against
allowing that use case in the first iteration of the feature.

Thoughts?

Sophie

On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io> wrote:

Sophie,

Thank you for the KIP! Choosing the number of partitions in a Streams
app is a tricky task because of how difficult it is to re-partition; I'm
glad you're working on an improvement. I've got two questions:

First, `StaticStreamsPartitioner` is an interface that we (Streams
users) must implement, I'm trying to understand how it would work. For
example, let's say there's some point in time 'T' before which we have 1
partition. Then we decide to increase the partition count to 2 at time
T. From my understanding, all keys that had passed through the Streams
app before time T must end up on partition 1 if they appear again in the
input topics; but any new keys are allowed to be sent to partition 2. Is
that correct? And (pardon the naive question) how is this achieved
without keeping track of all keys that have been seen at any point?

Secondly, will this feature work with applications that use interactive
queries?

Thank you very much,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

Hey all,

I'd like to propose a new autoscaling feature for Kafka Streams
applications which can follow the constraint of static partitioning. For
further details please refer to the KIP document:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams

This feature will be targeted for 3.4 but may not be fully implemented
until the following release, 3.5.

Please give this a read and let me know what you think!

Cheers,
Sophie