Repository: incubator-gobblin Updated Branches: refs/heads/master a871e5c5d -> 603f22de0
[Gobblin 190][GOBBLIN-190] Kafka Sink replication factor and partition creation. Closes #2126 from dallaybatta/GOBBLIN-190 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/603f22de Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/603f22de Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/603f22de Branch: refs/heads/master Commit: 603f22de008737d290a9ed18fb6a76544df119a5 Parents: a871e5c Author: [email protected] <[email protected]> Authored: Wed Nov 1 15:00:54 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Nov 1 15:00:54 2017 -0700 ---------------------------------------------------------------------- gobblin-modules/gobblin-kafka-09/build.gradle | 8 +- .../gobblin/kafka/writer/Kafka09DataWriter.java | 42 ++++- .../gobblin/kafka/KafkaClusterTestBase.java | 128 +++++++++++++ .../gobblin/kafka/writer/ByPassWatcher.java | 30 +++ .../kafka/writer/Kafka09TopicProvisionTest.java | 184 +++++++++++++++++++ .../writer/KafkaWriterConfigurationKeys.java | 11 ++ 6 files changed, 401 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle index 44a5b80..54ba448 100644 --- a/gobblin-modules/gobblin-kafka-09/build.gradle +++ b/gobblin-modules/gobblin-kafka-09/build.gradle @@ -72,7 +72,13 @@ configurations { } test { - workingDir rootProject.rootDir + workingDir rootProject.rootDir + systemProperty "live.newtopic", System.getProperty("live.newtopic") + systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount") + systemProperty "live.newtopic.partitionCount", System.getProperty("live.newtopic.partitionCount") + systemProperty "live.cluster.count", System.getProperty("live.cluster.count") + systemProperty "live.zookeeper", System.getProperty("live.zookeeper") + systemProperty "live.broker", System.getProperty("live.broker") } ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java index 2cb00e1..89b637a 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java @@ -31,8 +31,15 @@ import com.google.common.base.Throwables; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import kafka.utils.ZkUtils; +import kafka.admin.AdminUtils; +import kafka.utils.ZKStringSerializer$; import lombok.extern.slf4j.Slf4j; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.writer.AsyncDataWriter; import org.apache.gobblin.writer.WriteCallback; import org.apache.gobblin.writer.WriteResponse; @@ -49,6 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper; @Slf4j public class Kafka09DataWriter<D> implements AsyncDataWriter<D> { + private static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER = new WriteResponseMapper<RecordMetadata>() { @@ -94,6 +102,7 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> { public Kafka09DataWriter(Producer producer, Config config) { this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC); + provisionTopic(topic,config); this.producer = producer; } @@ -121,6 +130,37 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> { @Override public void flush() throws IOException { - this.producer.flush(); + this.producer.flush(); } + + private void provisionTopic(String topicName,Config config) { + String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER; + if(!config.hasPath(zooKeeperPropKey)) { + log.debug("Topic "+topicName+" is configured without the partition and replication"); + return; + } + String zookeeperConnect = config.getString(zooKeeperPropKey); + int sessionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT); + int connectionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT); + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); + // Security for Kafka was added in Kafka 0.9.0.0 + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false); + int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT); + int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT, KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT); + Properties topicConfig = new Properties(); + if(AdminUtils.topicExists(zkUtils, topicName)) { + log.debug("Topic"+topicName+" already Exists with replication: "+replication+" and partitions :"+partitions); + return; + } + try { + AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig); + } catch (RuntimeException e) { + throw new RuntimeException(e); + } + log.info("Created Topic "+topicName+" with replication: "+replication+" and partitions :"+partitions); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java new file mode 100644 index 0000000..8cbe983 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.zk.EmbeddedZookeeper; + +import org.apache.gobblin.test.TestUtils; + +public class KafkaClusterTestBase extends KafkaTestBase { + + int clusterCount; + EmbeddedZookeeper _zkServer; + String _zkConnectString; + ZkClient _zkClient; + List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>(); + List<Integer> kafkaBrokerPortList = new ArrayList<Integer>(); + + public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException { + super(); + this.clusterCount = clusterCount; + } + + public void startCluster() { + // Start Zookeeper. + _zkServer = new EmbeddedZookeeper(); + _zkConnectString = "127.0.0.1:"+_zkServer.port(); + _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); + // Start Kafka Cluster. + for(int i=0;i<clusterCount;i++) { + KafkaServer _kafkaServer = createKafkaServer(i,_zkConnectString); + kafkaBrokerList.add(_kafkaServer); + } + } + + public void stopCluster() { + Iterator<KafkaServer> iter = kafkaBrokerList.iterator(); + while(iter.hasNext()){ + KafkaServer server = iter.next(); + try { + server.shutdown(); + } catch (RuntimeException e) { + // Simply Ignore. + } + } + } + + public int getZookeeperPort() { + return _zkServer.port(); + } + + public List<KafkaServer> getBrokerList() { + return kafkaBrokerList; + } + + public List<Integer> getKafkaBrokerPortList() { + return kafkaBrokerPortList; + } + + + public int getClusterCount() { + return kafkaBrokerList.size(); + } + + private KafkaServer createKafkaServer(int brokerId,String _zkConnectString){ + + int _brokerId = brokerId; + int _kafkaServerPort = TestUtils.findFreePort(); + Properties props = kafka.utils.TestUtils.createBrokerConfig( + _brokerId, + _zkConnectString, + kafka.utils.TestUtils.createBrokerConfig$default$3(), + kafka.utils.TestUtils.createBrokerConfig$default$4(), + _kafkaServerPort, + kafka.utils.TestUtils.createBrokerConfig$default$6(), + kafka.utils.TestUtils.createBrokerConfig$default$7(), + kafka.utils.TestUtils.createBrokerConfig$default$8(), + kafka.utils.TestUtils.createBrokerConfig$default$9(), + kafka.utils.TestUtils.createBrokerConfig$default$10(), + kafka.utils.TestUtils.createBrokerConfig$default$11(), + kafka.utils.TestUtils.createBrokerConfig$default$12(), + kafka.utils.TestUtils.createBrokerConfig$default$13(), + kafka.utils.TestUtils.createBrokerConfig$default$14() + ); + KafkaConfig config = new KafkaConfig(props); + Time mock = new MockTime(); + KafkaServer _kafkaServer = kafka.utils.TestUtils.createServer(config, mock); + kafkaBrokerPortList.add(_kafkaServerPort); + return _kafkaServer; + } + + public String getBootServersList() { + String bootServerString = ""; + Iterator<Integer> ports = kafkaBrokerPortList.iterator(); + while(ports.hasNext()){ + Integer port = ports.next(); + bootServerString = bootServerString+"localhost:"+port+","; + } + bootServerString = bootServerString.substring(0,bootServerString.length()-1); + return bootServerString; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java new file mode 100644 index 0000000..0c59030 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka.writer; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +public class ByPassWatcher implements Watcher { + + @Override + public void process(WatchedEvent event) { + // TODO Auto-generated method stub + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java new file mode 100644 index 0000000..d8b7ba0 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka.writer; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.json.JSONObject; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.kafka.KafkaClusterTestBase; +import org.apache.commons.lang3.StringUtils; +import kafka.admin.AdminUtils; +import kafka.api.TopicMetadata; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + + +@Slf4j +public class Kafka09TopicProvisionTest { + + + private final KafkaClusterTestBase _kafkaTestHelper; + private int testClusterCount = 5; + + public Kafka09TopicProvisionTest() + throws InterruptedException, RuntimeException { + _kafkaTestHelper = new KafkaClusterTestBase(testClusterCount); + } + + @BeforeSuite + public void beforeSuite() { + log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName()); + _kafkaTestHelper.startCluster(); + } + + @AfterSuite + public void afterSuite() + throws IOException { + _kafkaTestHelper.stopCluster(); + } + + @Test + public void testCluster() + throws IOException, InterruptedException, KeeperException { + int clusterCount = _kafkaTestHelper.getClusterCount(); + Assert.assertEquals(clusterCount,testClusterCount); + int zkPort = _kafkaTestHelper.getZookeeperPort(); + String kafkaBrokerPortList = _kafkaTestHelper.getKafkaBrokerPortList().toString(); + System.out.println("kafkaBrokerPortList : " + kafkaBrokerPortList); + ZooKeeper zk = new ZooKeeper("localhost:"+zkPort, 10000, new ByPassWatcher()); + List<Integer> brokerPortList = new ArrayList<Integer>(); + List<String> ids = zk.getChildren("/brokers/ids", false); + for (String id : ids) { + String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null)); + JSONObject obj = new JSONObject(brokerInfo); + int brokerPort = obj.getInt("port"); + System.out.println(brokerPort); + brokerPortList.add(brokerPort); + } + Assert.assertTrue(_kafkaTestHelper.getKafkaBrokerPortList().equals(brokerPortList)); + } + + @Test + public void testTopicPartitionCreationCount() + throws IOException, InterruptedException { + String topic = "topicPartition4"; + int clusterCount = _kafkaTestHelper.getClusterCount(); + int partionCount = clusterCount/2; + int zkPort = _kafkaTestHelper.getZookeeperPort(); + Properties props = new Properties(); + + // Setting Topic Properties + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic); + props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, String.valueOf(clusterCount)); + props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, String.valueOf(partionCount)); + props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, "localhost:"+zkPort); + System.out.println(_kafkaTestHelper.getBootServersList()); + + // Setting Producer Properties + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", _kafkaTestHelper.getBootServersList()); + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props); + String zookeeperConnect = "localhost:"+_kafkaTestHelper.getZookeeperPort(); + int sessionTimeoutMs = 10 * 1000; + int connectionTimeoutMs = 8 * 1000; + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient( + zookeeperConnect, + sessionTimeoutMs, + connectionTimeoutMs, + ZKStringSerializer$.MODULE$); + boolean isSecureKafkaCluster = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster); + + TopicMetadata metaData = + AdminUtils.fetchTopicMetadataFromZk(topic,zkUtils); + Assert.assertEquals(metaData.partitionsMetadata().size(), partionCount); + + } + + @Test + public void testLiveTopicPartitionCreationCount() + throws IOException, InterruptedException { + String liveClusterCount = System.getProperty("live.cluster.count"); + String liveZookeeper = System.getProperty("live.zookeeper"); + String liveBroker = System.getProperty("live.broker"); + String topic = System.getProperty("live.newtopic"); + String topicReplicationCount = System.getProperty("live.newtopic.replicationCount"); + String topicPartitionCount = System.getProperty("live.newtopic.partitionCount"); + if(StringUtils.isEmpty(liveClusterCount)){ + Assert.assertTrue(true); + return; + } + if(StringUtils.isEmpty(topicPartitionCount)){ + int clusterCount = Integer.parseInt(liveClusterCount); + clusterCount--; + int partionCount = clusterCount/2; + topicReplicationCount = String.valueOf(clusterCount); + topicPartitionCount = String.valueOf(partionCount); + } + + Properties props = new Properties(); + // Setting Topic Properties + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic); + props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, topicReplicationCount); + props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, topicPartitionCount ); + props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, liveZookeeper); + // Setting Producer Properties + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"bootstrap.servers", liveBroker); + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props); + int sessionTimeoutMs = 10 * 1000; + int connectionTimeoutMs = 8 * 1000; + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient( + liveZookeeper, + sessionTimeoutMs, + connectionTimeoutMs, + ZKStringSerializer$.MODULE$); + boolean isSecureKafkaCluster = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(liveZookeeper), isSecureKafkaCluster); + + TopicMetadata metaData = + AdminUtils.fetchTopicMetadataFromZk(topic,zkUtils); + Assert.assertEquals(metaData.partitionsMetadata().size(), Integer.parseInt(topicPartitionCount)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java index f6776c0..279812e 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java @@ -50,4 +50,15 @@ public class KafkaWriterConfigurationKeys { static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = "kafka.schemaRegistry.switchName"; static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true"; + public static final String KAFKA_TOPIC_CONFIG = "writer.kafka."; + static final String TOPIC_NAME = "topic"; + public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + "zookeeper"; + static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + "replicationCount"; + static final int REPLICATION_COUNT_DEFAULT = 1; + static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount"; + static final int PARTITION_COUNT_DEFAULT = 1; + public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + ".sto"; + static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds + public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER + ".cto"; + static final int ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT = 8000; // 8 seconds }
