-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


Thanks for the patch. There are some unused imports. Detailed comments below.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84747>

    We may not be able to remove the read lock here. The issue is that this 
method accesses not only leaderReplicaIdOpt, but other internal data structures 
like assignedReplicaMap. Without the lock, a read from the Map could fail if 
the Map is being concurrently modified.
    
    In general, we can only get away without the lock when reading a single 
internal value. Perhaps we can introduce another function isLeaderLocal() that 
returns a boolean. This method will only need to access leaderReplicaIdOpt. 
Then all callers will first call leaderReplicaIfLocal and hold onto the leader 
replica. They can then use isLeaderLocal to see if the leader has changed 
subsequently.
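    
    A rough sketch of what I have in mind (localBrokerId is assumed to be the 
broker id that Partition already tracks; doWork is just a placeholder):
    
        def isLeaderLocal(): Boolean = leaderReplicaIdOpt == Some(localBrokerId)
        
        // callers grab the leader replica once, then re-check cheaply
        leaderReplicaIfLocal() match {
          case Some(leaderReplica) =>
            doWork(leaderReplica)  // use the replica we already hold
            if (!isLeaderLocal()) {
              // leadership moved underneath us; handle the race here
            }
          case None => // this broker is no longer the leader
        }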



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84749>

    It's possible that we get an UnknownOffsetMetadata during the conversion. 
In this case, we probably should set HW to logEndOffset.
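    
    For example, roughly (hwMetadata and localReplica are placeholders for the 
converted value and the local replica):
    
        // if the conversion yields UnknownOffsetMetadata, fall back to LEO
        val newHighWatermark =
          if (hwMetadata == LogOffsetMetadata.UnknownOffsetMetadata)
            localReplica.logEndOffset
          else
            hwMetadata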



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84757>

    Instead of using negation, could we do 
leaderHW.precedes(replica.logEndOffset)?
    
    Also, could we move && to the previous line?
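    
    Putting the two suggestions together, the condition would read roughly 
like this (someCondition is a placeholder, just to show the shape):
    
        if (someCondition(replica) &&
            leaderHW.precedes(replica.logEndOffset)) {
          // ...
        }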



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84750>

    Not sure if we need to copy, since inSyncReplicas is immutable. We probably 
just need to do a reference assignment.
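    
    i.e. (sketch):
    
        // inSyncReplicas is an immutable Set, so a reference assignment
        // already gives a stable snapshot; no defensive copy needed
        val curInSyncReplicas = inSyncReplicas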



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84812>

    This seems to be an existing problem. If ack=-1, a safer check is HW >= 
requiredOffset. This way, we will be sure that if ISR expands, the acked 
message is guaranteed to be in the replicas newly added to ISR.
    
    The following is an example that shows the issue with the existing check. 
Suppose that all replicas in ISR are at offset 10, but HW is still at 8 and we 
call checkEnoughReplicasReachOffset on offset 9.  The check will be satisfied 
and the message is considered committed. We will be updating HW to 10 pretty 
soon. However, before that happens, another replica whose LEO is at 8 can be 
added to ISR. This replica won't have message 9, which is acked as committed. 
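    
    In code, the safer check would be roughly (a sketch; highWatermark is 
assumed to be the leader replica's LogOffsetMetadata):
    
        // require the HW itself to have passed the acked offset; any replica
        // added to ISR later must have caught up to at least the current HW
        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
          // the message at requiredOffset is safely committed
        }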



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84832>

    Perhaps we could create the TopicPartitionRequestKey just once?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84833>

    Move && to previous line?



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment84834>

    Should we rename highWatermarkValue and logEndOffsetValue to 
highWatermarkMetadata and logEndOffsetMetadata?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84698>

    It would be a bit confusing to reason about the consistency between 
nextOffset and nextOffsetMetadata since they are not updated atomically. Could 
we just keep nextOffsetMetadata?
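    
    i.e., keep only nextOffsetMetadata and derive the plain offset from it, 
e.g. (assuming the metadata exposes the offset as messageOffset):
    
        // single source of truth; no separate nextOffset to keep in sync
        def logEndOffset: Long = nextOffsetMetadata.messageOffset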



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84699>

    Perhaps we can add a bit more detail to the comment. We are in the 
situation where startOffset is in range, but we can't find a single message 
whose offset is >= startOffset. One possibility seems to be that all messages 
after startOffset have been deleted due to compaction. Is that the only case? 
Let's describe all the situations in which this can happen.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84915>

    In DelayedProduce, we don't send the response immediately if one partition 
has an error. Should we do the same thing for DelayedFetch? Will that make the 
logic simpler?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84836>

    All those should probably be private.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84913>

    Will this case ever happen? If so, could we add a comment on how it can 
happen?



core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84746>

    This can be private.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/23767/#comment84910>

    If ack is > 1, it won't be -1. So "but no = -1" is redundant.



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84814>

    Would it be better to name it offsetOrdering?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84687>

    The ordering doesn't match that in the signature.



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84919>

    Should we just use one constructor with defaults?
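    
    e.g. (a sketch; the default value names are my assumption):
    
        case class LogOffsetMetadata(messageOffset: Long,
                                     segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
                                     relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)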



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84689>

    Do we need the space after !?



core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84736>

    This can be private.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84837>

    TopicAndPartition -> (PartitionData, OffsetMetadata)
    
    It probably will be clearer if we use a case class 
PartitionDataAndOffsetMetadata, instead of using a pair.
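    
    i.e. something like (sketch; the value types are my guess from the 
surrounding code):
    
        case class PartitionDataAndOffsetMetadata(partitionData: FetchResponsePartitionData,
                                                  offsetMetadata: LogOffsetMetadata)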



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84839>

    Perhaps it will be clearer if we return a FetchResponseAndOffsetMetadata 
instead of a pair?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84754>

    Would updateReplicaLEOAndHW be enough?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84912>

    Do we need to log (topic, partition) twice?



core/src/main/scala/kafka/server/RequestKey.scala
<https://reviews.apache.org/r/23767/#comment84920>

    Should we rename this to DelayedRequestKey?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84921>

    These comments may need to be changed according to the comments below.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84718>

    I actually think it's more intuitive to return true if the request is 
satisfied by the caller. Then, we can assign a meaningful return value in the 
caller:
    
    val isSatisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(request)
    
    Also, we need to explain this method better. How about the following 
comment?
    
    Potentially add the request to be watched on all keys. Return true iff the 
request is satisfied and the satisfaction was done by the caller.
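    
    Putting the two together, a rough sketch (treat this as illustrative; it 
glosses over the finer synchronization in Watchers):
    
        /**
         * Potentially add the request to be watched on all keys. Return true
         * iff the request is satisfied and the satisfaction was done by the
         * caller (this thread).
         */
        def checkAndMaybeWatch(delayedRequest: T): Boolean = {
          for (key <- delayedRequest.keys) {
            // if it can already be satisfied, try to claim the satisfaction
            // for this thread before watching on any further keys
            if (checkSatisfied(delayedRequest) &&
                delayedRequest.satisfied.compareAndSet(false, true))
              return true
            watchersFor(key).add(delayedRequest)
          }
          false
        }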



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84728>

    key doesn't seem to be used.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84709>

    Could we name this checkAndMaybeAdd and add the comment?



core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/23767/#comment84922>

    Can this just be created as Map(a -> b)?
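    
    i.e. (generic sketch, with a and b standing in for the actual key and 
value):
    
        // instead of building a mutable map and put()-ing into it
        val m = Map(a -> b)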


- Jun Rao


On July 21, 2014, 7:53 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 21, 2014, 7:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebased on KAFKA-1462: 1. LogSegment.read() will also return fetch info, even 
> if the corresponding message set is empty; 2. Purgatory checks satisfaction 
> in checkAndMaybeWatch synchronously, and will only return false if this 
> thread successfully sets the satisfied bit to true; 3. Remove the read lock 
> on Partition's reads of leaderOpt and epoch, making them volatile instead 
> since these are just single reads; 4. Fix some minor issues in 
> TestEndToEndLatency; 5. Other minor fixes.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/FileMessageSet.scala 
> b2652ddbe2f857028d5980e29a008b2c614694a3 
>   core/src/main/scala/kafka/log/Log.scala 
> b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
>   core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
> 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 
> 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
> cec1caecc51507ae339ebf8f3b8a028b12a1a056 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
> d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
> 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 
> 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
