-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/14730/#review27195
-----------------------------------------------------------

Thanks for the patch. Using ._1 and ._2 tends to be confusing since it's not obvious what the referenced fields are. So, unless the context is very clear, I recommend that we use named fields. Some detailed comments below.


core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52893>

    For better clarity, we can do
    map { case (topicAndPartition, broker) => }


core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/14730/#comment52892>

    For better clarity, we can write this as
    for ((topicAndPartition, truncateOffset) <- partitionAndOffsets)


core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52890>

    tp._2._1 is going to be confusing since it's not obvious what field this refers to. It's probably worthwhile to create a new case class BrokerAndInitialOffset. Then we can do
    for ((topicAndPartition, brokerAndInitialOffset) <- partitionAndOffsets)


core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52891>

    We need to think through this change. What if a broker is restarted quickly with a new host/port, but the same broker id? Can we guarantee that a new fetcher thread with the new host/port will be added if we just keep the broker id in the fetcherThreadMap?


core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment52888>

    It may not be obvious what ._1 and ._2 are. We can write the code like the following:
    for ((topicAndPartition, offset) <- partitionAndOffsets)
    Then we can refer to the named fields.


core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment52889>

    Not sure if doing this outside of the lock is safe, since new partitions can be added immediately after the check. Removing all partitions inside the lock should still be cheap.

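
To illustrate the named-field suggestions above, here is a minimal sketch. The TopicAndPartition, Broker, and BrokerAndInitialOffset definitions below are hypothetical stand-ins written for this example (including the initOffset field name), not the actual Kafka classes or the code in the patch. It contrasts positional tuple access with pattern-matched names.

```scala
// Hypothetical stand-ins for illustration only; not the real Kafka definitions.
case class TopicAndPartition(topic: String, partition: Int)
case class Broker(id: Int, host: String, port: Int)
case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)

object NamedFieldsSketch {
  // Positional access: the reader has to remember what _2._1 and _2._2 mean.
  def describePositional(m: Map[TopicAndPartition, (Broker, Long)]): Seq[String] =
    m.toSeq.map(tp =>
      s"${tp._1.topic}-${tp._1.partition} -> broker ${tp._2._1.id} @ ${tp._2._2}")

  // Named access: a pattern match names the key and value, and the case class
  // names the two parts of the value.
  def describeNamed(m: Map[TopicAndPartition, BrokerAndInitialOffset]): Seq[String] =
    m.toSeq.map { case (topicAndPartition, brokerAndInitialOffset) =>
      s"${topicAndPartition.topic}-${topicAndPartition.partition} -> " +
        s"broker ${brokerAndInitialOffset.broker.id} @ ${brokerAndInitialOffset.initOffset}"
    }

  // The same destructuring works in a for comprehension.
  def totalInitialOffset(m: Map[TopicAndPartition, BrokerAndInitialOffset]): Long = {
    var total = 0L
    for ((_, brokerAndInitialOffset) <- m)
      total += brokerAndInitialOffset.initOffset
    total
  }
}
```

Both describe methods produce the same strings; the second one just makes the meaning of each field visible at the use site.
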

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52894>

    We should probably synchronize the processing of stop replica requests as well. So, we need a more general name for the lock.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52895>

    This can be written as
    map { case (topic, partition) => }


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52896>

    Ditto as above.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52897>

    Ditto as above.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52898>

    This can be written as
    foreach { case (topicAndPartition, partitionStateInfo) => }


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52899>

    Not sure that I understand the comment. Why would the HW be overwritten to 0? Also, could we explain the importance of the ordering of the steps, i.e., removing fetchers, truncating the log (and checkpoint flushing offset), and adding fetchers?


- Jun Rao


On Oct. 18, 2013, 2:15 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/14730/
> -----------------------------------------------------------
>
> (Updated Oct. 18, 2013, 2:15 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1001
>     https://issues.apache.org/jira/browse/KAFKA-1001
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1001.v1.6
>
>
> KAFKA-1001.v1.5
>
>
> KAFKA-1001.v1
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/cluster/Partition.scala 5ccecd179d33abfc14dcefc35dd68de7474c6978
>   core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4
>   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 566ca46d113ee7da4b38ee57302ba183b59ab5d6
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala dda0a8f041f242bf8a501a8cbd2b9c0258323f96
>   core/src/main/scala/kafka/log/LogManager.scala 47197153c5d3797d2e2a2f9539d9cd55501468e3
>   core/src/main/scala/kafka/server/AbstractFetcherManager.scala 15b7bd31446ffb97b8ed0fa6461649a01d81c7e9
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala c64260f12bdd6b6c964875e1f3873156442e44e1
>   core/src/main/scala/kafka/server/ReplicaManager.scala ee1cc0cf451b691eb91d9158ca765aeb60fc3dc8
>
> Diff: https://reviews.apache.org/r/14730/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>