> On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: > > Thanks for the patch. Looks promising. Some comments. > > > > 1. I overlooked this when I suggested the new broker format in ZK. This > > means that we will need to upgrade all consumer clients before we can turn > > on the flag of using the new protocol on the brokers, which may not be > > convenient. Now, I think your earlier approach is probably better because > > of this?
Yeah, this will break ZK consumer :( I'm having second thoughts about the use.new.wire.protocol flag. After finishing the upgrade, it will have to be "true" on all brokers. Then during the next upgrade you'll need to set it back to "false", and then back to true again... Perhaps we need something like: wire.protocol.version and accept values like 0.8.2, 0.8.3, 0.9, etc? This way you won't have to change it twice on each upgrade. Thoughts? > On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: > > core/src/main/scala/kafka/javaapi/TopicMetadata.scala, lines 55-65 > > <https://reviews.apache.org/r/28769/diff/12/?file=820424#file820424line55> > > > > Technically, this is an api change since it's used in > > javaapi.SimpleConsumer. The caller will now get a different type in the > > response. An alternative is to leave Broker as it is and create sth like > > BrokerProfile to include all endpoints. Perhaps, we need to discuss this in > > WIP a bit, whether it's better to break the api in order to use a more > > meaningingful class name, or not break the api and stick with a lousy name. Yeah, I think I mentioned that at one of the discussions. We decided we don't want to support the new security protocols on the old clients (which will be deprecated by the time this is included in a release), we definitely don't want to demand upgrade of clients during broker upgrade - but this API breakage won't do it. It just means that if you build a Simple Consumer, the highest version you can depend on is 0.8.2. Simple Consumers built on old versions will keep working (since we kept wire protocol compatible), and they will simple serialize the TopicMetadataResponse into Broker. Upgrades will work as long as no one will change dependencies and rebuild clients, which sounds fairly reasonable to me. I'll bring it up on the mailing list. > On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: > > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 185-186 > > <https://reviews.apache.org/r/28769/diff/12/?file=820431#file820431line185> > > > > I am thinking about how we should name this field. Since this is only > > needed for internal communication among brokers, perhaps we should name it > > as sth like use.new.intra.broker.wire.protocol. My next question is what > > happens if we have intra broker protocol changes in 2 releases. Do we want > > to use different names so that we can enable each change independantly? An > > alternative is to have the same property name and the meaning is to turn on > > intra broker changes introduced in this release only. The latter implies > > that one can't skip the upgrading of the intermediate release. So, my > > feeling is that probably the former will be better? Perhaps we can bring > > this up in the WIP discussion. I also had thoughts about it (see my reply to first comment). Lets discuss on mailing list. > On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: > > kafka-patch-review.py, line 10 > > <https://reviews.apache.org/r/28769/diff/12/?file=820463#file820463line10> > > > > Are the changes in this file intended? I rebased and it looks like a bunch of stuff got included by mistake. Not sure if I did something wrong or its an issue in the patch review tool. Anyway, I'll clean it up. > On Jan. 23, 2015, 1:57 a.m., Jun Rao wrote: > > system_test/utils/kafka_system_test_utils.py, lines 389-396 > > <https://reviews.apache.org/r/28769/diff/12/?file=820465#file820465line389> > > > > I thought protocol is specified separately, and not in broker.list? yes, it is separate. I think these properties are reused to start the brokers too (as "listeners"), but I'll have to double check here. - Gwen ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/#review69281 ----------------------------------------------------------- On Jan. 14, 2015, 2:16 a.m., Gwen Shapira wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/28769/ > ----------------------------------------------------------- > > (Updated Jan. 14, 2015, 2:16 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1809 > https://issues.apache.org/jira/browse/KAFKA-1809 > > > Repository: kafka > > > Description > ------- > > trivial change to add byte serializer to ProducerPerformance; patched by Jun > Rao > > > first commit of refactoring. > > > changed topicmetadata to include brokerendpoints and fixed few unit tests > > > fixing systest and support for binding to default address > > > fixed system tests > > > fix default address binding and ipv6 support > > > fix some issues regarding endpoint parsing. Also, larger segments for systest > make the validation much faster > > > added link to security wiki in doc > > > fixing unit test after rename of ProtocolType to SecurityProtocol > > > Following Joe's advice, added security protocol enum on client side, and > modified protocol to use ID instead of string. > > > validate producer config against enum > > > add a second protocol for testing and modify SocketServerTests to check on > multi-ports > > > Reverted the metadata request changes and removed the explicit security > protocol argument. Instead the socketserver will determine the protocol based > on the port and add this to the request > > > bump version for UpdateMetadataRequest and added support for rolling upgrades > with new config > > > following tests - fixed LeaderAndISR protocol and ZK registration for > backward compatibility > > > cleaned up some changes that were actually not necessary. hopefully making > this patch slightly easier to review > > > undoing some changes that don't belong here > > > bring back config lost in cleanup > > > fixes neccessary for an all non-plaintext cluster to work > > > minor modifications following comments by Jun > > > added missing license > > > formatting > > > clean up imports > > > cleaned up V2 to not include host+port field. Using use.new.protocol flag to > decide which version to serialize > > > change endpoints collection in Broker to Map[protocol, endpoint], mostly to > be clear that we intend to have one endpoint per protocol > > > validate that listeners and advertised listeners have unique ports and > protocols > > > support legacy configs > > > some fixes following rebase > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 > > clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java > 1b828007975ef8893717b425ed96874d4ef7053f > > clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/utils/Utils.java > 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 > clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java > 6e37ea553f73d9c584641c48c56dbf6e62ba5f88 > clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java > a39fab532f73148316a56c0f8e9197f38ea66f79 > config/server.properties b0e4496a8ca736b6abe965a430e8ce87b0e8287f > core/src/main/scala/kafka/admin/AdminUtils.scala > 28b12c7b89a56c113b665fbde1b95f873f8624a3 > core/src/main/scala/kafka/admin/TopicCommand.scala > 285c0333ff43543d3e46444c1cd9374bb883bb59 > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala > 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala > 4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 > core/src/main/scala/kafka/api/TopicMetadata.scala > 0190076df0adf906ecd332284f222ff974b315fc > core/src/main/scala/kafka/api/TopicMetadataResponse.scala > 92ac4e687be22e4800199c0666bfac5e0059e5bb > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala > 530982e36b17934b8cc5fb668075a5342e142c59 > core/src/main/scala/kafka/client/ClientUtils.scala > ebba87f0566684c796c26cb76c64b4640a5ccfde > core/src/main/scala/kafka/cluster/Broker.scala > 0060add008bb3bc4b0092f2173c469fce0120be6 > core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION > core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION > core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION > core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala > PRE-CREATION > core/src/main/scala/kafka/consumer/ConsumerConfig.scala > 9ebbee6c16dc83767297c729d2d74ebbd063a993 > core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala > b9e2bea7b442a19bcebd1b350d39541a8c9dd068 > core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala > ee6139c901082358382c5ef892281386bf6fc91b > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > 191a8677444e53b043e9ad6e94c5a9191c32599e > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > eb492f00449744bc8d63f55b393e2a1659d38454 > core/src/main/scala/kafka/controller/KafkaController.scala > 66df6d2fbdbdd556da6bea0df84f93e0472c8fbf > core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala > 1b28861cdf7dfb30fc696b54f8f8f05f730f4ece > core/src/main/scala/kafka/javaapi/TopicMetadata.scala > f384e04678df10a5b46a439f475c63371bf8e32b > core/src/main/scala/kafka/network/RequestChannel.scala > 7b1db3dbbb2c0676f166890f566c14aa248467ab > core/src/main/scala/kafka/network/SocketServer.scala > 39b1651b680b2995cedfde95d74c086d9c6219ef > core/src/main/scala/kafka/producer/ProducerPool.scala > 43df70bb461dd3e385e6b20396adef3c4016a3fc > core/src/main/scala/kafka/server/AbstractFetcherManager.scala > 20c00cb8cc2351950edbc8cb1752905a0c26e79f > core/src/main/scala/kafka/server/AbstractFetcherThread.scala > 8c281d4668f92eff95a4a5df3c03c4b5b20e7095 > core/src/main/scala/kafka/server/KafkaApis.scala > c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 > core/src/main/scala/kafka/server/KafkaConfig.scala > bbd3fd75e83961c971e50cf83d48db79a3da8791 > core/src/main/scala/kafka/server/KafkaHealthcheck.scala > 4acdd70fe9c1ee78d6510741006c2ece65450671 > core/src/main/scala/kafka/server/KafkaServer.scala > a069eb9272c92ef62387304b60de1fe473d7ff49 > core/src/main/scala/kafka/server/MetadataCache.scala > bf81a1ab88c14be8697b441eedbeb28fa0112643 > core/src/main/scala/kafka/server/ReplicaFetcherManager.scala > 351dbbad3bdb709937943b336a5b0a9e0162a5e2 > core/src/main/scala/kafka/server/ReplicaFetcherThread.scala > 6879e730282185bda3d6bc3659cb15af0672cecf > core/src/main/scala/kafka/server/ReplicaManager.scala > e58fbb922e93b0c31dff04f187fcadb4ece986d7 > core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala > d1e7c434e77859d746b8dc68dd5d5a3740425e79 > core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala > ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 > core/src/main/scala/kafka/tools/SimpleConsumerShell.scala > b4f903b6c7c3bb725cac7c05eb1f885906413c4d > core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala > 111c9a8b94ce45d95551482e9fd3f8c1cccbf548 > core/src/main/scala/kafka/utils/Utils.scala > 738c1af9ef5de16fdf5130daab69757a14c48b5c > core/src/main/scala/kafka/utils/ZkUtils.scala > c14bd455b6642f5e6eb254670bef9f57ae41d6cb > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > 5ec613cdb50b93bfe4477e89554dfc3768759b18 > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > b15237b76def3b234924280fa3fdb25dbb0cc0dc > core/src/test/scala/other/kafka/TestOffsetManager.scala > 41f334d48897b3027ed54c58bbf4811487d3b191 > core/src/test/scala/unit/kafka/KafkaConfigTest.scala > 4d36b8b1173f60d43463c13c9d8c1275a84c8c28 > core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala > 1bf2667f47853585bc33ffb3e28256ec5f24ae84 > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/cluster/BrokerTest.scala PRE-CREATION > core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala > c0355cc0135c6af2e346b4715659353a31723b86 > core/src/test/scala/unit/kafka/integration/FetcherTest.scala > 25845abbcad2e79f56f729e59239b738d3ddbc9d > core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala > 35dc071b1056e775326981573c9618d8046e601d > core/src/test/scala/unit/kafka/log/LogTest.scala > c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e > core/src/test/scala/unit/kafka/network/SocketServerTest.scala > 78b431f9c88cca1bc5e430ffd41083d0e92b7e75 > core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > 1db6ac329f7b54e600802c8a623f80d159d4e69b > core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > d60d8e0f49443f4dc8bc2cad6e2f951eda28f5cb > core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala > f0c4a56b61b4f081cf4bee799c6e9c523ff45e19 > core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala > 2377abe4933e065d037828a214c3a87e1773a8ef > core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala > c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e > core/src/test/scala/unit/kafka/server/LogOffsetTest.scala > c06ee756bf0fe07e5d3c92823a476c960b37afd6 > core/src/test/scala/unit/kafka/utils/TestUtils.scala > ac15d34425795d5be20c51b01fa1108bdcd66583 > kafka-patch-review.py b7f132f9d210b8648859ab8f9c89f30ec128ab38 > system_test/replication_testsuite/testcase_1/testcase_1_properties.json > 0c6d7a316cc6b51ac0755ca03558507db0706c31 > system_test/utils/kafka_system_test_utils.py > 41d511cbc310fa87e0f2cd2f772e479e8e3ae4e2 > > Diff: https://reviews.apache.org/r/28769/diff/ > > > Testing > ------- > > > Thanks, > > Gwen Shapira > >