-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


Thanks for the patch. Some comments below.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment106601>

    Should this api be in ConsumerConnector?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment106604>

    Should this be part of the public api in ConsumerConnector?
    
    Also, could we restructure the api a bit? It seems that the public api 
should just be commitOffsets(offsetMap), since we know this is a manual offset 
commit. We can let the other api, commitOffsets(isAutoCommit), get the internal 
offsets and pass them to commitOffsets(offsetMap).
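    Roughly like the following (a sketch only; the case classes and the 
SketchConnector name are simplified stand-ins for the real connector types, 
and the actual request plumbing is elided):

```scala
import scala.collection.mutable

// Illustrative stand-ins for the real Kafka types.
case class TopicAndPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String = "")

class SketchConnector {
  private val internalOffsets =
    mutable.Map.empty[TopicAndPartition, OffsetAndMetadata]
  // Recorded here only so the sketch is observable.
  var lastCommitted: Map[TopicAndPartition, OffsetAndMetadata] = Map.empty

  def updateOffset(tp: TopicAndPartition, om: OffsetAndMetadata): Unit =
    internalOffsets(tp) = om

  // Public api: manual commit of an explicit offset map.
  def commitOffsets(offsetMap: Map[TopicAndPartition, OffsetAndMetadata]): Unit = {
    // ... build and send the OffsetCommitRequest for offsetMap ...
    lastCommitted = offsetMap
  }

  // Existing api: gather the internally tracked offsets and delegate.
  def commitOffsets(isAutoCommit: Boolean): Unit =
    commitOffsets(internalOffsets.toMap)
}
```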



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106606>

    If we expect consumerConfig to be a singleton, we should just use 
options.valueOf(), instead of valuesOf().head.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106605>

    This is really weird. We get a customized listener from the consumer 
config, but expect the listener to implement a special class, 
MirrorMakerConsumerRebalanceListener, instead of the standard 
ConsumerRebalanceListener. It's probably better to get this from a MirrorMaker 
input param that defaults to MirrorMakerConsumerRebalanceListener. We can then 
add a description of which class a customized implementation needs to extend 
from, if customization is needed.
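    For example (a hypothetical sketch; the trait and classes are simplified 
stand-ins for the real interfaces, and the command-line option wiring is 
elided):

```scala
// Simplified stand-in for the real listener interface.
trait ConsumerRebalanceListener {
  def beforeReleasingPartitions(): Unit
}

// The default listener MirrorMaker would ship with.
class MirrorMakerConsumerRebalanceListener extends ConsumerRebalanceListener {
  override def beforeReleasingPartitions(): Unit = {
    // flush the data channel and commit offsets here
  }
}

// Instantiate whatever class name the MirrorMaker option supplies,
// defaulting to MirrorMakerConsumerRebalanceListener.
def loadListener(classNameOpt: Option[String]): ConsumerRebalanceListener =
  classNameOpt match {
    case Some(name) =>
      Class.forName(name).getDeclaredConstructor()
        .newInstance().asInstanceOf[ConsumerRebalanceListener]
    case None =>
      new MirrorMakerConsumerRebalanceListener
  }
```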



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106623>

    Does that imply there is always an offset? Is that always true?
    
    I don't quite follow how the logic works. Since the offset for each target 
partition is updated independently, I am not sure why you can rely on checking 
that those offsets are consecutive. Also, does this logic work when there is a 
partitioning key?
    
    It would be useful to add some comments to describe why a two-level offset 
map is needed.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106622>

    It's weird to extend from NewShinyProducer but not use its methods. 
Perhaps it would be clearer if we just let MirrorMakerNewProducer implement 
MirrorMakerBaseProducer.
    
    Ditto for MirrorMakerOldProducer.
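    i.e., something like this (a hypothetical sketch; the real implementation 
would delegate to the underlying producer, which this stand-in only records):

```scala
trait MirrorMakerBaseProducer {
  def send(topic: String, key: Array[Byte], value: Array[Byte]): Unit
  def close(): Unit
}

// Wrap the underlying producer instead of extending it, so that only
// the MirrorMakerBaseProducer surface is exposed to the rest of MirrorMaker.
class MirrorMakerNewProducer extends MirrorMakerBaseProducer {
  // Stand-in for an internal KafkaProducer; records topics for illustration.
  val sent = scala.collection.mutable.Buffer.empty[String]

  override def send(topic: String, key: Array[Byte], value: Array[Byte]): Unit =
    sent += topic  // in the real code: delegate to the wrapped producer

  override def close(): Unit = ()
}
```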



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106621>

    This kind of coordination doesn't quite work. Suppose that we set 
inRebalance to true and entered the while loop. However, just before we call 
inRebalance.wait(), the producer could finish sending all data and call 
inRebalance.notify(). Then we will be stuck in inRebalance.wait() forever, 
since we missed the notification.
    
    One way to fix this is to create a lock that protects the read/write of 
numMessageUnacked, and then use a condition created from the lock to do the 
coordination. This way, both the wait/notification and the update/check of 
numMessageUnacked are protected by the same lock.
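    Concretely, something along these lines (a sketch only; the class and 
method names are illustrative, not the patch's):

```scala
import java.util.concurrent.locks.ReentrantLock

class UnackedMessageTracker {
  private val lock = new ReentrantLock()
  private val allAcked = lock.newCondition()
  private var numMessageUnacked = 0

  // Called when a message is handed to the producer.
  def add(): Unit = {
    lock.lock()
    try numMessageUnacked += 1
    finally lock.unlock()
  }

  // Called from the producer callback, on success or failure.
  def ack(): Unit = {
    lock.lock()
    try {
      numMessageUnacked -= 1
      if (numMessageUnacked == 0) allAcked.signalAll()
    } finally lock.unlock()
  }

  // Called by the rebalance listener. The check and the wait happen
  // under the same lock, so a signal can never be missed between them.
  def awaitAllAcked(): Unit = {
    lock.lock()
    try while (numMessageUnacked > 0) allAcked.await()
    finally lock.unlock()
  }
}
```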


- Jun Rao


On Dec. 4, 2014, 7:59 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 4, 2014, 7:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented whether the send was successful or 
> not.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
