Updated Branches: refs/heads/0.8 ede85875a -> 2d7403174
kafka-871; Rename ZkConfig properties; patched by Swapnil Ghike; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d740317 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d740317 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d740317 Branch: refs/heads/0.8 Commit: 2d7403174f8fd6d35fce05585a7ee6edb8af7dd4 Parents: ede8587 Author: Swapnil Ghike <sri...@gmail.com> Authored: Thu Apr 25 18:57:31 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Thu Apr 25 18:57:31 2013 -0700 ---------------------------------------------------------------------- config/consumer.properties | 6 +- config/producer.properties | 2 +- config/server.properties | 6 +- .../main/java/kafka/etl/impl/DataGenerator.java | 2 +- contrib/hadoop-producer/README.md | 2 +- .../kafka/bridge/hadoop/KafkaOutputFormat.java | 4 +- core/src/main/scala/kafka/client/ClientUtils.scala | 4 +- .../scala/kafka/consumer/ConsoleConsumer.scala | 2 +- .../scala/kafka/consumer/ConsumerConnector.scala | 4 +- .../scala/kafka/producer/ConsoleProducer.scala | 2 +- .../scala/kafka/producer/KafkaLog4jAppender.scala | 4 +- .../main/scala/kafka/producer/ProducerConfig.scala | 2 +- .../main/scala/kafka/tools/ReplayLogProducer.scala | 4 +- .../kafka/tools/VerifyConsumerRebalance.scala | 2 +- core/src/main/scala/kafka/utils/Utils.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 8 ++-- .../scala/other/kafka/TestEndToEndLatency.scala | 4 +- .../consumer/ZookeeperConsumerConnectorTest.scala | 4 +- .../test/scala/unit/kafka/log/LogOffsetTest.scala | 2 +- .../unit/kafka/producer/AsyncProducerTest.scala | 18 +++--- .../scala/unit/kafka/producer/ProducerTest.scala | 12 ++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 12 ++-- .../src/main/java/kafka/examples/Consumer.java | 6 +- .../src/main/java/kafka/examples/Producer.java | 2 +- .../scala/kafka/perf/ConsumerPerformance.scala | 2 +- .../scala/kafka/perf/ProducerPerformance.scala | 2 +- .../config/migration_producer.properties | 2 +- .../config/server.properties | 2 + .../testcase_9001/testcase_9001_properties.json | 6 +- .../testcase_9003/testcase_9003_properties.json | 6 +- .../testcase_9004/testcase_9004_properties.json | 6 +- .../testcase_9005/testcase_9005_properties.json | 6 +- .../testcase_9006/testcase_9006_properties.json | 6 +- .../config/mirror_consumer.properties | 6 +- .../config/mirror_producer.properties | 2 +- .../config/server.properties | 4 +- .../replication_testsuite/config/server.properties | 4 +- system_test/utils/kafka_system_test_utils.py | 43 +++++++++------ 38 files changed, 111 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/consumer.properties ---------------------------------------------------------------------- diff --git a/config/consumer.properties b/config/consumer.properties index 9dbd583..7343cbc 100644 --- a/config/consumer.properties +++ b/config/consumer.properties @@ -14,13 +14,13 @@ # limitations under the License. # see kafka.consumer.ConsumerConfig for more details -# zk connection string +# Zookeeper connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=127.0.0.1:2181 +zookeeper.connect=127.0.0.1:2181 # timeout in ms for connecting to zookeeper -zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 #consumer group id group.id=test-consumer-group http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/producer.properties ---------------------------------------------------------------------- diff --git a/config/producer.properties b/config/producer.properties index cc8f5f6..162b8a6 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -18,7 +18,7 @@ # list of brokers used for bootstrapping # format: host1:port1,host2:port2 ... -broker.list=localhost:9092 +metadata.broker.list=localhost:9092 # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index 04408dd..bc6a521 100644 --- a/config/server.properties +++ b/config/server.properties @@ -97,15 +97,15 @@ log.cleanup.interval.mins=1 ############################# Zookeeper ############################# -# Zk connection string (see zk docs for details). +# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. -zk.connect=localhost:2181 +zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 # metrics reporter properties kafka.metrics.polling.interval.secs=5 http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index df17978..4b1d117 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -70,7 +70,7 @@ public class DataGenerator { System.out.println("server uri:" + _uri.toString()); Properties producerProps = new Properties(); - producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort())); + producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort())); producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE)); producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-producer/README.md ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/README.md b/contrib/hadoop-producer/README.md index 1bd3721..547c1ef 100644 --- a/contrib/hadoop-producer/README.md +++ b/contrib/hadoop-producer/README.md @@ -87,7 +87,7 @@ compression codec, one would add the "kafka.output.compression.codec" parameter compression). For easier debugging, the above values as well as the Kafka broker information -(kafka.broker.list), the topic (kafka.output.topic), and the schema +(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema (kafka.output.schema) are injected into the job's configuration. By default, the Hadoop producer uses Kafka's sync producer as asynchronous operation doesn't make sense in the batch Hadoop case. http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java index aa1f944..32f096c 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java @@ -124,8 +124,8 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V> // URL: kafka://<kafka host>/<topic> // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar String brokerList = uri.getAuthority(); - props.setProperty("broker.list", brokerList); - job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList); + props.setProperty("metadata.broker.list", brokerList); + job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList); if (uri.getPath() == null || uri.getPath().length() <= 1) throw new KafkaException("no topic specified in kafka uri"); http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index a3d88ea..7b3f09d 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -16,7 +16,7 @@ object ClientUtils extends Logging{ /** * Used by the producer to send a metadata request since it has access to the ProducerConfig * @param topics The topics for which the metadata needs to be fetched - * @param brokers The brokers in the cluster as configured on the producer through broker.list + * @param brokers The brokers in the cluster as configured on the producer through metadata.broker.list * @param producerConfig The producer's config * @return topic metadata response */ @@ -60,7 +60,7 @@ object ClientUtils extends Logging{ */ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = { val props = new Properties() - props.put("broker.list", brokers.map(_.getConnectionString()).mkString(",")) + props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(",")) props.put("client.id", clientId) props.put("request.timeout.ms", timeoutMs.toString) val producerConfig = new ProducerConfig(props) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index d6c4a51..e2b0041 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -158,7 +158,7 @@ object ConsoleConsumer extends Logging { props.put("auto.commit.enable", "true") props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - props.put("zk.connect", options.valueOf(zkConnectOpt)) + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) val config = new ConsumerConfig(props) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/consumer/ConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala index d8c23f2..13c3f77 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala @@ -83,7 +83,7 @@ object Consumer extends Logging { * Create a ConsumerConnector * * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper - * connection string zk.connect. + * connection string zookeeper.connect. */ def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new ZookeeperConsumerConnector(config) @@ -94,7 +94,7 @@ object Consumer extends Logging { * Create a ConsumerConnector * * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper - * connection string zk.connect. + * connection string zookeeper.connect. */ def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = { val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/ConsoleProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 9c2260b..5539bce 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -126,7 +126,7 @@ object ConsoleProducer { cmdLineProps.put("topic", topic) val props = new Properties() - props.put("broker.list", brokerList) + props.put("metadata.broker.list", brokerList) val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec props.put("compression.codec", codec.toString) props.put("producer.type", if(sync) "sync" else "async") http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 3d22e6d..88ae784 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -64,9 +64,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { // check for config parameter validity val props = new Properties() if(brokerList != null) - props.put("broker.list", brokerList) + props.put("metadata.broker.list", brokerList) if(props.isEmpty) - throw new MissingConfigException("The broker.list property should be specified") + throw new MissingConfigException("The metadata.broker.list property should be specified") if(topic == null) throw new MissingConfigException("topic must be specified by the Kafka log4j appender") if(serializerClass == null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/producer/ProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index e27ec44..7947b18 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -63,7 +63,7 @@ class ProducerConfig private (val props: VerifiableProperties) * format is host1:port1,host2:port2, and the list can be a subset of brokers or * a VIP pointing to a subset of brokers. */ - val brokerList = props.getString("broker.list") + val brokerList = props.getString("metadata.broker.list") /** the partitioner class for partitioning events amongst sub-topics */ val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner") http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/tools/ReplayLogProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index d744a78..814d61a 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -43,7 +43,7 @@ object ReplayLogProducer extends Logging { // consumer properties val consumerProps = new Properties consumerProps.put("group.id", GroupId) - consumerProps.put("zk.connect", config.zkConnect) + consumerProps.put("zookeeper.connect", config.zkConnect) consumerProps.put("consumer.timeout.ms", "10000") consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString) consumerProps.put("fetch.message.max.bytes", (1024*1024).toString) @@ -139,7 +139,7 @@ object ReplayLogProducer extends Logging { class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() - props.put("broker.list", config.brokerList) + props.put("metadata.broker.list", config.brokerList) props.put("reconnect.interval", Integer.MAX_VALUE.toString) props.put("send.buffer.bytes", (64*1024).toString) props.put("compression.codec", config.compressionCodec.codec.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index d9c8bae..dc6d066 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -25,7 +25,7 @@ object VerifyConsumerRebalance extends Logging { def main(args: Array[String]) { val parser = new OptionParser() - val zkConnectOpt = parser.accepts("zk.connect", "ZooKeeper connect string."). + val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string."). withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]); val groupOpt = parser.accepts("group", "Consumer group."). withRequiredArg().ofType(classOf[String]) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/utils/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index c639efb..e83eb5f 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -507,7 +507,7 @@ object Utils extends Logging { val builder = new StringBuilder builder.append("[ ") if (valueInQuotes) - builder.append(jsonData.map("\"" + _ + "\"")).mkString(", ") + builder.append(jsonData.map("\"" + _ + "\"").mkString(", ")) else builder.append(jsonData.mkString(", ")) builder.append(" ]") http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index bd93ff1..7971a09 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -772,14 +772,14 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) class ZKConfig(props: VerifiableProperties) { /** ZK host string */ - val zkConnect = props.getString("zk.connect", null) + val zkConnect = props.getString("zookeeper.connect", null) /** zookeeper session timeout */ - val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000) + val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000) /** the max time that the client waits to establish a connection to zookeeper */ - val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs) + val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs) /** how far a ZK follower can be behind a ZK leader */ - val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000) + val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000) } http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/other/kafka/TestEndToEndLatency.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index 98c12b7..c4aed10 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -38,7 +38,7 @@ object TestEndToEndLatency { consumerProps.put("group.id", topic) consumerProps.put("auto.commit", "true") consumerProps.put("auto.offset.reset", "largest") - consumerProps.put("zk.connect", zkConnect) + consumerProps.put("zookeeper.connect", zkConnect) consumerProps.put("socket.timeout.ms", 1201000.toString) val config = new ConsumerConfig(consumerProps) @@ -47,7 +47,7 @@ object TestEndToEndLatency { val iter = stream.iterator val producerProps = new Properties() - producerProps.put("broker.list", brokerList) + producerProps.put("metadata.broker.list", brokerList) producerProps.put("producer.type", "sync") val producer = new Producer[Any, Any](new ProducerConfig(producerProps)) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f7ee914..86d30ad 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -330,7 +330,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d-%d".format(config.brokerId, partition) val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") props.put("compression.codec", compression.codec.toString) props.put("key.serializer.class", classOf[IntEncoder].getName.toString) @@ -350,7 +350,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar numParts: Int): List[String]= { var messages: List[String] = Nil val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") props.put("key.serializer.class", classOf[IntEncoder].getName.toString) props.put("serializer.class", classOf[StringEncoder].getName) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index b343d98..1a9cc01 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -207,7 +207,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { props.put("log.retention.hours", "10") props.put("log.cleanup.interval.mins", "5") props.put("log.segment.bytes", logSize.toString) - props.put("zk.connect", zkConnect.toString) + props.put("zookeeper.connect", zkConnect.toString) props } http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 922a200..7f7a8d7 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -61,7 +61,7 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") props.put("queue.buffering.max.messages", "10") props.put("batch.num.messages", "1") @@ -86,7 +86,7 @@ class AsyncProducerTest extends JUnit3Suite { def testProduceAfterClosed() { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") props.put("batch.num.messages", "1") @@ -165,7 +165,7 @@ class AsyncProducerTest extends JUnit3Suite { producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes))) val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val broker1 = new Broker(0, "localhost", 9092) val broker2 = new Broker(1, "localhost", 9093) broker1 @@ -215,7 +215,7 @@ class AsyncProducerTest extends JUnit3Suite { def testSerializeEvents() { val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m)) val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // form expected partitions metadata val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092) @@ -241,7 +241,7 @@ class AsyncProducerTest extends JUnit3Suite { val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]] producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes))) val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // form expected partitions metadata @@ -270,7 +270,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testNoBroker() { val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -301,7 +301,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testIncompatibleEncoder() { val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) val producer=new Producer[String, String](config) @@ -318,7 +318,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testRandomPartitioner() { val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -392,7 +392,7 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testFailedSendRetryLogic() { val props = new Properties() - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 04acef5..bc37531 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -93,7 +93,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) val props1 = new util.Properties() - props1.put("broker.list", "localhost:80,localhost:81") + props1.put("metadata.broker.list", "localhost:80,localhost:81") props1.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig1 = new ProducerConfig(props1) val producer1 = new Producer[String, String](producerConfig1) @@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val props2 = new util.Properties() - props2.put("broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) + props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) props2.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig2= new ProducerConfig(props2) val producer2 = new Producer[String, String](producerConfig2) @@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val props3 = new util.Properties() - props3.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props3.put("serializer.class", "kafka.serializer.StringEncoder") val producerConfig3 = new ProducerConfig(props3) val producer3 = new Producer[String, String](producerConfig3) @@ -139,7 +139,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props1 = new util.Properties() props1.put("serializer.class", "kafka.serializer.StringEncoder") props1.put("partitioner.class", "kafka.utils.StaticPartitioner") - props1.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props1.put("request.required.acks", "2") props1.put("request.timeout.ms", "1000") @@ -200,7 +200,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") props.put("request.required.acks", "1") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") @@ -257,7 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", String.valueOf(timeoutMs)) - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props.put("request.required.acks", "1") val config = new ProducerConfig(props) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 217ff7a..68c134e 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -128,7 +128,7 @@ object TestUtils extends Logging { props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) props.put("log.flush.interval.messages", "1") - props.put("zk.connect", TestZKUtils.zookeeperConnect) + props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props } @@ -139,12 +139,12 @@ object TestUtils extends Logging { def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String, consumerTimeout: Long = -1): Properties = { val props = new Properties - props.put("zk.connect", zkConnect) + props.put("zookeeper.connect", zkConnect) props.put("group.id", groupId) props.put("consumer.id", consumerId) props.put("consumer.timeout.ms", consumerTimeout.toString) - props.put("zk.session.timeout.ms", "400") - props.put("zk.sync.time.ms", "200") + props.put("zookeeper.session.timeout.ms", "400") + props.put("zookeeper.sync.time.ms", "200") props.put("auto.commit.interval.ms", "1000") props.put("rebalance.max.retries", "4") @@ -292,7 +292,7 @@ object TestUtils extends Logging { encoder: Encoder[V] = new DefaultEncoder(), keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { val props = new Properties() - props.put("broker.list", brokerList) + props.put("metadata.broker.list", brokerList) props.put("send.buffer.bytes", "65536") props.put("connect.timeout.ms", "100000") props.put("reconnect.interval", "10000") @@ -303,7 +303,7 @@ object TestUtils extends Logging { def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = { val props = new Properties() - props.put("broker.list", brokerList) + props.put("metadata.broker.list", brokerList) props.put("partitioner.class", partitioner) props.put("message.send.max.retries", "3") props.put("retry.backoff.ms", "1000") http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/examples/src/main/java/kafka/examples/Consumer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 3460d36..63f099a 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -43,10 +43,10 @@ public class Consumer extends Thread private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); - props.put("zk.connect", KafkaProperties.zkConnect); + props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id", KafkaProperties.groupId); - props.put("zk.session.timeout.ms", "400"); - props.put("zk.sync.time.ms", "200"); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/examples/src/main/java/kafka/examples/Producer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index a770a18..96e9893 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -30,7 +30,7 @@ public class Producer extends Thread public Producer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("broker.list", "localhost:9092"); + props.put("metadata.broker.list", "localhost:9092"); // Use random partitioner. Don't need the key type. Just set it to Integer. // The message is of type String. producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index ee2ce95..3158a22 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -128,7 +128,7 @@ object ConsumerPerformance { props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") - props.put("zk.connect", options.valueOf(zkConnectOpt)) + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", "5000") val consumerConfig = new ConsumerConfig(props) val numThreads = options.valueOf(numThreadsOpt).intValue http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/perf/src/main/scala/kafka/perf/ProducerPerformance.scala ---------------------------------------------------------------------- diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 851a99e..ad2ac26 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -174,7 +174,7 @@ object ProducerPerformance extends Logging { val allDone: CountDownLatch, val rand: Random) extends Runnable { val props = new Properties() - props.put("broker.list", config.brokerList) + props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec.codec.toString) props.put("reconnect.interval", Integer.MAX_VALUE.toString) props.put("send.buffer.bytes", (64*1024).toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/config/migration_producer.properties ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties index af080ae..1750807 100644 --- a/system_test/migration_tool_testsuite/config/migration_producer.properties +++ b/system_test/migration_tool_testsuite/config/migration_producer.properties @@ -20,7 +20,7 @@ # configure brokers statically # format: host1:port1,host2:port2 ... -broker.list=localhost:9094,localhost:9095,localhost:9096 +metadata.broker.list=localhost:9094,localhost:9095,localhost:9096 # discover brokers from ZK #zk.connect= http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/config/server.properties ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/config/server.properties b/system_test/migration_tool_testsuite/config/server.properties index d231d4c..6ecbb71 100644 --- a/system_test/migration_tool_testsuite/config/server.properties +++ b/system_test/migration_tool_testsuite/config/server.properties @@ -115,8 +115,10 @@ enable.zookeeper=true # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zk.connect=localhost:2181 +zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 monitoring.period.secs=1 http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json b/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json index cf84caa..1904ab5 100644 --- a/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json +++ b/system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json @@ -29,7 +29,7 @@ "entity_id": "1", "port": "9091", "brokerid": "1", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_1_logs", "log_filename": "kafka_server_1.log", @@ -39,7 +39,7 @@ "entity_id": "2", "port": "9092", "brokerid": "2", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_2.log", @@ -49,7 +49,7 @@ "entity_id": "3", "port": "9093", "brokerid": "3", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_3.log", http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json b/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json index 0b413c4..8cacc69 100644 --- a/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json +++ b/system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json @@ -30,7 +30,7 @@ "entity_id": "1", "port": "9091", "brokerid": "1", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_1_logs", "log_filename": "kafka_server_1.log", @@ -40,7 +40,7 @@ "entity_id": "2", "port": "9092", "brokerid": "2", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_2.log", @@ -50,7 +50,7 @@ "entity_id": "3", "port": "9093", "brokerid": "3", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_3.log", http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json b/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json index 5c6baaf..4dbd80b 100644 --- a/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json +++ b/system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json @@ -30,7 +30,7 @@ "entity_id": "1", "port": "9091", "brokerid": "1", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_1_logs", "log_filename": "kafka_server_1.log", @@ -40,7 +40,7 @@ "entity_id": "2", "port": "9092", "brokerid": "2", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_2.log", @@ -50,7 +50,7 @@ "entity_id": "3", "port": "9093", "brokerid": "3", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_3.log", http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json b/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json index 8597e1a..e46b453 100644 --- a/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json +++ b/system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json @@ -39,7 +39,7 @@ "entity_id": "2", "port": "9091", "brokerid": "1", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_2.log", @@ -49,7 +49,7 @@ "entity_id": "3", "port": "9092", "brokerid": "2", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_3.log", @@ -59,7 +59,7 @@ "entity_id": "4", "port": "9093", "brokerid": "3", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_4_logs", "log_filename": "kafka_server_4.log", http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json ---------------------------------------------------------------------- diff --git a/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json b/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json index ec3290f..10f5955 100644 --- a/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json +++ b/system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json @@ -39,7 +39,7 @@ "entity_id": "2", "port": "9091", "brokerid": "1", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_2.log", @@ -49,7 +49,7 @@ "entity_id": "3", "port": "9092", "brokerid": "2", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_3.log", @@ -59,7 +59,7 @@ "entity_id": "4", "port": "9093", "brokerid": "3", - "07_client": "true", + "version": "0.7", "log.file.size": "51200", "log.dir": "/tmp/kafka_server_4_logs", "log_filename": "kafka_server_4.log", http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/mirror_consumer.properties ---------------------------------------------------------------------- diff --git a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties index bb1a1cc..e90634a 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_consumer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_consumer.properties @@ -1,5 +1,5 @@ -zk.connect=localhost:2108 -zk.connection.timeout.ms=1000000 +zookeeper.connect=localhost:2108 +zookeeper.connection.timeout.ms=1000000 group.id=mm_regtest_grp auto.commit.interval.ms=120000 auto.offset.reset=smallest @@ -8,5 +8,5 @@ auto.offset.reset=smallest #rebalance.backoff.ms=2000 socket.receive.buffer.bytes=1048576 fetch.message.max.bytes=1048576 -zk.sync.time.ms=15000 +zookeeper.sync.time.ms=15000 shallow.iterator.enable=false http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/mirror_producer.properties ---------------------------------------------------------------------- diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties index bdb657f..b2bf2c2 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties @@ -1,6 +1,6 @@ producer.type=async queue.enqueue.timeout.ms=-1 -broker.list=localhost:9094 +metadata.broker.list=localhost:9094 compression.codec=0 message.send.max.retries=3 request.required.acks=1 http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/mirror_maker_testsuite/config/server.properties ---------------------------------------------------------------------- diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties index dacf158..36dd68d 100644 --- a/system_test/mirror_maker_testsuite/config/server.properties +++ b/system_test/mirror_maker_testsuite/config/server.properties @@ -114,10 +114,10 @@ enable.zookeeper=true # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. -zk.connect=localhost:2181 +zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 monitoring.period.secs=1 message.max.bytes=1000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/replication_testsuite/config/server.properties ---------------------------------------------------------------------- diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index dacf158..36dd68d 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -114,10 +114,10 @@ enable.zookeeper=true # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. -zk.connect=localhost:2181 +zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper -zk.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 monitoring.period.secs=1 message.max.bytes=1000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/2d740317/system_test/utils/kafka_system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index dd082f5..ae393bc 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -313,8 +313,8 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv logger.info("testcase config (dest) pathname : " + cfgDestPathname, extra=d) # loop through all zookeepers (if more than 1) to retrieve host and clientPort - # to construct a zk.connect str for broker in the form of: - # zk.connect=<host1>:<port1>,<host2>:<port2>,... + # to construct a zookeeper.connect str for broker in the form of: + # zookeeper.connect=<host1>:<port1>,<host2>:<port2>,... testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] = "" testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] = "" testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"] = [] @@ -409,28 +409,35 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv # copy the associated .properties template, update values, write to testcase_<xxx>/config - if ( clusterCfg["role"] == "broker" ): - if clusterCfg["cluster_name"] == "source": - tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif clusterCfg["cluster_name"] == "target": - tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - logger.error("Unknown cluster name: " + clusterName, extra=d) - sys.exit(1) - - zeroSevenClient = "false" + if (clusterCfg["role"] == "broker"): + brokerVersion = "0.8" try: - zeroSevenClient = tcCfg["07_client"] + brokerVersion = tcCfg["version"] except: pass + if (brokerVersion == "0.7"): + if clusterCfg["cluster_name"] == "source": + tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + else: + logger.error("Unknown cluster name for 0.7: " + clusterName, extra=d) + sys.exit(1) + else: + if clusterCfg["cluster_name"] == "source": + tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + elif clusterCfg["cluster_name"] == "target": + tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + else: + logger.error("Unknown cluster name: " + clusterName, extra=d) + sys.exit(1) + addedCSVConfig = {} addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true" - if zeroSevenClient == "true": + if brokerVersion == "0.7": addedCSVConfig["brokerid"] = tcCfg["brokerid"] copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", @@ -450,12 +457,12 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv sys.exit(1) elif ( clusterCfg["role"] == "mirror_maker"): - tcCfg["broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] + tcCfg["metadata.broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] copy_file_with_dict_values(cfgTemplatePathname + "/mirror_producer.properties", cfgDestPathname + "/" + tcCfg["mirror_producer_config_filename"], tcCfg, None) - # update zk.connect with the zk entities specified in cluster_config.json - tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + # update zookeeper.connect with the zk entities specified in cluster_config.json + tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties", cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None) @@ -818,7 +825,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " - # get zk.connect + # get zookeeper connect string zkConnectStr = "" if clusterName == "source": zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]