Hi Jun, please add me to the invitation as well.  If this is happening near Palo Alto, let me know if I can join in person. Thanks.

-Ray

On 4/4/18 1:34 PM, Jun Rao wrote:
Hi, Jan, Dong, John, Guozhang,

Perhaps it will be useful to have a KIP meeting to discuss this together as
a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out
an invite to the mailing list.

Thanks,

Jun


On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Want to quickly step in here again because it is going places again.

The last part of the discussion is just a pain to read and completely
diverged from what I suggested without making the reasons clear to me.

I don't know why this happens.... here are my comments anyway.

@Guozhang: That Streams is working on automatic creating
copartition-usuable topics: great for streams, has literally nothing todo
with the KIP as we want to grow the
input topic. Everyone can reshuffle rel. easily but that is not what we
need todo, we need to grow the topic in question. After streams
automatically reshuffled the input topic
still has the same size and it didn't help a bit. I fail to see why this
is relevant. What am i missing here?

@Dong
I am still on the position that the current proposal brings us into the
wrong direction. Especially introducing PartitionKeyRebalanceListener
 From this point we can never move away to proper state full handling
without completely deprecating this creature from hell again.
Linear hashing is not the optimising step we have todo here. An interface
that when a topic is a topic its always the same even after it had
grown or shrunk is important. So from my POV I have major concerns that
this KIP is benefitial in its current state.

What is it that makes everyone so addicted to the idea of linear hashing?
not attractive at all for me.
And with statefull consumers still a complete mess. Why not stick with the
Kappa architecture???





On 03.04.2018 17:38, Dong Lin wrote:

Hey John,

Thanks much for your comments!!

I have yet to go through the emails of John/Jun/Guozhang in detail. But
let
me present my idea for how to minimize the delay for state loading for
stream use-case.

For ease of understanding, let's assume that the initial partition number
of input topics and change log topic are both 10. And initial number of
stream processor is also 10. If we only increase initial partition number
of input topics to 15 without changing number of stream processor, the
current KIP already guarantees in-order delivery and no state needs to be
moved between consumers for stream use-case. Next, let's say we want to
increase the number of processor to expand the processing capacity for
stream use-case. This requires us to move state between processors which
will take time. Our goal is to minimize the impact (i.e. delay) for
processing while we increase the number of processors.

Note that stream processor generally includes both consumer and producer.
In addition to consume from the input topic, consumer may also need to
consume from change log topic on startup for recovery. And producer may
produce state to the change log topic.


The solution will include the following steps:
1) Increase partition number of the input topic from 10 to 15. Since the
messages with the same key will still go to the same consume before and
after the partition expansion, this step can be done without having to
move
state between processors.

2) Increase partition number of the change log topic from 10 to 15. Note
that this step can also be done without impacting existing workflow. After
we increase partition number of the change log topic, key space may split
and some key will be produced to the newly-added partition. But the same
key will still go to the same processor (i.e. consumer) before and after
the partition. Thus this step can also be done without having to move
state
between processors.

3) Now, let's add 5 new consumers whose groupId is different from the
existing processor's groupId. Thus these new consumers will not impact
existing workflow. Each of these new consumers should consume two
partitions from the earliest offset, where these two partitions are the
same partitions that will be consumed if the consumers have the same
groupId as the existing processor's groupId. For example, the first of the
five consumers will consume partition 0 and partition 10. The purpose of
these consumers is to rebuild the state (e.g. RocksDB) for the processors
in advance. Also note that, by design of the current KIP, each consume
will
consume the existing partition of the change log topic up to the offset
before the partition expansion. Then they will only need to consume the
state of the new partition of the change log topic.

4) After consumers have caught up in step 3), we should stop these
consumers and add 5 new processors to the stream processing job. These 5
new processors should run in the same location as the previous 5 consumers
to re-use the state (e.g. RocksDB). And these processors' consumers should
consume partitions of the change log topic from the committed offset the
previous 5 consumers so that no state is missed.

One important trick to note here is that, the mapping from partition to
consumer should also use linear hashing. And we need to remember the
initial number of processors in the job, 10 in this example, and use this
number in the linear hashing algorithm. This is pretty much the same as
how
we use linear hashing to map key to partition. In this case, we get an
identity map from partition -> processor, for both input topic and the
change log topic. For example, processor 12 will consume partition 12 of
the input topic and produce state to the partition 12 of the change log
topic.

There are a few important properties of this solution to note:

- We can increase the number of partitions for input topic and the change
log topic in any order asynchronously.
- The expansion of the processors in a given job in step 4) only requires
the step 3) for the same job. It does not require coordination across
different jobs for step 3) and 4). Thus different jobs can independently
expand there capacity without waiting for each other.
- The logic for 1) and 2) is already supported in the current KIP. The
logic for 3) and 4) appears to be independent of the core Kafka logic and
can be implemented separately outside core Kafka. Thus the current KIP is
probably sufficient after we agree on the efficiency and the correctness
of
the solution. We can have a separate KIP for Kafka Stream to support 3)
and
4).


Cheers,
Dong


On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hey guys, just sharing my two cents here (I promise it will be shorter
than
John's article :).

0. Just to quickly recap, the main discussion point now is how to support
"key partitioning preservation" (John's #4 in topic characteristics
above)
beyond the "single-key ordering preservation" that KIP-253 was originally
proposed to maintain (John's #6 above).

1. From the streams project, we are actively working on improving the
elastic scalability of the library. One of the key features is to
decouple
the input topics from the parallelism model of Streams: i.e. not
enforcing
the topic to be partitioned by the key, not enforcing joining topics to
be
co-partitioned, not relying the number of parallel tasks on the input
topic
partitions. This can be achieved by re-shuffling on the input topics to
make sure key-partitioning / co-partitioning on the internal topics. Note
the re-shuffling task is purely stateless and hence does not require "key
partitioning preservation". Operational-wise it is similar to the
"creating
a new topic with new number of partitions, pipe the data to the new topic
and cut over consumers from old topics" idea, just that users can
optionally let Streams to handle such rather than doing it manually
themselves. There are a few more details on that regard but I will skip
since they are not directly related to this discussion.

2. Assuming that 1) above is done, then the only topics involved in the
scaling events are all input topics. For these topics the only producers
/
consumers of these topics are controlled by Streams clients themselves,
and
hence achieving "key partitioning preservation" is simpler than
non-Streams
scenarios: consumers know the partitioning scheme that producers are
using,
so that for their stateful operations it is doable to split the local
state
stores accordingly or execute backfilling on its own. Of course, if we
decide to do server-side backfilling, it can still help Streams to
directly
rely on that functionality.

3. As John mentioned, another way inside Streams is to do
over-partitioning
on all internal topics; then with 1) Streams would not rely on KIP-253 at
all. But personally I'd like to avoid it if possible to reduce Kafka side
footprint: say we overpartition each input topic up to 1k, with a
reasonable sized stateful topology it can still contribute to tens of
thousands of topics to the topic partition capacity of a single cluster.

4. Summing up 1/2/3, I think we should focus more on non-Streams users
writing their stateful computations with local states, and think whether
/
how we could enable "key partitioning preservation" for them easily, than
to think heavily for Streams library. People may have different opinion
on
how common of a usage pattern it is (I think Jun might be suggesting that
for DIY apps people may more likely use remote states so that it is not a
problem for them). My opinion is that for non-Streams users such usage
pattern could still be large (think: if you are piping data from Kafka to
an external data storage which has single-writer requirements for each
single shard, even though it is not a stateful computational application
it
may still require "key partitioning preservation"), so I prefer to have
backfilling in our KIP than only exposing the API for expansion and
requires consumers to have pre-knowledge of the producer's partitioning
scheme.



Guozhang



On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:

Hey Dong,
Congrats on becoming a committer!!!

Since I just sent a novel-length email, I'll try and keep this one brief

;)

Regarding producer coordination, I'll grant that in that case, producers
may coordinate among themselves to produce into the same topic or to
produce co-partitioned topics. Nothing in KStreams or the Kafka
ecosystem
in general requires such coordination for correctness or in fact for any
optional features, though, so I would not say that we require producer
coordination of partition logic. If producers currently coordinate, it's
completely optional and their own choice.

Regarding the portability of partition algorithms, my observation is
that
systems requiring independent implementations of the same algorithm with
100% correctness are a large source of risk and also a burden on those

who

have to maintain them. If people could flawlessly implement algorithms
in
actual software, the world would be a wonderful place indeed! For a

system

as important and widespread as Kafka, I would recommend restricting
limiting such requirements as aggressively as possible.

I'd agree that we can always revisit decisions like allowing arbitrary
partition functions, but of course, we shouldn't do that in a vacuum.

That

feels like the kind of thing we'd need to proactively seek guidance from
the users list about. I do think that the general approach of saying
that
"if you use a custom partitioner, you cannot do partition expansion" is
very reasonable (but I don't think we need to go that far with the

current

proposal). It's similar to my statement in my email to Jun that in
principle KStreams doesn't *need* backfill, we only need it if we want
to
employ partition expansion.

I reckon that the main motivation for backfill is to support KStreams
use
cases and also any other use cases involving stateful consumers.

Thanks for your response, and congrats again!
-John


On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey John,
Great! Thanks for all the comment. It seems that we agree that the

current

KIP is in good shape for core Kafka. IMO, what we have been discussing

in
the recent email exchanges is mostly about the second step, i.e. how to
address problem for the stream use-case (or stateful processing in
general).

I will comment inline.




On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io>

wrote:
Thanks for the response, Dong.
Here are my answers to your questions:

- "Asking producers and consumers, or even two different producers,

to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it

is
difficuilt to implement the same hash function in different
languages?
Sorry, I meant two different producers as in producers to two

different
topics. This was in response to the suggestion that we already
require
coordination among producers to different topics in order to achieve
co-partitioning. I was saying that we do not (and should not).

It is probably common for producers of different team to produce

message
to

the same topic. In order to ensure that messages with the same key go

to
same partition, we need producers of different team to share the same
partition algorithm, which by definition requires coordination among
producers of different teams in an organization. Even for producers of
different topics, it may be common to require producers to use the same
partition algorithm in order to join two topics for stream processing.

Does

this make it reasonable to say we already require coordination across
producers?


By design, consumers are currently ignorant of the partitioning
scheme.
It
suffices to trust that the producer has partitioned the topic by key,

if
they claim to have done so. If you don't trust that, or even if you
just
need some other partitioning scheme, then you must re-partition it
yourself. Nothing we're discussing can or should change that. The

value
of
backfill is that it preserves the ability for consumers to avoid
re-partitioning before consuming, in the case where they don't need

to
today.
Regarding shared "hash functions", note that it's a bit inaccurate to
talk

about the "hash function" of the producer. Properly speaking, the

producer

has only a "partition function". We do not know that it is a hash.

The
producer can use any method at their disposal to assign a partition
to
a

record. The partition function obviously may we written in any
programming

language, so in general it's not something that can be shared around
without a formal spec or the ability to execute arbitrary executables

in
arbitrary runtime environments.
Yeah it is probably better to say partition algorithm. I guess it
should
not be difficult to implement same partition algorithms in different
languages, right? Yes we would need a formal specification of the

default
partition algorithm in the producer. I think that can be documented as
part

of the producer interface.


Why would a producer want a custom partition function? I don't
know...
why
did we design the interface so that our users can provide one? In

general,

such systems provide custom partitioners because some data sets may

be
unbalanced under the default or because they can provide some
interesting
functionality built on top of the partitioning scheme, etc. Having
provided

this ability, I don't know why we would remove it.

Yeah it is reasonable to assume that there was reason to support
custom
partition function in producer. On the other hand it may also be

reasonable

to revisit this interface and discuss whether we actually need to

support
custom partition function. If we don't have a good reason, we can
choose
not to support custom partition function in this KIP in a backward
compatible manner, i.e. user can still use custom partition function

but
they would not get the benefit of in-order delivery when there is
partition

expansion. What do you think?


- Besides the assumption that consumer needs to share the hash
function
of
producer, is there other organization overhead of the proposal in
the
current KIP?
It wasn't clear to me that KIP-253 currently required the producer
and
consumer to share the partition function, or in fact that it had a
hard
requirement to abandon the general partition function and use a
linear
hash
function instead.

In my reading, there is a requirement to track the metadata about
what
partitions split into what other partitions during an expansion
operation.

If the partition function is linear, this is easy. If not, you can

always
just record that all old partitions split into all new partitions.
This
has
the effect of forcing all consumers to wait until the old epoch is
completely consumed before starting on the new epoch. But this may

be a
reasonable tradeoff, and it doesn't otherwise alter your design.
You only mention the consumer needing to know that the partition

function
is linear, not what the actual function is, so I don't think your
design
actually calls for sharing the function. Plus, really all the
consumer
needs is the metadata about what old-epoch partitions to wait for
before
consuming a new-epoch partition. This information is directly
captured
in

metadata, so I don't think it actually even cares whether the
partition
function is linear or not.
You are right that the current KIP does not mention it. My comment
related

to the partition function coordination was related to support the
stream-use case which we have been discussing so far.


So, no, I really think KIP-253 is in good shape. I was really more
talking

about the part of this thread that's outside of KIP-253's scope,

namely,
creating the possibility of backfilling partitions after expansion.
Great! Can you also confirm that the main motivation for backfilling
partitions after expansion is to support the stream use-case?


- Currently producer can forget about the message that has been
acknowledged by the broker. Thus the producer probably does not

know
most
of the exiting messages in topic, including those messages produced
by
other producers. We can have the owner of the producer to
split+backfill.
In my opion it will be a new program that wraps around the existing
producer and consumer classes.

This sounds fine by me!
Really, I was just emphasizing that the part of the organization that
produces a topic shouldn't have to export their partition function to

the
part(s) of the organization (or other organizations) that consume the
topic. Whether the backfill operation goes into the Producer

interface
is

secondary, I think.
- Regarding point 5. The argument is in favor of the split+backfill

but
for
changelog topic. And it intends to address the problem for stream
use-case

in general. In this KIP we will provide interface (i.e.
PartitionKeyRebalanceListener in the KIP) to be used by sream

use-case
and
the goal is that user can flush/re-consume the state as part of the
interface implementation regardless of whether there is change log

topic.
Maybe you are suggesting that the main reason to do split+backfill
of
input
topic is to support log compacted topics? You mentioned in Point 1

that
log
compacted topics is out of the scope of this KIP. Maybe I could

understand

your position better. Regarding Jan's proposal to split partitions

with
backfill, do you think this should replace the proposal in the
existing
KIP, or do you think this is something that we should do in
addition
to

the
existing KIP?

I think that interface is a good/necessary component of KIP-253.
I personally (FWIW) feel that KIP-253 is appropriately scoped, but I

do
think its utility will be limited unless there is a later KIP
offering
backfill. But, maybe unlike Jan, I think it makes sense to try and
tackle
the ordering problem independently of backfill, so I'm in support of
the
current KIP.
- Regarding point 6. I guess we can agree that it is better not to

have
the
performance overhread of copying the input data. Before we discuss
more
on
whether the performance overhead is acceptable or not, I am trying

to
figure out what is the benefit of introducing this overhread. You
mentioned

that the benefit is the loose organizational coupling. By

"organizational
coupling", are you referring to the requirement that consumer needs
to
know
the hash function of producer? If so, maybe we can discuss the

use-case
of
custom partiton function and see whether we can find a way to

support
such
use-case without having to copy the input data.

I'm not too sure about what an "input" is in this sense, since we are
just

talking about topics. Actually the point I was making there is that

AKAICT

the performance overhead of a backfill is less than any other option,
assuming you split partitions rarely.

By "input" I was referring to source Kafka topic of a stream
processing
job.


Separately, yes, "organizational coupling" increases if producers and
consumers have to share code, such as the partition function. This

would
not be the case if producers could only pick from a menu of a few
well-known partition functions, but I think this is a poor tradeoff.

Maybe we can revisit the custom partition function and see whether we
actually need it? Otherwise, I am concerned that every user will pay

the
overhead of data movement to support something that was not really
needed
for most users.

To me, this is two strong arguments in favor of backfill being less
expensive than no backfill, but again, I think that particular debate

comes

after KIP-253, so I don't want to create the impression of opposition

to
your proposal.

Finally, to respond to a new email I just noticed:

BTW, here is my understanding of the scope of this KIP. We want to
allow
consumers to always consume messages with the same key from the
same
producer in the order they are produced. And we need to provide a
way
for
stream use-case to be able to flush/load state when messages with
the
same
key are migrated between consumers. In addition to ensuring that

this
goal
is correctly supported, we should do our best to keep the

performance
and
organization overhead of this KIP as low as possible.
I think we're on the same page there! In fact, I would generalize a
little

more and say that the mechanism you've designed provides *all

consumers*
the ability "to flush/load state when messages with the same key are
migrated between consumers", not just Streams.

Thanks for all the comment!

Thanks for the discussion,
-John



On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>

wrote:
Hey John,
Thanks much for the detailed comments. Here are my thoughts:

- The need to delete messages from log compacted topics is mainly

for
performance (e.g. storage space) optimization than for correctness
for
this
KIP. I agree that we probably don't need to focus on this in our

discussion

since it is mostly for performance optimization.

- "Asking producers and consumers, or even two different producers,

to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it

is
difficuilt to implement the same hash function in different
languages?
- Besides the assumption that consumer needs to share the hash
function
of
producer, is there other organization overhead of the proposal in

the
current KIP?
- Currently producer can forget about the message that has been
acknowledged by the broker. Thus the producer probably does not

know
most
of the exiting messages in topic, including those messages produced
by
other producers. We can have the owner of the producer to
split+backfill.
In my opion it will be a new program that wraps around the existing
producer and consumer classes.

- Regarding point 5. The argument is in favor of the split+backfill

but
for
changelog topic. And it intends to address the problem for stream

use-case

in general. In this KIP we will provide interface (i.e.
PartitionKeyRebalanceListener in the KIP) to be used by sream

use-case
and
the goal is that user can flush/re-consume the state as part of the
interface implementation regardless of whether there is change log

topic.
Maybe you are suggesting that the main reason to do split+backfill
of
input
topic is to support log compacted topics? You mentioned in Point 1

that
log
compacted topics is out of the scope of this KIP. Maybe I could

understand

your position better. Regarding Jan's proposal to split partitions

with
backfill, do you think this should replace the proposal in the
existing
KIP, or do you think this is something that we should do in
addition
to

the
existing KIP?

- Regarding point 6. I guess we can agree that it is better not to

have
the
performance overhread of copying the input data. Before we discuss

more
on
whether the performance overhead is acceptable or not, I am trying

to
figure out what is the benefit of introducing this overhread. You
mentioned

that the benefit is the loose organizational coupling. By

"organizational
coupling", are you referring to the requirement that consumer needs
to
know
the hash function of producer? If so, maybe we can discuss the

use-case
of
custom partiton function and see whether we can find a way to

support
such
use-case without having to copy the input data.

Thanks,
Dong


On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io>

wrote:

Hey Dong and Jun,
Thanks for the thoughtful responses. If you don't mind, I'll mix

my
replies
together to try for a coherent response. I'm not too familiar

with
mailing-list etiquette, though.
I'm going to keep numbering my points because it makes it easy

for
you
all
to respond.

Point 1:
As I read it, KIP-253 is *just* about properly fencing the

producers
and
consumers so that you preserve the correct ordering of records
during
partition expansion. This is clearly necessary regardless of
anything
else
we discuss. I think this whole discussion about backfill,

consumers,
streams, etc., is beyond the scope of KIP-253. But it would be
cumbersome
to start a new thread at this point.
I had missed KIP-253's Proposed Change #9 among all the

details...
I

think
this is a nice addition to the proposal. One thought is that it's

actually

irrelevant whether the hash function is linear. This is simply an

algorithm

for moving a key from one partition to another, so the type of

hash
function need not be a precondition. In fact, it also doesn't
matter
whether the topic is compacted or not, the algorithm works
regardless.
I think this is a good algorithm to keep in mind, as it might
solve a
variety of problems, but it does have a downside: that the
producer
won't
know whether or not K1 was actually in P1, it just knows that K1
was
in

P1's keyspace before the new epoch. Therefore, it will have to
pessimistically send (K1,null) to P1 just in case. But the next

time
K1

comes along, the producer *also* won't remember that it already
retracted
K1 from P1, so it will have to send (K1,null) *again*. By
extension,
every
time the producer sends to P2, it will also have to send a

tombstone
to

P1,
which is a pretty big burden. To make the situation worse, if

there
is
a

second split, say P2 becomes P2 and P3, then any key Kx belonging
to
P3

will also have to be retracted from P2 *and* P1, since the
producer
can't
know whether Kx had been last written to P2 or P1. Over a long
period
of
time, this clearly becomes a issue, as the producer must send an
arbitrary

number of retractions along with every update.

In contrast, the proposed backfill operation has an end, and

after
it

ends,
everyone can afford to forget that there ever was a different

partition
layout.
Really, though, figuring out how to split compacted topics is

beyond
the
scope of KIP-253, so I'm not sure #9 really even needs to be in
this
KIP...
We do need in-order delivery during partition expansion. It would

be
fine
by me to say that you *cannot* expand partitions of a
log-compacted
topic
and call it a day. I think it would be better to tackle that in
another
KIP.

Point 2:
Regarding whether the consumer re-shuffles its inputs, this is

always
on
the table; any consumer who wants to re-shuffle its input is free
to
do

so.
But this is currently not required. It's just that the current

high-level
story with Kafka encourages the use of partitions as a unit of
concurrency.

As long as consumers are single-threaded, they can happily

consume
a

single
partition without concurrency control of any kind. This is a key

aspect
to
this system that lets folks design high-throughput systems on top

of
it

surprisingly easily. If all consumers were instead
encouraged/required
to

implement a repartition of their own, then the consumer becomes
significantly more complex, requiring either the consumer to

first
produce
to its own intermediate repartition topic or to ensure that

consumer
threads have a reliable, high-bandwith channel of communication
with
every
other consumer thread.

Either of those tradeoffs may be reasonable for a particular user

of
Kafka,
but I don't know if we're in a position to say that they are

reasonable
for
*every* user of Kafka.


Point 3:
Regarding Jun's point about this use case, "(3) stateful and

maintaining
the
states in a local store", I agree that they may use a framework

*like*
Kafka Streams, but that is not the same as using Kafka Streams.
This
is

why
I think it's better to solve it in Core: because it is then

solved
for
KStreams and also for everything else that facilitates local
state
maintenance. To me, Streams is a member of the category of
"stream
processing frameworks", which is itself a subcategory of "things
requiring

local state maintenence". I'm not sure if it makes sense to


Reply via email to