Hi Jiangjie,

Thanks for entertaining my questions so far. My last question is about
serialization of the message key. If the key deserializer (class) is not
present on the MM instance, does MM use the hashcode of the raw bytes to
determine the partition? How are you going to address the situation where
the key needs to be deserialized so that the actual hashcode can be
computed?
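To illustrate my understanding, here is a rough sketch (not the actual
Kafka code; I am assuming the old producer hashes the deserialized key
object while the new producer hashes the serialized key bytes with
murmur2):

  // Old (Scala) producer default: partitioning needs the deserialized
  // key object, which MM cannot build without the key class on its
  // classpath.
  def oldStylePartition(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // New (Java) producer default: hashes the serialized key bytes
  // directly, so no deserializer is needed to pick a partition.
  def newStylePartition(keyBytes: Array[Byte], numPartitions: Int): Int =
    (org.apache.kafka.common.utils.Utils.murmur2(keyBytes) & 0x7fffffff) %
      numPartitions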
Thanks,

Bhavesh

On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Bhavesh,
>
> Please see inline comments.
>
> Jiangjie (Becket) Qin
>
> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>
> >Hi Jiangjie,
> >
> >Thanks for the input.
> >
> >a) Will the MM producer ack be attached to the producer instance or set
> >per topic? The use case is that one instance of MM needs to handle both
> >strong acks and also acks=0 for some topics. Or would it be better to
> >set up another instance of MM?
> The acks setting is a producer-level setting, not a topic-level setting.
> In this case you probably need to set up another instance.
>
> >b) Regarding TCP connections: why is each producer instance attached to
> >its own TCP connections? Is it possible to use a pool of broker
> >connections, so that a producer just checks out a TCP connection to a
> >broker and the number of producer instances does not correlate with the
> >number of broker connections? Is this possible?
> In the new producer, each producer maintains a connection to each broker
> within the producer instance. Making producer instances share TCP
> connections would be a very big change to the current design, so I
> suppose we won't be able to do that.
>
> >Thanks,
> >
> >Bhavesh
> >
> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
> ><j...@linkedin.com.invalid> wrote:
> >
> >> Hi Bhavesh,
> >>
> >> I think it is the right discussion to have when we are talking about
> >> the new design for MM.
> >> Please see the inline comments.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com>
> >>wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >I just wanted to let you know about our use case and stress the
> >> >point that the local data center broker cluster has fewer partitions
> >> >than the destination offline broker cluster, because we do batch
> >> >pulls from CAMUS and need to drain data faster than the injection
> >> >rate (from four DCs for the same topic).
> >> Keeping the same partition number in source and target clusters will
> >> be an option but will not be enforced by default.
> >> >
> >> >We are facing the following issues (probably due to configuration):
> >> >
> >> >1) We occasionally lose data because the message batch size is too
> >> >large (2MB) on the target side (we are using the old producer, but I
> >> >think the new producer will solve this problem to some extent).
> >> We do see this issue at LinkedIn as well. The new producer might also
> >> have this issue. There are some proposed solutions, but no real work
> >> has started yet. For now, as a workaround, setting a more aggressive
> >> batch size on the producer side should work.
> >> >2) Since only one instance is set up to MM data, we are not able to
> >> >set acks per topic; instead the ack is attached to the producer
> >> >instance.
> >> I don't quite get the question here.
> >> >3) How are you going to address the two-phase commit problem if acks
> >> >is set to strongest, but auto commit is on for the consumer (meaning
> >> >the producer does not get the ack, but the consumer auto-committed
> >> >the offset of that message)? Is there a transaction-based ack and
> >> >offset commit (Kafka transactions are in progress)?
> >> Auto offset commit should be turned off in this case. The offset will
> >> only be committed once by the offset commit thread. So there is no
> >> two-phase commit.
> >> >4) How are you planning to avoid duplicated messages?
> >> >(Is the broker going to keep a moving window of collected messages
> >> >and de-dupe?) Possibly we get this from retries set to 5…?
> >> We are not trying to completely avoid duplicates. The duplicates will
> >> still be there if:
> >> 1. Producer retries on failure.
> >> 2. Mirror maker is hard killed.
> >> Currently, dedup is expected to be done by the user if necessary.
> >> >5) Lastly, is there any warning or insight you can provide from the
> >> >MM component when the data injection rate into destination
> >> >partitions is NOT evenly distributed, regardless of keyed or
> >> >non-keyed messages? (Hence there is a ripple effect: data arrives
> >> >late, or arrives out of order in terms of timestamp and sometimes
> >> >early, and CAMUS creates a huge number of files on HDFS due to the
> >> >uneven injection rate. The Camus job is configured to run every 3
> >> >minutes.)
> >> I think uneven data distribution is typically caused by server-side
> >> imbalance, rather than something mirror maker could control. In the
> >> new mirror maker, however, there is a customizable message handler
> >> that might be able to help a little bit. In the message handler, you
> >> can explicitly set a partition that you want to produce the message
> >> to. So if you know the uneven data distribution in the target
> >> cluster, you may offset it here. But that probably only works for
> >> non-keyed messages.
> >> >
> >> >I am not sure if this is the right discussion forum to bring these
> >> >to your/the Kafka dev team's attention. This might be off track.
> >> >
> >> >
> >> >Thanks,
> >> >
> >> >Bhavesh
> >> >
> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin
> >> ><j...@linkedin.com.invalid> wrote:
> >> >
> >> >> I've updated the KIP page. Feedback is welcome.
> >> >>
> >> >> Regarding the simple mirror maker design, I thought it over and
> >> >> have some worries. There are two things that might be worth
> >> >> thinking about:
> >> >> 1. One of the enhancements to mirror maker is adding a message
> >> >> handler to do things like reformatting. I think we might
> >> >> potentially want to have more threads processing the messages than
> >> >> the number of consumers. If we follow the simple mirror maker
> >> >> solution, we lose this flexibility.
> >> >> 2. This might not matter too much, but creating more consumers
> >> >> means a bigger footprint of TCP connections / memory.
> >> >>
> >> >> Any thoughts on this?
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> >>
> >> >> >Hi Jay and Neha,
> >> >> >
> >> >> >Thanks a lot for the reply and explanation. I do agree it makes
> >> >> >more sense to avoid duplicate effort and plan based on the new
> >> >> >consumer. I'll modify the KIP.
> >> >> >
> >> >> >To Jay's question on message ordering: the data channel selection
> >> >> >makes sure that the messages from the same source partition will
> >> >> >be sent by the same producer. So the order of the messages is
> >> >> >guaranteed with proper producer settings
> >> >> >(MaxInFlightRequests=1, retries=Integer.MaxValue, etc.).
> >> >> >For keyed messages, because they come from the same source
> >> >> >partition and will end up in the same target partition, as long
> >> >> >as they are sent by the same producer, the order is guaranteed.
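> >> >> >For example, a minimal sketch of those settings (property names
> >> >> >assumed from the new producer configs; double-check against the
> >> >> >producer documentation):
> >> >> >
> >> >> >  val props = new java.util.Properties()
> >> >> >  // one in-flight request per connection preserves send order
> >> >> >  props.put("max.in.flight.requests.per.connection", "1")
> >> >> >  // retry (practically) forever instead of dropping messages
> >> >> >  props.put("retries", Integer.MAX_VALUE.toString)
> >> >> >  // wait for all in-sync replicas to ack each send
> >> >> >  props.put("acks", "all")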
> >> >> >For non-keyed messages, the messages coming from the same source
> >> >> >partition might go to different target partitions. The order is
> >> >> >only guaranteed within each partition.
> >> >> >
> >> >> >Anyway, I'll modify the KIP, and the data channel will go away.
> >> >> >
> >> >> >Thanks.
> >> >> >
> >> >> >Jiangjie (Becket) Qin
> >> >> >
> >> >> >
> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> >> >> >
> >> >> >>I think there is some value in investigating if we can go back
> >> >> >>to the simple mirror maker design, as Jay points out. Here you
> >> >> >>have N threads; each has a consumer and a producer.
> >> >> >>
> >> >> >>The reason why we had to move away from that was a combination
> >> >> >>of the difference in throughput between the consumer and the old
> >> >> >>producer and the deficiency of the consumer rebalancing that
> >> >> >>limits the total number of mirror maker threads. So the only
> >> >> >>option available was to increase the throughput of the limited #
> >> >> >>of mirror maker threads that could be deployed. That queuing
> >> >> >>design may not make sense now, given that the new producer's
> >> >> >>throughput is almost similar to the consumer's AND the new
> >> >> >>round-robin based consumer rebalancing can allow a very high
> >> >> >>number of mirror maker instances to exist.
> >> >> >>
> >> >> >>This is the end state that the mirror maker should be in once
> >> >> >>the new consumer is complete, so it wouldn't hurt to see if we
> >> >> >>can just move to that right now.
> >> >> >>
> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com>
> >> >> >>wrote:
> >> >> >>
> >> >> >>> QQ: If we ever use a different technique for the data channel
> >> >> >>> selection than for the producer partitioning, won't that break
> >> >> >>> ordering? How can we ensure these things stay in sync?
> >> >> >>>
> >> >> >>> With respect to the new consumer -- I really do want to
> >> >> >>> encourage people to think through how MM will work with the
> >> >> >>> new consumer. I mean this isn't very far off, maybe a few
> >> >> >>> months if we hustle? I could imagine us getting this MM fix
> >> >> >>> done maybe sooner, maybe in a month? So I guess this buys us
> >> >> >>> an extra month before we rip it out and throw it away? Maybe
> >> >> >>> two? This bug has been there for a while, though, right? Is it
> >> >> >>> worth it? Probably it is, but it still kind of sucks to have
> >> >> >>> the duplicate effort.
> >> >> >>>
> >> >> >>> So anyhow let's definitely think about how things will work
> >> >> >>> with the new consumer. I think we can probably just have N
> >> >> >>> threads; each thread has a producer and a consumer and is
> >> >> >>> internally single threaded. Any reason this wouldn't work?
> >> >> >>>
> >> >> >>> -Jay
> >> >> >>>
> >> >> >>>
> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
> >> >> >>> <j...@linkedin.com.invalid> wrote:
> >> >> >>>
> >> >> >>> > Hi Jay,
> >> >> >>> >
> >> >> >>> > Thanks for the comments. Please see inline responses.
> >> >> >>> > > >> >> >>> > Jiangjie (Becket) Qin > >> >> >>> > > >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote: > >> >> >>> > > >> >> >>> > >Hey guys, > >> >> >>> > > > >> >> >>> > >A couple questions/comments: > >> >> >>> > > > >> >> >>> > >1. The callback and user-controlled commit offset > >>functionality > >> >>is > >> >> >>> already > >> >> >>> > >in the new consumer which we are working on in parallel. If we > >> >> >>> accelerated > >> >> >>> > >that work it might help concentrate efforts. I admit this > >>might > >> >>take > >> >> >>> > >slightly longer in calendar time but could still probably get > >> >>done > >> >> >>>this > >> >> >>> > >quarter. Have you guys considered that approach? > >> >> >>> > Yes, I totally agree that ideally we should put efforts on new > >> >> >>>consumer. > >> >> >>> > The main reason for still working on the old consumer is that > >>we > >> >> >>>expect > >> >> >>> it > >> >> >>> > would still be used in LinkedIn for quite a while before the > >>new > >> >> >>>consumer > >> >> >>> > could be fully rolled out. And we recently suffering a lot from > >> >> >>>mirror > >> >> >>> > maker data loss issue. So our current plan is making necessary > >> >> >>>changes to > >> >> >>> > make current mirror maker stable in production. Then we can > >>test > >> >>and > >> >> >>> > rollout new consumer gradually without getting burnt. > >> >> >>> > > > >> >> >>> > >2. I think partitioning on the hash of the topic partition is > >> >>not a > >> >> >>>very > >> >> >>> > >good idea because that will make the case of going from a > >>cluster > >> >> >>>with > >> >> >>> > >fewer partitions to one with more partitions not work. I > >>think an > >> >> >>> > >intuitive > >> >> >>> > >way to do this would be the following: > >> >> >>> > >a. Default behavior: Just do what the producer does. I.e. if > >>you > >> >> >>> specify a > >> >> >>> > >key use it for partitioning, if not just partition in a > >> >>round-robin > >> >> >>> > >fashion. > >> >> >>> > >b. Add a --preserve-partition option that will explicitly > >> >>inherent > >> >> >>>the > >> >> >>> > >partition from the source irrespective of whether there is a > >>key > >> >>or > >> >> >>> which > >> >> >>> > >partition that key would hash to. > >> >> >>> > Sorry that I did not explain this clear enough. The hash of > >>topic > >> >> >>> > partition is only used when decide which mirror maker data > >>channel > >> >> >>>queue > >> >> >>> > the consumer thread should put message into. It only tries to > >>make > >> >> >>>sure > >> >> >>> > the messages from the same partition is sent by the same > >>producer > >> >> >>>thread > >> >> >>> > to guarantee the sending order. This is not at all related to > >> >>which > >> >> >>> > partition in target cluster the messages end up. That is still > >> >> >>>decided by > >> >> >>> > producer. > >> >> >>> > > > >> >> >>> > >3. You don't actually give the ConsumerRebalanceListener > >> >>interface. > >> >> >>>What > >> >> >>> > >is > >> >> >>> > >that going to look like? > >> >> >>> > Good point! I should have put it in the wiki. I just added it. > >> >> >>> > > > >> >> >>> > >4. What is MirrorMakerRecord? I think ideally the > >> >> >>> > >MirrorMakerMessageHandler > >> >> >>> > >interface would take a ConsumerRecord as input and return a > >> >> >>> > >ProducerRecord, > >> >> >>> > >right? That would allow you to transform the key, value, > >> >>partition, > >> >> >>>or > >> >> >>> > >destination topic... 
> >> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650; it is exactly
> >> >> >>> > the same as ConsumerRecord in KAFKA-1760.
> >> >> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic: String,
> >> >> >>> >                                         val sourcePartition: Int,
> >> >> >>> >                                         val sourceOffset: Long,
> >> >> >>> >                                         val key: Array[Byte],
> >> >> >>> >                                         val value: Array[Byte]) {
> >> >> >>> >   def size = value.length + {if (key == null) 0 else key.length}
> >> >> >>> > }
> >> >> >>> >
> >> >> >>> > However, because the source partition and offset are needed
> >> >> >>> > in the producer thread for consumer offset bookkeeping, the
> >> >> >>> > record returned by MirrorMakerMessageHandler needs to
> >> >> >>> > contain that information. Therefore ProducerRecord does not
> >> >> >>> > work here. We could probably let the message handler take
> >> >> >>> > ConsumerRecord for both input and output.
> >> >> >>> > >
> >> >> >>> > >5. Have you guys thought about what the implementation will
> >> >> >>> > >look like in terms of threading architecture etc. with the
> >> >> >>> > >new consumer? That will be soon, so even if we aren't
> >> >> >>> > >starting with that, let's make sure we can get rid of a lot
> >> >> >>> > >of the current mirror maker's accidental complexity in
> >> >> >>> > >terms of threads and queues when we move to that.
> >> >> >>> > I haven't thought about it thoroughly. The quick idea is
> >> >> >>> > that after migration to the new consumer, it is probably
> >> >> >>> > better to use a single consumer thread. If multithreading is
> >> >> >>> > needed, decoupling consumption and processing might be used.
> >> >> >>> > MirrorMaker definitely needs to be changed after the new
> >> >> >>> > consumer gets checked in. I'll document the changes and can
> >> >> >>> > submit follow-up patches after the new consumer is
> >> >> >>> > available.
> >> >> >>> > >
> >> >> >>> > >-Jay
> >> >> >>> > >
> >> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
> >> >> >>> > ><j...@linkedin.com.invalid> wrote:
> >> >> >>> > >
> >> >> >>> > >> Hi Kafka Devs,
> >> >> >>> > >>
> >> >> >>> > >> We are working on the Kafka Mirror Maker enhancement. A
> >> >> >>> > >> KIP is posted to document and discuss the following:
> >> >> >>> > >> 1. KAFKA-1650: No data loss mirror maker change
> >> >> >>> > >> 2. KAFKA-1839: To allow partition-aware mirroring
> >> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion
> >> >> >>> > >> Feedback is welcome. Please let us know if you have any
> >> >> >>> > >> questions or concerns.
> >> >> >>> > >>
> >> >> >>> > >> Thanks.
> >> >> >>> > >>
> >> >> >>> > >> Jiangjie (Becket) Qin
> >> >> >>> > >>
> >> >> >>> >
> >> >> >>
> >> >> >>
> >> >> >>--
> >> >> >>Thanks,
> >> >> >>Neha