-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
-----------------------------------------------------------
Thanks for the patch. There seem to be some compilation errors; you may need to
rebase.
:core:compileScala
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala:68:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala:65:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/FetchRequest.scala:152:
type mismatch;
found : kafka.api.FetchResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new RequestChannel.Response(request, new
FetchResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala:189:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:78:
value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see
corresponding Javadoc for more information.
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:167:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, commitResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetFetchRequest.scala:99:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetRequest.scala:121:
type mismatch;
found : kafka.network.RequestOrResponseSend
required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new
RequestOrResponseSend(request.connectionId, errorResponse)))
^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/network/RequestChannel.scala:29:
imported `Send' is permanently hidden by definition of trait Send in package
network
import org.apache.kafka.common.network.Send
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/33065/#comment135068>
We don't need this since idle connections only need to be closed on the
server side.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/33065/#comment135072>
We don't need this since idle connections only need to be closed on the
server side.
clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment135073>
Actually, with MultiSend, we will be sending a 4-byte size plus the
payload, the sum of which could be a bit larger than Integer.MAX_VALUE. So, I
think we need to make writeTo and size in Send return long instead. Sorry for
the incorrect suggestion earlier.
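To illustrate the overflow, here is a minimal sketch. The class and method names are made up for illustration and are not part of the patch; the only assumption is a 4-byte size prefix in front of an int-sized payload.

```java
class SendSizeOverflow {
    // Hypothetical helper: total bytes on the wire, computed as a long.
    static long totalSize(int payloadBytes) {
        return 4L + payloadBytes; // widened to long before the addition
    }

    // Same computation in int arithmetic, which wraps for large payloads.
    static int totalSizeAsInt(int payloadBytes) {
        return 4 + payloadBytes;
    }

    public static void main(String[] args) {
        int payload = Integer.MAX_VALUE - 1;
        System.out.println(totalSizeAsInt(payload)); // prints -2147483646
        System.out.println(totalSize(payload));      // prints 2147483650
    }
}
```

With an int return type, a caller comparing bytes written against size would see a negative total for payloads this large.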
clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment135074>
To be consistent, there is no need to wrap a single-line statement in {}.
clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment135059>
It seems that we don't really need IdentityHashMap to optimize performance.
According to the following link, HashMap gives performance comparable to
IdentityHashMap on String keys, since String caches its hashcode.
http://java-performance.info/java-util-identityhashmap/
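As a side note, the two maps also differ in how they compare keys, which matters once the key is a String. The sketch below is illustrative only; the connection id value and class name are invented, and nothing here is taken from the patch.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

class ConnectionIdLookup {
    // Builds a map holding one entry under the given key object.
    static Map<String, Integer> with(Map<String, Integer> map, String key) {
        map.put(key, 1);
        return map;
    }

    public static void main(String[] args) {
        String id = "127.0.0.1:9092-1";      // hypothetical connection id
        String sameContent = new String(id); // equal content, different object

        Map<String, Integer> byEquals = with(new HashMap<String, Integer>(), id);
        Map<String, Integer> byIdentity = with(new IdentityHashMap<String, Integer>(), id);

        // HashMap compares keys with equals(); String caches its hash after first use.
        System.out.println(byEquals.containsKey(sameContent));   // prints true
        // IdentityHashMap compares keys with ==, so an equal copy is not found.
        System.out.println(byIdentity.containsKey(sameContent)); // prints false
    }
}
```

So with HashMap a lookup does not depend on every caller holding the exact same String instance.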
clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment135056>
We only need to close idle connections on the server side. So, we can use
MAX_LONG for connectionMaxIdleMs for the clients. But we need to see if
maybeCloseOldestConnection handles overflows well. Alternatively, we can just
not maintain the lruConnection on the client side.
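To make the overflow concern concrete, here is a minimal sketch of two ways an idle check could be written. This is not the code in maybeCloseOldestConnection; the method names, parameters, and millisecond units are assumptions for illustration.

```java
class IdleCheck {
    // Overflow-prone: lastActiveMs + maxIdleMs wraps negative when maxIdleMs is Long.MAX_VALUE.
    static boolean expiredNaive(long nowMs, long lastActiveMs, long maxIdleMs) {
        return nowMs > lastActiveMs + maxIdleMs;
    }

    // Safe when timestamps are non-negative and nowMs >= lastActiveMs:
    // the subtraction cannot overflow under those conditions.
    static boolean expiredSafe(long nowMs, long lastActiveMs, long maxIdleMs) {
        return nowMs - lastActiveMs > maxIdleMs;
    }

    public static void main(String[] args) {
        long lastActive = 1000L;
        long now = 2000L;
        System.out.println(expiredNaive(now, lastActive, Long.MAX_VALUE)); // prints true
        System.out.println(expiredSafe(now, lastActive, Long.MAX_VALUE));  // prints false
    }
}
```

If the real check has the first shape, a client configured with MAX_LONG would close connections immediately instead of never.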
core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33065/#comment135077>
sends can be private.
core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33065/#comment135078>
sends can be private.
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135015>
Our convention is to use () for methods with side effects. So, we should do
wakeup(). Same is true for shutdownHook.
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135044>
It seems that we can just call wakeup() and get rid of shutdownHook.
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135052>
I guess we will make use of this in a followup patch?
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135016>
This is not enough to get the thread out of the loop. Also, we probably
don't want to continue with the rest of the logic. The easiest way is to
rethrow the exception to kill the thread.
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135020>
Actually, if the incoming request is invalid, the exception will be thrown
here. So, we should handle the exception here. We probably should just handle
Exception, instead of Throwable.
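Roughly, the shape I have in mind is sketched below. It is written in Java with invented names (parse, run) rather than the actual Scala in SocketServer, and it assumes that handling an invalid request means closing only the offending connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProcessorLoopSketch {
    // Hypothetical stand-in for parsing an incoming request.
    static String parse(String raw) {
        if (raw.isEmpty()) {
            throw new IllegalArgumentException("invalid request");
        }
        return raw.toUpperCase();
    }

    // Processes one raw request per connection; returns the indexes of closed connections.
    static List<Integer> run(List<String> rawRequests) {
        List<Integer> closed = new ArrayList<Integer>();
        for (int conn = 0; conn < rawRequests.size(); conn++) {
            try {
                parse(rawRequests.get(conn));
            } catch (Exception e) {
                // Handle the failure where it is thrown: drop this connection only.
                closed.add(conn);
            }
            // Errors such as OutOfMemoryError are not Exceptions, so they are
            // not caught here and propagate to terminate the thread.
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("fetch", "", "produce"))); // prints [1]
    }
}
```

Catching Exception rather than Throwable keeps fatal errors from being swallowed inside the loop.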
core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135019>
Do we need to close again? We may hit the same exception as before. Also,
see the comment above.
core/src/test/resources/log4j.properties
<https://reviews.apache.org/r/33065/#comment135080>
These are probably not intended.
- Jun Rao
On May 15, 2015, 7:30 a.m., Gwen Shapira wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
>
> (Updated May 15, 2015, 7:30 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: 1928 and KAFKA-1928
> https://issues.apache.org/jira/browse/1928
> https://issues.apache.org/jira/browse/KAFKA-1928
>
>
> Repository: kafka
>
>
> Description
> -------
>
> first pass on replacing Send
>
>
> implement maxSize and improved docs
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> KAFKA-1928-v2
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> KAFKA-1928-v2
>
> Conflicts:
> core/src/main/scala/kafka/network/RequestChannel.scala
>
> moved selector out of abstract thread
>
>
> mid-way through putting selector in SocketServer
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> KAFKA-1928-v2
>
> Also, SocketServer is now using Selector. Still a bit messy - but all tests
> pass.
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> KAFKA-1928-v2
>
>
> renamed requestKey to connectionId to reflect new use and changed type from
> Any to String
>
>
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations
> as well
>
>
> removed reify and remaining from send/receive API, per Jun. moved
> maybeCloseOldest() to Selector per Jay
>
>
> added idString to node API, changed written to int in Send API
>
>
> cleaning up MultiSend, added size() to Send interface
>
>
> fixed some issues with multisend
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> KAFKA-1928-v2
>
>
> fixed metric thingies
>
>
> fixed response order bug
>
>
> error handling for illegal selector state and fix metrics bug
>
>
> optimized selection key lookup with identity hash
>
>
> Diffs
> -----
>
> clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
> da76cc257b4cfe3c4bce7120a1f14c7f31ef8587
> clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
> 936487b16e7ac566f8bdcd39a7240ceb619fd30e
> clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
> 1311f85847b022efec8cb05c450bb18231db6979
> clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
> 435fbb5116e80302eba11ed1d3069cb577dbdcbd
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
> bdff518b732105823058e6182f445248b45dc388
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
> e55ab11df4db0b0084f841a74cbcf819caf780d5
>
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
> ef9dd5238fbc771496029866ece1d85db6d7b7a5
> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> 8e336a3aa96c73f52beaeb56b931baf4b026cf21
> clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
> 187d0004c8c46b6664ddaffecc6166d4b47351e5
>
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
> 1e943d621732889a1c005b243920dc32cea7af66
> clients/src/main/java/org/apache/kafka/common/Node.java
> f4e4186c7602787e58e304a2f1c293a633114656
>
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
> 129ae827bccbd982ad93d56e46c6f5c46f147fe0
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
> c8213e156ec9c9af49ee09f5238492318516aaa3
> clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
> PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
> fc0d168324aaebb97065b0aafbd547a1994d76a7
> clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
> 68327cd3a734fd429966d3e2016a2488dbbb19e5
> clients/src/main/java/org/apache/kafka/common/network/Receive.java
> 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a
> clients/src/main/java/org/apache/kafka/common/network/Selectable.java
> b5f8d83e89f9026dc0853e5f92c00b2d7f043e22
> clients/src/main/java/org/apache/kafka/common/network/Selector.java
> 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302
> clients/src/main/java/org/apache/kafka/common/network/Send.java
> 5d321a09e470166a1c33639cf0cab26a3bce98ec
> clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
> 27cbf390c7f148ffa8c5abc154c72cbf0829715c
> clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
> PRE-CREATION
> clients/src/test/java/org/apache/kafka/clients/MockClient.java
> 5e3fab13e3c02eb351558ec973b949b3d1196085
> clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
> 8b278892883e63899b53e15efb9d8c926131e858
> clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
> d5b306b026e788b4e5479f3419805aa49ae889f3
> clients/src/test/java/org/apache/kafka/test/MockSelector.java
> ea89b06a4c9e5bb351201299cd3037f5226f0e6c
> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
> 1c3b3802ac221d570e7610458e50518b4499e7ed
> core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
> a3b1b78adb760eaeb029466b54f335a29caf3b0f
> core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
> fe81635c864cec03ca1d4681c9c47c3fc4f975ee
> core/src/main/scala/kafka/api/FetchRequest.scala
> b038c15186c0cbcc65b59479324052498361b717
> core/src/main/scala/kafka/api/FetchResponse.scala
> 75aaf57fb76ec01660d93701a57ae953d877d81c
> core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
> 431190ab94afc4acfc14348a1fc720e17c071cea
> core/src/main/scala/kafka/api/OffsetCommitRequest.scala
> 317daed18db8b02635927d81fbcad623f137de5e
> core/src/main/scala/kafka/api/OffsetFetchRequest.scala
> fa8bd6a145fd3f08a5f78fcfa813ed7417ccffd2
> core/src/main/scala/kafka/api/OffsetRequest.scala
> 3d483bc7518ad76f9548772522751afb4d046b78
> core/src/main/scala/kafka/api/ProducerRequest.scala
> 570b2da1d865086f9830aa919a49063abbbe574d
> core/src/main/scala/kafka/api/StopReplicaRequest.scala
> 5e14987c990fe561c01dac2909f5ed21a506e038
> core/src/main/scala/kafka/api/TopicMetadataRequest.scala
> 363bae01752318f3849242b97a6619747697c1d9
> core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
> 69f0397b187a737b4ddf50e390d3c2f418ce6b5d
> core/src/main/scala/kafka/client/ClientUtils.scala
> 62394c0d3813f19a443cf862c8bc6c5808be9f88
> core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> 31a2639477bf66f9a05d2b9b07794572d7ec393b
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
> aa8d9404a3e78a365df06404b79d0d8f694b4bd6
> core/src/main/scala/kafka/controller/ControllerChannelManager.scala
> 6cf13f0a1f7f31ff9367197a435e0ae4427b6438
> core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
> b0b7be14d494ae8c87f4443b52db69d273c20316
> core/src/main/scala/kafka/network/BlockingChannel.scala
> 6e2a38eee8e568f9032f95c75fa5899e9715b433
> core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
> c0d77261353478232ab85591c182be57845b3f13
> core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
> b95b73b71252932867c60192b3d5b91efe99e122
> core/src/main/scala/kafka/network/ByteBufferSend.scala
> af30042a4c713418ecd83b6c6c17dfcbdc101c62
> core/src/main/scala/kafka/network/Handler.scala
> a0300336b8cb5a2d5be68b7b48bdbe045bf99324
> core/src/main/scala/kafka/network/RequestChannel.scala
> 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026
> core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION
> core/src/main/scala/kafka/network/SocketServer.scala
> edf6214278935c031cf493d72d266e715d43dd06
> core/src/main/scala/kafka/network/Transmission.scala
> 2827103d7e57789bb04859bdeb9d4720c8bd060c
> core/src/main/scala/kafka/producer/SyncProducer.scala
> 0f09951329a8a8f86bd4d1512e8d10eb151ddb43
> core/src/main/scala/kafka/server/KafkaApis.scala
> 417960dd1ab407ebebad8fdb0e97415db3e91a2f
> core/src/main/scala/kafka/server/KafkaServer.scala
> b7d2a2842e17411a823b93bdedc84657cbd62be1
> core/src/main/scala/kafka/server/MessageSetSend.scala
> 566764850cc60b9d35a4b51abd89a8109f340f5d
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
> d2bac85e16a247b1326f63619711fb0bbbd2e82a
> core/src/test/resources/log4j.properties
> 1b7d5d8f7d5fae7d272849715714781cad05d77b
> core/src/test/scala/other/kafka/TestOffsetManager.scala
> 9881bd3dff0591f315bd53aea96d3c6e12a24cb6
> core/src/test/scala/unit/kafka/network/SocketServerTest.scala
> 95d562134c0414ddc3caaa1e1defeb246f585b0b
>
> Diff: https://reviews.apache.org/r/33065/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Gwen Shapira
>
>