----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review65462 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java <https://reviews.apache.org/r/27391/#comment108626> can you add a comment: // only v0, v1 of offsetcommitrequest clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java <https://reviews.apache.org/r/27391/#comment108637> Not introduced by your patch, but it is odd that these are named topicResponseObj and partitionResponse below - probably an artifact of copy/paste. Can you do a rename here before checking in? core/src/main/scala/kafka/common/OffsetMetadataAndError.scala <https://reviews.apache.org/r/27391/#comment108638> I think we discussed before that timestamp does not need to be a var. We can use the case class copy method to make a copy + edit. core/src/main/scala/kafka/server/KafkaServer.scala <https://reviews.apache.org/r/27391/#comment108642> Thanks for fixing this core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/27391/#comment108643> I think it would be better to move this to just before the call to offsetCommitValue in the loop in line 228. This method should only be responsible for taking the offsetAndMetadata and converting that into the on-disk bytes and should not concern itself with setting a critical field like the expiration timestamp. I was actually looking for where this happens (i.e., setting the expiration time) and took me a while to realize it was hidden in here. core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/27391/#comment108644> I think we can make this and some other methods here private. core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/27391/#comment108645> private core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/27391/#comment108646> Also, let us use a case class instead of a tuple core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala <https://reviews.apache.org/r/27391/#comment108655> Rather than sleep, we should improve OffsetManager to take in a MockScheduler instance - we can pass through the time instance from KafkaServer to offsetManager as we do for LogManager and replicaManager. That way we can advance time with MockTime. This test will need to change from OffsetCommitTest to OffsetManagerTest and we will just test the OffsetManager. Can you file a jira for that? Although that would make sense only after you check this in. core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala <https://reviews.apache.org/r/27391/#comment108647> If the offset in fact did expire, the assertion itself won't fail - i.e., you will get a NoSuchElementException Same comments apply to checks below. - Joel Koshy On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27391/ > ----------------------------------------------------------- > > (Updated Dec. 2, 2014, 2:03 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1634 > https://issues.apache.org/jira/browse/KAFKA-1634 > > > Repository: kafka > > > Description > ------- > > Add another api in offset manager to return the struct, and the cache layer > will only read its expiration timestamp while the offset formatter will read > the struct as a whole > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java > 7517b879866fc5dad5f8d8ad30636da8bbe7784a > clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java > 121e880a941fcd3e6392859edba11a94236494cc > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java > 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f > > clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java > df37fc6d8f0db0b8192a948426af603be3444da4 > core/src/main/scala/kafka/api/OffsetCommitRequest.scala > 050615c72efe7dbaa4634f53943bd73273d20ffb > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala > 4cabffeacea09a49913505db19a96a55d58c0909 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > da29a8cb461099eb675161db2f11a9937424a5c6 > core/src/main/scala/kafka/server/KafkaApis.scala > 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 > core/src/main/scala/kafka/server/KafkaServer.scala > 1bf7d10cef23a77e716666eb16bf6d0e68bc4ebe > core/src/main/scala/kafka/server/OffsetManager.scala > 3c79428962604800983415f6f705e04f52acb8fb > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > 8c5364fa97da1be09973c176d1baeb339455d319 > > Diff: https://reviews.apache.org/r/27391/diff/ > > > Testing > ------- > > > Thanks, > > Guozhang Wang > >