Hi Jiangjie,

Thanks for entertaining my questions so far. My last question is about
serialization of the message key. If the key deserializer class is not
present on the MM instance, does MM use the hash code of the raw bytes to
determine the partition? How are you going to address the case where the key
needs to be deserialized so that its actual hash code can be computed?
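
To make the question concrete, a minimal sketch of partitioning on the raw
key bytes follows. It is only an illustration, not MM or producer code:
RawKeyPartitioning and partitionFor are hypothetical names, and
java.util.Arrays.hashCode is a stand-in for whatever hash the producer
actually applies to the key.

```scala
import java.util.Arrays

object RawKeyPartitioning {
  // Hypothetical helper: picks a partition from the serialized key bytes only.
  // Arrays.hashCode is a stand-in hash for illustration, not Kafka's own hash.
  def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Mask the sign bit so the result is never negative.
    (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions
  }
}
```

Nothing in this sketch needs the key's deserializer class, but the chosen
partition can differ from one computed from the deserialized object's
hashCode.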


Thanks,

Bhavesh

On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Bhavesh,
>
> Please see inline comments.
>
> Jiangjie (Becket) Qin
>
> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>
> >Hi Jiangjie,
> >
> >Thanks for the input.
> >
> >a) Will the MM producer ack setting be attached to the producer instance or
> >be per topic? Our use case is that one instance of MM needs to handle both
> >strong ack and ack=0 for some topics. Or would it be better to set up
> >another instance of MM?
> The acks setting is a producer-level setting, not a topic-level setting.
> In this case you probably need to set up another instance.
> >
> >b) Regarding TCP connections, why is the number of producer instances tied
> >to the number of TCP connections? Is it possible to use a pool of broker TCP
> >connections, so that a producer just checks out a TCP connection to a
> >broker? Then the number of producer instances would not correlate with the
> >number of broker connections. Is this possible?
> In the new producer, each producer instance maintains its own connection to
> each broker. Making producer instances share TCP connections would be a very
> big change to the current design, so I suppose we won’t be able to do that.
> >
> >
> >Thanks,
> >
> >Bhavesh
> >
> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Bhavesh,
> >>
> >> I think it is the right discussion to have when we are talking about the
> >> new design for MM.
> >> Please see the inline comments.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com>
> >>wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >I just wanted to let you know about our use case and stress the point
> >> >that the local data center broker clusters have fewer partitions than
> >> >the destination offline broker cluster. This is because we do batch
> >> >pulls with CAMUS and need to drain data faster than the injection rate
> >> >(from four DCs for the same topic).
> >> Keeping the same partition number in the source and target clusters will
> >> be an option but will not be enforced by default.
> >> >
> >> >We are facing following issues (probably due to configuration):
> >> >
> >> >1)      We occasionally lose data because the message batch size is too
> >> >large (2MB) on the target (we are using the old producer, but I think the
> >> >new producer will solve this problem to some extent).
> >> We do see this issue at LinkedIn as well. The new producer might also have
> >> this issue. There are some proposed solutions, but no real work has
> >> started yet. For now, as a workaround, setting a more aggressive batch
> >> size on the producer side should work.
> >> >2)      Since only one instance is set up to mirror the data, we are not
> >> >able to set ack per topic; instead, ack is attached to the producer
> >> >instance.
> >> I don’t quite get the question here.
> >> >3)      How are you going to address the two-phase commit problem if ack
> >> >is set to strongest but auto commit is on for the consumer (meaning the
> >> >producer does not get an ack, but the consumer has auto-committed the
> >> >offset of that message)? Is there a transaction-based ack and offset
> >> >commit (Kafka transactions are in progress)?
> >> Auto offset commit should be turned off in this case. The offset will
> >> only be committed once by the offset commit thread. So there is no
> >> two-phase commit.
> >> >4)      How are you planning to avoid duplicated messages? (Is the
> >> >broker going to keep a moving window of collected messages and de-dupe?)
> >> >Possibly we get this from retries set to 5…?
> >> We are not trying to completely avoid duplicates. The duplicates will
> >> still be there if:
> >> 1. Producer retries on failure.
> >> 2. Mirror maker is hard killed.
> >> Currently, dedup is expected to be done by the user if necessary.
> >> >5)      Last, is there any warning or insight the MM component can
> >> >provide when the data injection rate into destination partitions is NOT
> >> >evenly distributed, regardless of keyed or non-keyed messages? (This has
> >> >ripple effects: data arrives late or out of order in terms of
> >> >timestamp, and sometimes early, and CAMUS creates a huge number of files
> >> >on HDFS due to the uneven injection rate. The Camus job is configured to
> >> >run every 3 minutes.)
> >> I think uneven data distribution is typically caused by server-side
> >> imbalance rather than something mirror maker could control. In the new
> >> mirror maker, however, there is a customizable message handler that might
> >> be able to help a little bit. In the message handler, you can explicitly
> >> set the partition that you want to produce the message to. So if you know
> >> about the uneven data distribution in the target cluster, you may offset
> >> it here. But that probably only works for non-keyed messages.
> >> >
> >> >I am not sure if this is the right discussion forum to bring these to
> >> >your/Kafka dev team's attention. This might be off track.
> >> >
> >> >
> >> >Thanks,
> >> >
> >> >Bhavesh
> >> >
> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> I’ve updated the KIP page. Feedback is welcome.
> >> >>
> >> >> Regarding the simple mirror maker design, I thought it over and have
> >> >> some worries. There are two things that might be worth thinking about:
> >> >> 1. One of the enhancements to mirror maker is adding a message handler
> >> >> to do things like reformatting. I think we might potentially want to
> >> >> have more threads processing the messages than the number of consumers.
> >> >> If we follow the simple mirror maker solution, we lose this flexibility.
> >> >> 2. This might not matter too much, but creating more consumers means a
> >> >> larger footprint of TCP connections / memory.
> >> >>
> >> >> Any thoughts on this?
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> >>
> >> >> >Hi Jay and Neha,
> >> >> >
> >> >> >Thanks a lot for the reply and explanation. I do agree it makes more
> >> >> >sense to avoid duplicate effort and plan based on the new consumer.
> >> >> >I’ll modify the KIP.
> >> >> >
> >> >> >To Jay’s question on message ordering: the data channel selection
> >> >> >makes sure that the messages from the same source partition will be
> >> >> >sent by the same producer. So the order of the messages is guaranteed
> >> >> >with proper producer settings (MaxInFlightRequests=1,
> >> >> >retries=Integer.MaxValue, etc.).
> >> >> >For keyed messages, because they come from the same source partition
> >> >> >and will end up in the same target partition, as long as they are sent
> >> >> >by the same producer, the order is guaranteed.
> >> >> >For non-keyed messages, the messages coming from the same source
> >> >> >partition might go to different target partitions. The order is only
> >> >> >guaranteed within each partition.
> >> >> >
> >> >> >Anyway, I’ll modify the KIP and the data channel will go away.
> >> >> >
> >> >> >Thanks.
> >> >> >
> >> >> >Jiangjie (Becket) Qin
> >> >> >
> >> >> >
> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> >> >> >
> >> >> >>I think there is some value in investigating if we can go back to the
> >> >> >>simple mirror maker design, as Jay points out. Here you have N
> >> >> >>threads, each of which has a consumer and a producer.
> >> >> >>
> >> >> >>The reason why we had to move away from that was a combination of the
> >> >> >>difference in throughput between the consumer and the old producer and
> >> >> >>the deficiency of the consumer rebalancing that limits the total
> >> >> >>number of mirror maker threads. So the only option available was to
> >> >> >>increase the throughput of the limited # of mirror maker threads that
> >> >> >>could be deployed.
> >> >> >>That queuing design may no longer make sense if the new producer's
> >> >> >>throughput is similar to the consumer's AND the new round-robin based
> >> >> >>consumer rebalancing allows a very high number of mirror maker
> >> >> >>instances to exist.
> >> >> >>
> >> >> >>This is the end state that the mirror maker should be in once the new
> >> >> >>consumer is complete, so it wouldn't hurt to see if we can just move
> >> >> >>to that right now.
> >> >> >>
> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com>
> >> >>wrote:
> >> >> >>
> >> >> >>> QQ: If we ever use a different technique for the data channel
> >> >> >>> selection than for the producer partitioning won't that break
> >> >> >>> ordering? How can we ensure these things stay in sync?
> >> >> >>>
> >> >> >>> With respect to the new consumer--I really do want to encourage
> >> >> >>> people to think through how MM will work with the new consumer. I
> >> >> >>> mean this isn't very far off, maybe a few months if we hustle? I
> >> >> >>> could imagine us getting this mm fix done maybe sooner, maybe in a
> >> >> >>> month? So I guess this buys us an extra month before we rip it out
> >> >> >>> and throw it away? Maybe two? This bug has been there for a while,
> >> >> >>> though, right? Is it worth it? Probably it is, but it still kind of
> >> >> >>> sucks to have the duplicate effort.
> >> >> >>>
> >> >> >>> So anyhow let's definitely think about how things will work with the
> >> >> >>> new consumer. I think we can probably just have N threads, each
> >> >> >>> thread has a producer and consumer and is internally single
> >> >> >>> threaded. Any reason this wouldn't work?
> >> >> >>>
> >> >> >>> -Jay
> >> >> >>>
> >> >> >>>
> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
> >> >> >>> <j...@linkedin.com.invalid> wrote:
> >> >> >>>
> >> >> >>> > Hi Jay,
> >> >> >>> >
> >> >> >>> > Thanks for comments. Please see inline responses.
> >> >> >>> >
> >> >> >>> > Jiangjie (Becket) Qin
> >> >> >>> >
> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >> >> >>> >
> >> >> >>> > >Hey guys,
> >> >> >>> > >
> >> >> >>> > >A couple questions/comments:
> >> >> >>> > >
> >> >> >>> > >1. The callback and user-controlled commit offset functionality
> >> >> >>> > >is already in the new consumer which we are working on in
> >> >> >>> > >parallel. If we accelerated that work it might help concentrate
> >> >> >>> > >efforts. I admit this might take slightly longer in calendar time
> >> >> >>> > >but could still probably get done this quarter. Have you guys
> >> >> >>> > >considered that approach?
> >> >> >>> > Yes, I totally agree that ideally we should put our effort into the
> >> >> >>> > new consumer. The main reason for still working on the old consumer
> >> >> >>> > is that we expect it will still be used at LinkedIn for quite a
> >> >> >>> > while before the new consumer can be fully rolled out. And we have
> >> >> >>> > recently been suffering a lot from the mirror maker data loss issue.
> >> >> >>> > So our current plan is to make the necessary changes to make the
> >> >> >>> > current mirror maker stable in production. Then we can test and roll
> >> >> >>> > out the new consumer gradually without getting burnt.
> >> >> >>> > >
> >> >> >>> > >2. I think partitioning on the hash of the topic partition is not
> >> >> >>> > >a very good idea because that will make the case of going from a
> >> >> >>> > >cluster with fewer partitions to one with more partitions not
> >> >> >>> > >work. I think an intuitive way to do this would be the following:
> >> >> >>> > >a. Default behavior: Just do what the producer does. I.e. if you
> >> >> >>> > >specify a key use it for partitioning, if not just partition in a
> >> >> >>> > >round-robin fashion.
> >> >> >>> > >b. Add a --preserve-partition option that will explicitly inherit
> >> >> >>> > >the partition from the source irrespective of whether there is a
> >> >> >>> > >key or which partition that key would hash to.
> >> >> >>> > Sorry that I did not explain this clearly enough. The hash of the
> >> >> >>> > topic partition is only used when deciding which mirror maker data
> >> >> >>> > channel queue the consumer thread should put a message into. It only
> >> >> >>> > tries to make sure the messages from the same partition are sent by
> >> >> >>> > the same producer thread, to guarantee the sending order. This is
> >> >> >>> > not at all related to which partition in the target cluster the
> >> >> >>> > messages end up in. That is still decided by the producer.
> >> >> >>> > >
> >> >> >>> > >3. You don't actually give the ConsumerRebalanceListener
> >> >> >>> > >interface. What is that going to look like?
> >> >> >>> > Good point! I should have put it in the wiki. I just added it.
> >> >> >>> > >
> >> >> >>> > >4. What is MirrorMakerRecord? I think ideally the
> >> >> >>> > >MirrorMakerMessageHandler interface would take a ConsumerRecord as
> >> >> >>> > >input and return a ProducerRecord, right? That would allow you to
> >> >> >>> > >transform the key, value, partition, or destination topic...
> >> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650 and is exactly the
> >> >> >>> > same as ConsumerRecord in KAFKA-1760.
> >> >> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic: String,
> >> >> >>> >   val sourcePartition: Int,
> >> >> >>> >   val sourceOffset: Long,
> >> >> >>> >   val key: Array[Byte],
> >> >> >>> >   val value: Array[Byte]) {
> >> >> >>> >   def size = value.length + {if (key == null) 0 else key.length}
> >> >> >>> > }
> >> >> >>> >
> >> >> >>> > However, because the source partition and offset are needed in the
> >> >> >>> > producer thread for consumer offset bookkeeping, the record returned
> >> >> >>> > by MirrorMakerMessageHandler needs to contain that information.
> >> >> >>> > Therefore ProducerRecord does not work here. We could probably let
> >> >> >>> > the message handler take ConsumerRecord for both input and output.
> >> >> >>> > >
> >> >> >>> > >5. Have you guys thought about what the implementation will look
> >> >> >>> > >like in terms of threading architecture etc with the new consumer?
> >> >> >>> > >That will be soon so even if we aren't starting with that let's
> >> >> >>> > >make sure we can get rid of a lot of the current mirror maker
> >> >> >>> > >accidental complexity in terms of threads and queues when we move
> >> >> >>> > >to that.
> >> >> >>> > I haven’t thought about it thoroughly. The quick idea is that after
> >> >> >>> > migration to the new consumer, it is probably better to use a single
> >> >> >>> > consumer thread. If multithreading is needed, decoupling consumption
> >> >> >>> > and processing might be used. MirrorMaker definitely needs to be
> >> >> >>> > changed after the new consumer gets checked in. I’ll document the
> >> >> >>> > changes and can submit follow-up patches after the new consumer is
> >> >> >>> > available.
> >> >> >>> > >
> >> >> >>> > >-Jay
> >> >> >>> > >
> >> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
> >> >> >>> > ><j...@linkedin.com.invalid> wrote:
> >> >> >>> > >
> >> >> >>> > >> Hi Kafka Devs,
> >> >> >>> > >>
> >> >> >>> > >> We are working on Kafka Mirror Maker enhancements. A KIP is
> >> >> >>> > >> posted to document and discuss the following:
> >> >> >>> > >> 1. KAFKA-1650: No Data loss mirror maker change
> >> >> >>> > >> 2. KAFKA-1839: To allow partition aware mirror.
> >> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion
> >> >> >>> > >> Feedback is welcome. Please let us know if you have any
> >> >> >>> > >> questions or concerns.
> >> >> >>> > >>
> >> >> >>> > >> Thanks.
> >> >> >>> > >>
> >> >> >>> > >> Jiangjie (Becket) Qin
> >> >> >>> > >>
> >> >> >>> >
> >> >> >>> >
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>--
> >> >> >>Thanks,
> >> >> >>Neha
> >> >> >
> >> >>
> >> >>
> >>
> >>
>
>
