-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------
(Updated Sept. 2, 2014, 1:09 a.m.)
Review request for kafka.
Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583
Repository: kafka
Description (updated)
-------
Incorporated Jun's second round of comments.
Incorporated Jun's comments.
1. I left some cases in Log since they are return values for some of their APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be
used for re-reading the log.
3. I kept the name "callbackOnComplete", following the principle that only the
caller knows what the callback is used for. Callers can therefore name the
callback responseCallback (in KafkaApis) or putCacheCallback (in
OffsetManager), while every callee takes it as "callbackOnComplete".
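To illustrate the naming principle in point 3, here is a minimal sketch. The
object and the methods appendMessages, handleProduce and storeOffsets are
hypothetical, simplified stand-ins and not the actual signatures in this patch;
only the callback names come from the description above.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; not the real Kafka classes.
object CallbackNamingSketch {
  // Callee: it does not know what the callback is for, so it uses the
  // generic name "callbackOnComplete".
  def appendMessages(messages: Seq[String], callbackOnComplete: Int => Unit): Unit = {
    // ... appending to the log is elided ...
    callbackOnComplete(messages.size)
  }

  // Caller in the style of KafkaApis: it knows the callback builds a response.
  def handleProduce(messages: Seq[String]): String = {
    var response = ""
    val responseCallback: Int => Unit = n => response = s"appended $n messages"
    appendMessages(messages, responseCallback)
    response
  }

  // Caller in the style of OffsetManager: it knows the callback fills the cache.
  def storeOffsets(offsets: Seq[String], cache: mutable.Buffer[String]): Unit = {
    val putCacheCallback: Int => Unit = _ => cache ++= offsets
    appendMessages(offsets, putCacheCallback)
  }
}
```

The callee stays generic, while each call site documents the purpose of the
callback through the name it chooses.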
Unit tests passed, with some other notes:
1. Found and fixed a bug in the current delayed fetch satisfaction logic:
previously, when calculating the accumulated bytes, we did not take
fetchMaxBytes into consideration as an upper limit for a single partition's
log, but simply took the diff between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create the replica
manager on the fly but did not shut it down upon completing the test, which
leaked its background thread (i.e. the reaper thread of the purgatory).
3. Changed the RequestPurgatory API a bit following Jun's comments. It now has
two implemented functions, forceComplete() and isCompleted(), and two functions
that need to be implemented in the subclasses, tryComplete() and complete().
Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test: previously this test was too complicated
even though it only tests a simple piece of replica manager logic.
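The fetchMaxBytes fix from note 1 can be sketched roughly as follows. The
object FetchBytesSketch, the function accumulatedBytes and its parameters are
hypothetical names, and the offsets are treated as plain byte positions for
simplicity; the actual logic in DelayedFetch may differ.

```scala
// Hypothetical sketch of the capped byte calculation; not the code in the patch.
object FetchBytesSketch {
  // limitOffset stands for the HW or the LEO, depending on who is fetching.
  def accumulatedBytes(limitOffset: Long, fetchOffset: Long, fetchMaxBytes: Int): Long = {
    // Old behaviour: only this diff was used.
    val available = math.max(0L, limitOffset - fetchOffset)
    // Fix: a single partition contributes at most fetchMaxBytes.
    math.min(available, fetchMaxBytes.toLong)
  }
}
```

With a limit of 1000, a fetch offset of 200 and fetchMaxBytes of 100, the
uncapped diff would count 800 bytes for the partition, while the capped version
counts 100.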
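For discussion of the API in note 3, here is a rough sketch of its shape. Only
the four function names come from the description above; the class names
DelayedRequestSketch and DelayedFetchSketch, the AtomicBoolean flag, and the
assumption that tryComplete() calls forceComplete() once its criteria are met
are illustrative and may not match the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the API shape; not the code in the patch.
abstract class DelayedRequestSketch {
  private val completed = new AtomicBoolean(false)

  // Implemented here: runs complete() at most once, even if several
  // threads race to complete the same request.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { complete(); true } else false

  // Implemented here: reports whether the request was already completed.
  def isCompleted(): Boolean = completed.get()

  // Supplied by subclasses: check the completion criteria and, if they
  // are met, call forceComplete() (an assumption of this sketch).
  def tryComplete(): Boolean

  // Supplied by subclasses: the work to do on completion.
  def complete(): Unit
}

// Example subclass: completes once enough bytes have accumulated.
class DelayedFetchSketch(minBytes: Int) extends DelayedRequestSketch {
  @volatile var availableBytes = 0
  var responsesSent = 0

  def tryComplete(): Boolean =
    if (availableBytes >= minBytes) forceComplete() else false

  def complete(): Unit = responsesSent += 1
}
```

Keeping the once-only guard in the base class means a subclass only has to
describe when it can complete and what completing does.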
Diffs
-----
core/src/main/scala/kafka/api/FetchRequest.scala
51cdccf7f90eb530cc62b094ed822b8469d50b12
core/src/main/scala/kafka/api/FetchResponse.scala
af9308737bf7832eca018c2b3ede703f7d1209f1
core/src/main/scala/kafka/api/OffsetCommitRequest.scala
861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala
b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala
a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala
ff106b47e6ee194cea1cf589474fef975b9dd7e2
core/src/main/scala/kafka/common/ErrorMapping.scala
3fae7910e4ce17bc8325887a046f383e0c151d44
core/src/main/scala/kafka/log/Log.scala
0ddf97bd30311b6039e19abade41d2fbbad2f59b
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala
e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala
9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala
c584b559416b3ee4bcbec5966be4891e0a03eefb
core/src/main/scala/kafka/server/OffsetManager.scala
43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala
68758e35d496a4659819960ae8e809d6e215568e
core/src/main/scala/kafka/server/RequestPurgatory.scala
ce06d2c381348deef8559374869fcaed923da1d1
core/src/main/scala/kafka/utils/DelayedItem.scala
d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
168712de241125982d556c188c76514fceb93779
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5
Diff: https://reviews.apache.org/r/24676/diff/
Testing
-------
Unit tests
Thanks,
Guozhang Wang