----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31706/#review75071 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/31706/#comment122005> We will try to avoid wildcard imports as much as possible; one thing you can do is in your IDE set the auto-wildcard threshold to sth. like 99. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/31706/#comment122006> Ditto here. Also it seems you do not have any actual changes other than imports? core/src/main/scala/kafka/consumer/PartitionAssignor.scala <https://reviews.apache.org/r/31706/#comment122007> Change the comments in @return? core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/31706/#comment122023> This is not a comment but just a thought: in the new consumer where we also have these two callbacks which do not have the global assignment information, we could possibly use metadata request to get the required information. core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java <https://reviews.apache.org/r/31706/#comment122018> Add comment that it is a map of (topic -> map of (partitionId -> consumerThreadId)) since it is different with the assignor's output, which is the map of (consumerId -> map of (topic_partition -> consumerThreadId)). core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/31706/#comment122019> Are we sure that one producer io thread is sufficient for all cases? core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/31706/#comment122026> It seems the flush / commit check is outside the inner while loop, and hence will only be triggered when iter.hasNext() returns false? core/src/main/scala/kafka/tools/MirrorMaker.scala <https://reviews.apache.org/r/31706/#comment122035> Instead of a wrapper rebalancer, I think it is cleaner to just instantiate the ConsumerRebalanceListener interface and pass-in the producer / mirrormaker object for flush() and commitOffsets(). - Guozhang Wang On March 3, 2015, 10:25 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31706/ > ----------------------------------------------------------- > > (Updated March 3, 2015, 10:25 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1997 > https://issues.apache.org/jira/browse/KAFKA-1997 > > > Repository: kafka > > > Description > ------- > > Patch for KAFKA-1997: refactor mirror maker based on producer.flush() > > > Diffs > ----- > > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f > core/src/main/scala/kafka/consumer/PartitionAssignor.scala > e6ff7683a0df4a7d221e949767e57c34703d5aad > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > 5487259751ebe19f137948249aa1fd2637d2deb4 > core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java > 7f45a90ba6676290172b7da54c15ee5dc1a42a2e > core/src/main/scala/kafka/tools/MirrorMaker.scala > 5374280dc97dc8e01e9b3ba61fd036dc13ae48cb > core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala > 543070f4fd3e96f3183cae9ee2ccbe843409ee58 > > core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala > a17e8532c44aadf84b8da3a57bcc797a848b5020 > > Diff: https://reviews.apache.org/r/31706/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >