> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 394
> > <https://reviews.apache.org/r/25995/diff/18/?file=797121#file797121line394>
> >
> >     if isAutoCommit is true then we will not retry anyway. I think this 
> > condition can be removed altogether. i.e., if we are shutting down, then we 
> > should probably allow committing offsets up to retryCount. I don't 
> > recollect why this was written this way, but I think retrying up to the 
> > configured retry count is reasonable on shutdown. Do you agree?

Agreed. We now set the max retries based on the isAutoCommit value and use the 
retry count only to control how many times we actually retry. It's clearer.
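
The agreed behavior can be sketched roughly as follows (names and structure are 
hypothetical, not the actual patch):

```scala
object CommitRetryPolicy {
  // Auto-commits never retry; explicit commits (e.g. on shutdown) retry up to
  // the configured count.
  def maxRetries(isAutoCommit: Boolean, configuredRetries: Int): Int =
    if (isAutoCommit) 0 else configuredRetries

  // Runs `attempt` once, then retries on failure up to maxRetries more times.
  def commitWithRetries(isAutoCommit: Boolean, configuredRetries: Int)
                       (attempt: () => Boolean): Boolean = {
    var remaining = maxRetries(isAutoCommit, configuredRetries)
    var succeeded = attempt()
    while (!succeeded && remaining > 0) {
      remaining -= 1
      succeeded = attempt()
    }
    succeeded
  }
}
```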


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 85
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line85>
> >
> >     NumUnackedOffsets is a "weird" metric especially with the presence of 
> > NumUnackedMessages. Can you think of a better name? My attempt would be 
> > NumPendingSourceOffsets - but I don't like that either.

How about UnackedOffsetListsSize?


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 90
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line90>
> >
> >     Add comment on what this metric is. Actually do we need this since this 
> > will be covered by the producer's dropped metric? As above, this is also a 
> > weird mbean to see. Not sure if we can come up with a better name.

I added comments to this metric. I'm somewhat reluctant to rely on the 
producer's dropped metric. The intention of this metric is different from 
dropped messages, although the two are related. Also, since we could 
potentially have many producers, it would be better to expose a single number 
instead of having to go through multiple metrics. What do you think?
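
The single-number idea could look something like this (a minimal sketch, not 
the actual metric code; the class name is made up):

```scala
import java.util.concurrent.atomic.AtomicInteger

// One shared counter that all producer threads increment, so the mirror maker
// can expose a single gauge instead of one dropped-record metric per producer.
class SkippedMessageCounter {
  private val skipped = new AtomicInteger(0)
  def record(): Unit = skipped.incrementAndGet()
  def value: Int = skipped.get()
}
```

The `value` method would then back a single registered gauge, regardless of how 
many producers the mirror maker runs.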


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 198
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line198>
> >
> >     Can you move this to the if (useNewProducer) block further down (line 
> > 230)

This block is here because the consumer property has to be set before the 
consumer is instantiated, and the offset commit thread can only start after 
the consumer connector is instantiated. It also seems better for the offset 
commit thread to start after the producer threads start. That's why the if 
statements are spread out...
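
The ordering constraint can be sketched like this (step names are 
illustrative, not the actual code):

```scala
// Startup ordering: the consumer property must be set before the connector is
// created, and the offset commit thread can only start once the connector
// exists (and, preferably, after the producer threads are running).
def startupSteps(useNewProducer: Boolean): List[String] = {
  val steps = List.newBuilder[String]
  if (useNewProducer) steps += "disable consumer auto-commit" // before instantiation
  steps += "instantiate consumer connector"
  steps += "start producer threads"
  if (useNewProducer) steps += "start offset commit thread"   // needs the connector
  steps.result()
}
```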


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 295
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line295>
> >
> >     I think the final commitOffsets (on shutdown) should be a reliable 
> > commit - i.e., retry up to configured retries if there are commit errors.

Yes, it will; isAutoCommit is set to false. The commitOffsets defined in 
mirror maker wraps ZookeeperConsumerConnector.commitOffsets with 
isAutoCommit == false. Or did you mean something else?
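
In sketch form, the wrapper being described looks like this (trait and class 
names are hypothetical stand-ins for the real classes):

```scala
// Mirror maker's own commitOffsets always delegates with isAutoCommit = false,
// so the final commit on shutdown takes the reliable (retrying) path.
trait OffsetConnector {
  def commitOffsets(isAutoCommit: Boolean): Unit
}

class MirrorMakerCommitter(connector: OffsetConnector) {
  def commitOffsets(): Unit = connector.commitOffsets(isAutoCommit = false)
}
```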


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 449
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line449>
> >
> >     If block on buffer exhaustion is not turned on, do we still want to 
> > shut down the mirror maker? i.e., if the user really wants zero data loss 
> > they would set that to true right?
> >     
> >     If it is set to false and the MM exits what purpose does it serve?

I think we should let the user know as soon as something goes wrong in the 
producer. Even users who don't care about zero data loss probably still want a 
working mirror maker. If we just drop the message and let the producer move 
on, we could end up with a running mirror maker that does nothing but drop 
messages. In that case, it's probably better to let the mirror maker die to 
indicate that something went wrong. So I see exiting on 
BufferExhaustedException as being about normal operations rather than about 
zero data loss.
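
The fail-fast argument, in sketch form (the exception class here is a local 
stand-in for the producer's buffer-exhausted error, and `sendOrDie` is a 
made-up helper name):

```scala
class BufferExhaustedException(msg: String) extends RuntimeException(msg)

// If the producer's buffer is exhausted and blocking is disabled, shut the
// mirror maker down rather than silently dropping messages: a quietly
// dropping mirror maker is worse than a dead one.
def sendOrDie(send: () => Unit, shutdown: () => Unit): Boolean =
  try { send(); true }
  catch {
    case _: BufferExhaustedException =>
      shutdown()
      false
  }
```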


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 560
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line560>
> >
> >     The commit "thread" has not exited. i.e., this should just be "Shutting 
> > down mirror maker due to error when committing offsets." I think OOME is 
> > the only exception we should really shutdown right? i.e., we should 
> > probably let everything else go right?

Agreed.
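
The agreed handling could be sketched like this (the helper name is 
hypothetical): ordinary commit errors are logged and left to the next 
interval, while only an OutOfMemoryError forces a shutdown.

```scala
def handleCommitError(t: Throwable, shutdown: () => Unit, log: String => Unit): Unit =
  t match {
    case _: OutOfMemoryError =>
      log("Shutting down mirror maker due to error when committing offsets.")
      shutdown()
    case e =>
      // Let everything else go; the next scheduled commit will try again.
      log(s"Offset commit failed; will retry: ${e.getMessage}")
  }
```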


> On Dec. 22, 2014, 1:34 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 720
> > <https://reviews.apache.org/r/25995/diff/18/?file=797123#file797123line720>
> >
> >     What does "node validation skipped" mean?

I meant something like checking whether the node is really in the list... But 
it probably doesn't matter in this case. I'll just remove it.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65759
-----------------------------------------------------------


On Dec. 23, 2014, 3:07 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2014, 3:07 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented whether or not the send was 
> successful.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Incorporated Joel's comments
> 
> 
> Incorporated Joel's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Joel's comments
> 
> 
> Fix a bug in metric.
> 
> 
> Missed some changes in the previous patch submission; submitting the patch 
> again.
> 
> 
> change offset commit thread to use scheduler.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 53cb16c2949e0ac36a0e943564fc9fc9b4c84caa 
>   core/src/main/scala/kafka/utils/DoublyLinkedList.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/DoublyLinkedListTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
