If we want to maintain ordering, then a pluggable producer-side
partitioner needs to be coherent with the linear hashing scheme, i.e. we
effectively restrict what kind of customization users can do. However, such
restrictions are hard to enforce programmatically unless we change the API,
so maybe we can only document this and educate users to follow it when they
customize the partitioner after the upgrade.
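
To make "coherent with the linear hashing scheme" concrete, here is a minimal
sketch of one standard linear hashing formulation. It is not necessarily the
exact algorithm in the KIP. The class name, the method names, and the
stand-in hash (used here instead of murmur2) are all assumptions made for
illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LinearHashingSketch {

    // Stand-in for the producer's murmur2 hash (an assumption for this sketch).
    // The sign bit is masked so the result is never negative.
    public static int positiveHash(byte[] key) {
        return Arrays.hashCode(key) & 0x7fffffff;
    }

    // Maps a non-negative hash to a partition, given the partition count the
    // topic was created with and the current (possibly expanded) count.
    public static int partition(int hash, int originalPartitions, int currentPartitions) {
        if (hash < 0 || originalPartitions <= 0 || currentPartitions < originalPartitions) {
            throw new IllegalArgumentException("invalid arguments");
        }
        // Largest originalPartitions * 2^k that does not exceed the current count.
        int base = originalPartitions;
        while (2L * base <= currentPartitions) {
            base *= 2;
        }
        // Partitions [0, splitPointer) have already been split in this round.
        int splitPointer = currentPartitions - base;
        int p = hash % base;
        if (p < splitPointer) {
            // Split partition: the key stays in p or moves to p + base.
            p = (int) (hash % (2L * base));
        }
        return p;
    }

    public static void main(String[] args) {
        int hash = positiveHash("user-42".getBytes(StandardCharsets.UTF_8));
        System.out.println("4 partitions: " + partition(hash, 4, 4));
        System.out.println("6 partitions: " + partition(hash, 4, 6));
    }
}
```

With an unchanged partition count this reduces to hash % original count, so
the existing mapping is preserved. After growing from 4 to 6 partitions, a
key either stays where it was or moves to exactly one new partition, while
partitions 2 and 3 stay unsplit and cover twice the hash space of the
others. A custom partitioner that breaks this property would lose the
ordering guarantee.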


Guozhang

On Wed, Mar 7, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Dong,
>
> What is not clear to me is how the use of linear hashing affects the
> partitioning logic in the producer, which the user is currently allowed to
> customize through the Partitioner interface. It sounds like we are
> effectively deprecating that interface since we will only provide ordering
> guarantees across partition changes if linear hashing is used. Maybe you
> can clarify whether that is the intention? Basically I am wondering how
> much additional work it would be to leave that partitioning logic pluggable
> and whether it is worthwhile to do so. One potential downside I can think
> of is that it may complicate compaction, but we don't have a concrete
> proposal for handling that at the moment anyway, so it's hard to say.
>
> Thanks,
> Jason
>
>
> On Tue, Mar 6, 2018 at 3:45 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Mattias,
> >
> > Regarding your comment "If it would be time-delay based, it might be
> > problematic for Kafka Streams: if we get the information that the new
> > input partitions are available for producing, we need to enable the new
> > changelog partitions for producing, too. If those would not be available
> > yet, because the time-delay did not trigger yet, it would be problematic
> > to avoid crashing.", could you just enable the changelog topic to write
> > to its new partitions immediately? The input topic can be configured
> > with a delay in writing to the new partitions. Initially, there won't be
> > new data produced into the newly added partitions in the input topic.
> > However, we could prebuild the state for the new input partition and
> > write the state changes to the corresponding new partitions in the
> > changelog topic.
> >
> > Hi, Jan,
> >
> > For a compacted topic, garbage collecting the old keys in the existing
> > partitions after partition expansion can be tricky, as you pointed out.
> > There are a few options here. (a) Let brokers exchange keys with each
> > other during compaction. This will add complexity on the broker side.
> > (b) Build an external tool that scans the compacted topic and drops the
> > prefix of a partition if all records in the prefix are removable. The
> > admin can then run this tool when the unneeded space needs to be
> > reclaimed. (c) Don't support partition changes in a compacted topic.
> > This might be ok since most compacted topics are not high volume.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks for all the comments! It appears that everyone prefers linear
> > > hashing because it reduces the amount of state that needs to be moved
> > > between consumers (for stream processing). The KIP has been updated to
> > > use linear hashing.
> > >
> > > Regarding the migration effort: it seems that migrating the producer
> > > library to use linear hashing should be pretty straightforward and
> > > should not take much operational effort. If we don't upgrade the client
> > > library to use this KIP, we cannot support in-order delivery after the
> > > partition number is changed anyway. Suppose we upgrade the client
> > > library to use this KIP: if the partition number is not changed, the
> > > key -> partition mapping will be exactly the same as it is now, because
> > > it is still determined using murmur_hash(key) % original_partition_num.
> > > In other words, this change is backward compatible.
> > >
> > > Regarding the load distribution: if we use linear hashing, the load may
> > > be unevenly distributed, because partitions that have not been split
> > > may receive twice as much traffic as partitions that have been split.
> > > This issue can be mitigated by creating the topic with several times as
> > > many partitions as there are consumers. And there will be no imbalance
> > > if the partition number is always doubled. So this imbalance seems
> > > acceptable.
> > >
> > > Regarding storing the partition strategy as a per-topic config: it
> > > seems unnecessary, since we can still use murmur_hash as the default
> > > hash function and additionally apply the linear hashing algorithm if
> > > the partition number has increased. I am not sure if there is any
> > > use-case for the producer to use a different hash function. Jason, can
> > > you check if there is some use-case that I missed for using the
> > > per-topic partition strategy?
> > >
> > > Regarding how to reduce latency (due to state store/load) in the stream
> > > processing consumer when the partition number changes: I need to read
> > > the Kafka Streams code to understand how Kafka Streams currently
> > > migrates state between consumers when an application instance is
> > > added/removed for a given job. I will reply after I finish reading the
> > > documentation and code.
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Great discussion. I think I'm wondering whether we can continue to
> > > > leave Kafka agnostic to the partitioning strategy. The challenge is
> > > > communicating the partitioning logic from producers to consumers so
> > > > that the dependencies between each epoch can be determined. For the
> > > > sake of discussion, imagine you did something like the following:
> > > >
> > > > 1. The name (and perhaps version) of a partitioning strategy is
> > > > stored in topic configuration when a topic is created.
> > > > 2. The producer looks up the partitioning strategy before writing to
> > > > a topic and includes it in the produce request (for fencing). If it
> > > > doesn't have an implementation for the configured strategy, it fails.
> > > > 3. The consumer also looks up the partitioning strategy and uses it
> > > > to determine dependencies when reading a new epoch. It could either
> > > > fail or make the most conservative dependency assumptions if it
> > > > doesn't know how to implement the partitioning strategy. For the
> > > > consumer, the new interface might look something like this:
> > > >
> > > > // Return the partition dependencies following an epoch bump
> > > > Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
> > > >                                          int numPartitionsAfterEpochBump);
> > > >
> > > > The unordered case then is just a particular implementation which
> > > > never has any epoch dependencies. To implement this, we would need
> > > > some way for the consumer to find out how many partitions there were
> > > > in each epoch, but maybe that's not too unreasonable.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > > wrote:
> > > >
> > > > > Hi Dong
> > > > >
> > > > > thank you very much for your questions.
> > > > >
> > > > > Regarding the time spent copying data across:
> > > > > It is correct that copying data from a topic with one partition
> > > > > mapping to a topic with a different partition mapping takes way
> > > > > longer than we can stop producers. Tens of minutes is a very
> > > > > optimistic estimate here. Many people cannot afford to copy at full
> > > > > steam and therefore will have some rate limiting in place, which
> > > > > can bump the timespan into days. The good part is that the vast
> > > > > majority of the data can be copied while the producers are still
> > > > > going. One can then piggyback the consumers on top of this
> > > > > timeframe by the method mentioned (provide them a mapping from
> > > > > their old offsets to new offsets in their repartitioned topics).
> > > > > In that way we separate migration of consumers from migration of
> > > > > producers (decoupling these is what Kafka is strongest at). The
> > > > > time to actually swap over the producers should be kept minimal by
> > > > > ensuring that, when a swap attempt is started, the consumer copying
> > > > > over is very close to the log end and is expected to finish within
> > > > > the next fetch. The operation should have a time-out and should be
> > > > > "reattemptable".
> > > > >
> > > > > Importance of log compaction:
> > > > > If a producer produces key A to partition 0, it is going to be
> > > > > there forever unless it gets deleted. The record might sit in there
> > > > > for years. A new producer started with the new partitions will fail
> > > > > to delete the record in the correct partition. The record will be
> > > > > there forever and one cannot reliably bootstrap new consumers. I
> > > > > cannot see how linear hashing can solve this.
> > > > >
> > > > > Regarding your skipping of userland copying:
> > > > > 100%, copying the data across in userland is, as far as I can see,
> > > > > only a use case for log compacted topics. Even for log compaction +
> > > > > retention it should only be opt-in. Why did I bring it up? I think
> > > > > log compaction is a very important feature to really embrace Kafka
> > > > > as a "data platform". The point I also want to make is that copying
> > > > > data this way is completely in line with the Kafka architecture. It
> > > > > only consists of reading from and writing to topics.
> > > > >
> > > > > I hope it clarifies more why I think we should aim for more than
> > > > > the current KIP. I fear that once the KIP is done not much more
> > > > > effort will be taken.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On 04.03.2018 02:28, Dong Lin wrote:
> > > > >
> > > > >> Hey Jan,
> > > > >>
> > > > >> In the current proposal, the consumer will be blocked waiting for
> > > > >> other consumers of the group to consume up to a given offset. In
> > > > >> most cases, all consumers should be close to the LEO of the
> > > > >> partitions when the partition expansion happens. Thus the waiting
> > > > >> time should not be long, e.g. on the order of seconds. On the
> > > > >> other hand, it may take a long time to wait for the entire
> > > > >> partition to be copied -- the amount of time is proportional to
> > > > >> the amount of existing data in the partition, which can take tens
> > > > >> of minutes. So the amount of time that we stop consumers may not
> > > > >> be on the same order of magnitude.
> > > > >>
> > > > >> If we can implement this suggestion without copying data over in
> > > > >> pure userland, it will be much more valuable. Do you have ideas on
> > > > >> how this can be done?
> > > > >>
> > > > >> I am not sure why the current KIP does not help people who depend
> > > > >> on log compaction. Could you elaborate more on this point?
> > > > >>
> > > > >> Thanks,
> > > > >> Dong
> > > > >>
> > > > >> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Dong,
> > > > >>>
> > > > >>> I tried to focus on the steps one can currently perform to expand
> > > > >>> or shrink a keyed topic while maintaining top-notch semantics.
> > > > >>> I can understand that there might be confusion about "stopping
> > > > >>> the consumer". It is exactly the same as proposed in the KIP.
> > > > >>> There needs to be a point in time at which the producers agree on
> > > > >>> the new partitioning. The extra semantics I want to put in there
> > > > >>> is that we have the possibility to wait until all the existing
> > > > >>> data is copied over into the new partitioning scheme. When I say
> > > > >>> stopping, I think more of having a memory barrier that ensures
> > > > >>> the ordering. I am still aiming for latencies on the scale of
> > > > >>> leader failovers.
> > > > >>>
> > > > >>> Consumers have to explicitly adopt the new partitioning scheme in
> > > > >>> the above scenario. The reason is that in these cases where you
> > > > >>> are dependent on a particular partitioning scheme, you frequently
> > > > >>> also have other topics with co-partitioning requirements or the
> > > > >>> like. Therefore all your other input topics might need to grow
> > > > >>> accordingly.
> > > > >>>
> > > > >>>
> > > > >>> What I was suggesting was to streamline all these operations as
> > > > >>> well as possible to have "real" partition growth and shrinkage
> > > > >>> going on. Migrating the producers to a new partitioning scheme
> > > > >>> can be much more streamlined with proper broker support for this.
> > > > >>> Migrating consumers is a step that might be made completely
> > > > >>> unnecessary if, for example, Streams takes the gcd as the
> > > > >>> partitioning scheme instead of enforcing 1 to 1. Connect
> > > > >>> consumers and other consumers should be fine anyway.
> > > > >>>
> > > > >>> I hope this makes it clearer what I was aiming at. The rest needs
> > > > >>> to be figured out. The only danger I see is that if we introduce
> > > > >>> this feature as proposed in the KIP, it won't help any people
> > > > >>> depending on log compaction.
> > > > >>>
> > > > >>> The other thing I wanted to mention is that I believe the current
> > > > >>> suggestion (without copying data over) can be implemented in pure
> > > > >>> userland with a custom partitioner and a small feedback loop from
> > > > >>> ProduceResponse => Partitioner, in cooperation with a change
> > > > >>> management system.
> > > > >>>
> > > > >>> Best Jan
> > > > >>>
> > > > >>>
> > > > >>> On 28.02.2018 07:13, Dong Lin wrote:
> > > > >>>
> > > > >>>> Hey Jan,
> > > > >>>>
> > > > >>>> I am not sure if it is acceptable for the producer to be stopped
> > > > >>>> for a while, particularly for online applications which require
> > > > >>>> low latency. I am also not sure how consumers can switch to a
> > > > >>>> new topic. Does the user application need to explicitly specify
> > > > >>>> a different topic for the producer/consumer to subscribe to? It
> > > > >>>> will be helpful for the discussion if you can provide more
> > > > >>>> detail on the interface change for this solution.
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Dong
> > > > >>>>
> > > > >>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Hi,
> > > > >>>>>
> > > > >>>>> I just want to throw my thoughts in. In general the
> > > > >>>>> functionality is very useful; we should not, though, try too
> > > > >>>>> hard to find the architecture while implementing.
> > > > >>>>>
> > > > >>>>> The manual steps would be to
> > > > >>>>>
> > > > >>>>> 1. create a new topic
> > > > >>>>> 2. then mirror-make from the old topic to the new topic
> > > > >>>>> 3. wait for mirror making to catch up
> > > > >>>>> 4. then put the consumers onto the new topic
> > > > >>>>>    (having MirrorMaker spit out a mapping from old offsets to
> > > > >>>>>    new offsets: if the topic is increased by factor X there is
> > > > >>>>>    going to be a clean mapping from 1 offset in the old topic
> > > > >>>>>    to X offsets in the new topic; if there is no factor then
> > > > >>>>>    there is no chance to generate a mapping that can reasonably
> > > > >>>>>    be used for continuing);
> > > > >>>>>    make consumers stop at appropriate points and continue
> > > > >>>>>    consumption with offsets from the mapping
> > > > >>>>> 5. have the producers stop for a minimal time
> > > > >>>>> 6. wait for MirrorMaker to finish
> > > > >>>>> 7. let the producers produce with the new metadata
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Instead of implementing the approach suggested in the KIP,
> > > > >>>>> which will leave log compacted topics completely crumbled and
> > > > >>>>> unusable, I would much rather try to build infrastructure to
> > > > >>>>> support the above-mentioned operations more smoothly.
> > > > >>>>> Especially having producers stop and use another topic is
> > > > >>>>> difficult, and it would be nice if one could trigger "invalid
> > > > >>>>> metadata" exceptions for them and if one could give topics
> > > > >>>>> aliases so that their produces to the old topic arrive in the
> > > > >>>>> new topic.
> > > > >>>>>
> > > > >>>>> The downsides are obvious I guess (having the same data twice
> > > > >>>>> for the transition period), but Kafka tends to scale well with
> > > > >>>>> data size. So it's a nicer fit into the architecture.
> > > > >>>>>
> > > > >>>>> I further want to argue that the functionality proposed by the
> > > > >>>>> KIP can be completely implemented in "userland" with a custom
> > > > >>>>> partitioner that handles the transition as needed. I would
> > > > >>>>> appreciate it if someone could point out what a custom
> > > > >>>>> partitioner couldn't handle in this case.
> > > > >>>>>
> > > > >>>>> With the above approach, shrinking a topic becomes the same
> > > > >>>>> steps, without losing keys in the discontinued partitions.
> > > > >>>>>
> > > > >>>>> Would love to hear what everyone thinks.
> > > > >>>>>
> > > > >>>>> Best Jan
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 11.02.2018 00:35, Dong Lin wrote:
> > > > >>>>>
> > > > >>>>>> Hi all,
> > > > >>>>>>
> > > > >>>>>> I have created KIP-253: Support in-order message delivery with
> > > > >>>>>> partition expansion. See
> > > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > > >>>>>> .
> > > > >>>>>>
> > > > >>>>>> This KIP provides a way to allow messages of the same key from
> > > > >>>>>> the same producer to be consumed in the same order they are
> > > > >>>>>> produced even if we expand the partitions of the topic.
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> Dong
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >
> > > >
> > >
> >
>
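
For reference, the consumer-side dependencies(...) interface that Jason
sketches in the quoted thread could be implemented for linear hashing
roughly as follows. This is only a sketch under assumptions: the class name
and the constructor argument (the partition count at topic creation) are
hypothetical, only newly created partitions are listed, and the scheme is
assumed to split partition p - base into p, where base is the largest
original count times a power of two that does not exceed p.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LinearHashingDependencies {

    // Partition count the topic was created with (hypothetical input).
    private final int originalPartitions;

    public LinearHashingDependencies(int originalPartitions) {
        if (originalPartitions <= 0) {
            throw new IllegalArgumentException("originalPartitions must be positive");
        }
        this.originalPartitions = originalPartitions;
    }

    // Partition that p was split from; only valid for p >= originalPartitions.
    private int parentOf(int p) {
        int base = originalPartitions;
        while (2L * base <= p) {
            base *= 2;
        }
        return p - base;
    }

    // For each partition created by the epoch bump, return the pre-bump
    // partition that previously held its keys.
    public Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        if (numPartitionsBeforeEpochBump < originalPartitions
                || numPartitionsAfterEpochBump < numPartitionsBeforeEpochBump) {
            throw new IllegalArgumentException("partition counts must not shrink");
        }
        Map<Integer, List<Integer>> deps = new TreeMap<>();
        for (int p = numPartitionsBeforeEpochBump; p < numPartitionsAfterEpochBump; p++) {
            int source = p;
            // Walk up the split chain until reaching a partition that
            // already existed before the bump.
            while (source >= numPartitionsBeforeEpochBump) {
                source = parentOf(source);
            }
            deps.put(p, Collections.singletonList(source));
        }
        return deps;
    }
}
```

The unordered case Jason mentions would simply return an empty map. Note
that this sketch needs the original partition count in addition to the two
counts in the proposed signature, which is one more piece of per-topic
information the consumer would have to look up.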



-- 
-- Guozhang
