> On May 27, 2015, 6:44 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/network/SocketServer.scala, line 109 > > <https://reviews.apache.org/r/33065/diff/16/?file=971264#file971264line109> > > > > Agreed with Joel's point. We should probably just create a single > > Metrics object in KafkaServer and pass it into SocketServer. Also, we > > should create it in a way similar to that in KafkaProducer. We likely will > > need to add some new metrics related properties in KafkaConfig. > > Gwen Shapira wrote: > I ran into an issue when I tried using a single Metrics object: > > I initiated a single Metrics object and passed it into all the Selectors. > Each selector had its own "processor-id" tags. When the Selectors > initialize, each Sensor is re-used by all Selectors. The different > MetricsNames with the different processor-id tags are all added with to the > same Sensor. When we call Sensor.record - all statistics are updated, for all > Selectors. Which means that all Selectors have the same statistics, which > cannot be correct. > > I solved this with multiple Metrics objects. Did I miss a better solution?
This seems to be an issue with the Seletor. Basically, when creating a sensor in a selector, we didn't include the metricTags in the sensor name. Therefore, the sensors in the selector will have the same name if the metric instance is the same. We can fix this by including the tags in the sensor name. The sensor name just have to be unique and is not tied to the external jmx name. So, we can probably just add strings like "tagKey-tagValue" to the sensor name. This will make the construction of the sensor name more expensive. We can probably create a separate jira to optimize that a bit (e.g. by caching the global and the per node sensor names). - Jun ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review85412 ----------------------------------------------------------- On May 24, 2015, 4:53 p.m., Gwen Shapira wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33065/ > ----------------------------------------------------------- > > (Updated May 24, 2015, 4:53 p.m.) > > > Review request for kafka. > > > Bugs: 1928 and KAFKA-1928 > https://issues.apache.org/jira/browse/1928 > https://issues.apache.org/jira/browse/KAFKA-1928 > > > Repository: kafka > > > Description > ------- > > first pass on replacing Send > > > implement maxSize and improved docs > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > Conflicts: > core/src/main/scala/kafka/network/RequestChannel.scala > > moved selector out of abstract thread > > > mid-way through putting selector in SocketServer > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > Also, SocketServer is now using Selector. Stil a bit messy - but all tests > pass. > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > renamed requestKey to connectionId to reflect new use and changed type from > Any to String > > > Following Jun's comments - moved MultiSend to client. Cleaned up destinations > as well > > > removed reify and remaining from send/recieve API, per Jun. moved > maybeCloseOldest() to Selector per Jay > > > added idString to node API, changed written to int in Send API > > > cleaning up MultiSend, added size() to Send interface > > > fixed some issues with multisend > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > fixed metric thingies > > > fixed response order bug > > > error handling for illegal selector state and fix metrics bug > > > optimized selection key lookup with identity hash > > > fix accidental change > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > addressing Jun's comments > > > removed connection-aging for clients > > > fix issues with exception handling and other cleanup > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > Revert "removed connection-aging for clients" > > This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1. > > improving exception handling and other minor fixes > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, > changed Send size to long, and fixed an existing issue where Reporters are > not actually loaded > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java > da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 > clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java > cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java > 936487b16e7ac566f8bdcd39a7240ceb619fd30e > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 1311f85847b022efec8cb05c450bb18231db6979 > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 435fbb5116e80302eba11ed1d3069cb577dbdcbd > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > bdff518b732105823058e6182f445248b45dc388 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > b2764df11afa7a99fce46d1ff48960d889032d14 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > ef9dd5238fbc771496029866ece1d85db6d7b7a5 > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 8e336a3aa96c73f52beaeb56b931baf4b026cf21 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 187d0004c8c46b6664ddaffecc6166d4b47351e5 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 1e943d621732889a1c005b243920dc32cea7af66 > clients/src/main/java/org/apache/kafka/common/Node.java > f4e4186c7602787e58e304a2f1c293a633114656 > > clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java > 129ae827bccbd982ad93d56e46c6f5c46f147fe0 > clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java > c8213e156ec9c9af49ee09f5238492318516aaa3 > clients/src/main/java/org/apache/kafka/common/network/MultiSend.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java > fc0d168324aaebb97065b0aafbd547a1994d76a7 > clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java > 68327cd3a734fd429966d3e2016a2488dbbb19e5 > clients/src/main/java/org/apache/kafka/common/network/Receive.java > 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a > clients/src/main/java/org/apache/kafka/common/network/Selectable.java > b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 > clients/src/main/java/org/apache/kafka/common/network/Selector.java > 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 > clients/src/main/java/org/apache/kafka/common/network/Send.java > 5d321a09e470166a1c33639cf0cab26a3bce98ec > clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java > 27cbf390c7f148ffa8c5abc154c72cbf0829715c > clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 5e3fab13e3c02eb351558ec973b949b3d1196085 > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 8b278892883e63899b53e15efb9d8c926131e858 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > d5b306b026e788b4e5479f3419805aa49ae889f3 > clients/src/test/java/org/apache/kafka/test/MockSelector.java > ea89b06a4c9e5bb351201299cd3037f5226f0e6c > core/src/main/scala/kafka/Kafka.scala > 9efabaadd7d22001a75c3720e2d691f45cd83d9e > core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala > 1c3b3802ac221d570e7610458e50518b4499e7ed > core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala > a3b1b78adb760eaeb029466b54f335a29caf3b0f > core/src/main/scala/kafka/api/ControlledShutdownRequest.scala > fe81635c864cec03ca1d4681c9c47c3fc4f975ee > core/src/main/scala/kafka/api/FetchRequest.scala > b038c15186c0cbcc65b59479324052498361b717 > core/src/main/scala/kafka/api/FetchResponse.scala > 75aaf57fb76ec01660d93701a57ae953d877d81c > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala > 431190ab94afc4acfc14348a1fc720e17c071cea > core/src/main/scala/kafka/api/OffsetCommitRequest.scala > 317daed18db8b02635927d81fbcad623f137de5e > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > fa8bd6a145fd3f08a5f78fcfa813ed7417ccffd2 > core/src/main/scala/kafka/api/OffsetRequest.scala > 3d483bc7518ad76f9548772522751afb4d046b78 > core/src/main/scala/kafka/api/ProducerRequest.scala > 570b2da1d865086f9830aa919a49063abbbe574d > core/src/main/scala/kafka/api/RequestKeys.scala > ef7a86ec3324028496d6bb7c7c6fec7d7d19d64e > core/src/main/scala/kafka/api/StopReplicaRequest.scala > 5e14987c990fe561c01dac2909f5ed21a506e038 > core/src/main/scala/kafka/api/TopicMetadataRequest.scala > 363bae01752318f3849242b97a6619747697c1d9 > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala > 69f0397b187a737b4ddf50e390d3c2f418ce6b5d > core/src/main/scala/kafka/client/ClientUtils.scala > 62394c0d3813f19a443cf862c8bc6c5808be9f88 > core/src/main/scala/kafka/consumer/SimpleConsumer.scala > 31a2639477bf66f9a05d2b9b07794572d7ec393b > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > 0b0dca15c1ccf7579559f91902602cca78d1b252 > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > 6cf13f0a1f7f31ff9367197a435e0ae4427b6438 > core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala > b0b7be14d494ae8c87f4443b52db69d273c20316 > core/src/main/scala/kafka/network/BlockingChannel.scala > 6e2a38eee8e568f9032f95c75fa5899e9715b433 > core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala > c0d77261353478232ab85591c182be57845b3f13 > core/src/main/scala/kafka/network/BoundedByteBufferSend.scala > b95b73b71252932867c60192b3d5b91efe99e122 > core/src/main/scala/kafka/network/ByteBufferSend.scala > af30042a4c713418ecd83b6c6c17dfcbdc101c62 > core/src/main/scala/kafka/network/Handler.scala > a0300336b8cb5a2d5be68b7b48bdbe045bf99324 > core/src/main/scala/kafka/network/RequestChannel.scala > 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 > core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION > core/src/main/scala/kafka/network/SocketServer.scala > edf6214278935c031cf493d72d266e715d43dd06 > core/src/main/scala/kafka/network/Transmission.scala > 2827103d7e57789bb04859bdeb9d4720c8bd060c > core/src/main/scala/kafka/producer/SyncProducer.scala > 0f09951329a8a8f86bd4d1512e8d10eb151ddb43 > core/src/main/scala/kafka/server/KafkaApis.scala > 387e387998fc3a6c9cb585dab02b5f77b0381fbf > core/src/main/scala/kafka/server/KafkaServer.scala > ea6d165d8e5c3146d2c65e8ad1a513308334bf6f > core/src/main/scala/kafka/server/MessageSetSend.scala > 566764850cc60b9d35a4b51abd89a8109f340f5d > core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala > d2bac85e16a247b1326f63619711fb0bbbd2e82a > core/src/test/scala/other/kafka/TestOffsetManager.scala > 9881bd3dff0591f315bd53aea96d3c6e12a24cb6 > core/src/test/scala/unit/kafka/KafkaConfigTest.scala > bc4aef31940b7ab896bd0a5559a3a63d9f7de915 > core/src/test/scala/unit/kafka/network/SocketServerTest.scala > 95d562134c0414ddc3caaa1e1defeb246f585b0b > > Diff: https://reviews.apache.org/r/33065/diff/ > > > Testing > ------- > > > Thanks, > > Gwen Shapira > >