Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a871e5c5d -> 603f22de0


[Gobblin 190][GOBBLIN-190] Kafka Sink replication factor and partition creation.

Closes #2126 from dallaybatta/GOBBLIN-190


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/603f22de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/603f22de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/603f22de

Branch: refs/heads/master
Commit: 603f22de008737d290a9ed18fb6a76544df119a5
Parents: a871e5c
Author: [email protected] <[email protected]>
Authored: Wed Nov 1 15:00:54 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Wed Nov 1 15:00:54 2017 -0700

----------------------------------------------------------------------
 gobblin-modules/gobblin-kafka-09/build.gradle   |   8 +-
 .../gobblin/kafka/writer/Kafka09DataWriter.java |  42 ++++-
 .../gobblin/kafka/KafkaClusterTestBase.java     | 128 +++++++++++++
 .../gobblin/kafka/writer/ByPassWatcher.java     |  30 +++
 .../kafka/writer/Kafka09TopicProvisionTest.java | 184 +++++++++++++++++++
 .../writer/KafkaWriterConfigurationKeys.java    |  11 ++
 6 files changed, 401 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle 
b/gobblin-modules/gobblin-kafka-09/build.gradle
index 44a5b80..54ba448 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -72,7 +72,13 @@ configurations {
 }
 
 test {
-  workingDir rootProject.rootDir
+  workingDir rootProject.rootDir  
+  systemProperty "live.newtopic", System.getProperty("live.newtopic") // live.* values are forwarded to the test JVM; Kafka09TopicProvisionTest.testLiveTopicPartitionCreationCount reads them
+  systemProperty "live.newtopic.replicationCount", 
System.getProperty("live.newtopic.replicationCount")
+  systemProperty "live.newtopic.partitionCount", 
System.getProperty("live.newtopic.partitionCount")
+  systemProperty "live.cluster.count", System.getProperty("live.cluster.count") // the live test returns early when this value is empty
+  systemProperty "live.zookeeper", System.getProperty("live.zookeeper")
+  systemProperty "live.broker", System.getProperty("live.broker")
 }
 
 ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 2cb00e1..89b637a 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -31,8 +31,15 @@ import com.google.common.base.Throwables;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import kafka.utils.ZkUtils;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
 import lombok.extern.slf4j.Slf4j;
 
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 import org.apache.gobblin.writer.WriteResponse;
@@ -49,6 +56,7 @@ import org.apache.gobblin.writer.WriteResponseMapper;
 @Slf4j
 public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
 
+  
   private static final WriteResponseMapper<RecordMetadata> 
WRITE_RESPONSE_WRAPPER =
       new WriteResponseMapper<RecordMetadata>() {
 
@@ -94,6 +102,7 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> {
 
   public Kafka09DataWriter(Producer producer, Config config) {
     this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
+    provisionTopic(topic,config);
     this.producer = producer;
   }
 
@@ -121,6 +130,37 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> {
   @Override
   public void flush()
       throws IOException {
-    this.producer.flush();
+         this.producer.flush();
   }
+  
+  /**
+   * Creates the destination topic through ZooKeeper when it does not exist yet.
+   *
+   * <p>Provisioning is opt-in: without a {@link KafkaWriterConfigurationKeys#CLUSTER_ZOOKEEPER} entry in
+   * {@code config} this method returns without contacting ZooKeeper. An existing topic is left untouched,
+   * so the configured partition and replication counts only apply to topics created here.
+   *
+   * @param topicName topic to create if absent
+   * @param config writer configuration holding the ZooKeeper connect string, timeouts and topic sizing
+   * @throws RuntimeException if creation fails and the topic still does not exist afterwards
+   */
+  private void provisionTopic(String topicName, Config config) {
+    String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
+    if (!config.hasPath(zooKeeperPropKey)) {
+      log.debug("Topic " + topicName + " is configured without the partition and replication");
+      return;
+    }
+    String zookeeperConnect = config.getString(zooKeeperPropKey);
+    int sessionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    int connectionTimeoutMs = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+    int partitions = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.PARTITION_COUNT,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    int replication = ConfigUtils.getInt(config, KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+        KafkaWriterConfigurationKeys.REPLICATION_COUNT_DEFAULT);
+
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs,
+        ZKStringSerializer$.MODULE$);
+    try {
+      // Security for Kafka was added in Kafka 0.9.0.0
+      ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), false);
+      if (AdminUtils.topicExists(zkUtils, topicName)) {
+        log.debug("Topic " + topicName + " already exists; requested replication: " + replication
+            + " and partitions: " + partitions + " are not applied");
+        return;
+      }
+      try {
+        AdminUtils.createTopic(zkUtils, topicName, partitions, replication, new Properties());
+      } catch (RuntimeException e) {
+        // Several writers can race between the existence check and the create call; losing that race is
+        // harmless as long as the topic exists now.
+        if (!AdminUtils.topicExists(zkUtils, topicName)) {
+          throw e;
+        }
+        log.debug("Topic " + topicName + " was created concurrently by another writer");
+        return;
+      }
+      log.info("Created Topic " + topicName + " with replication: " + replication + " and partitions :" + partitions);
+    } finally {
+      // The client is only needed for provisioning; closing it releases the ZooKeeper session.
+      zkClient.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
new file mode 100644
index 0000000..8cbe983
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaClusterTestBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.zk.EmbeddedZookeeper;
+
+import org.apache.gobblin.test.TestUtils;
+
+/**
+ * Test helper that runs one embedded ZooKeeper and {@code clusterCount} Kafka brokers in-process.
+ *
+ * <p>Call {@link #startCluster()} before use and {@link #stopCluster()} afterwards. Each broker listens on
+ * a free local port that is recorded in the order the brokers were started.
+ */
+public class KafkaClusterTestBase extends KafkaTestBase {
+
+  int clusterCount;
+  EmbeddedZookeeper _zkServer;
+  String _zkConnectString;
+  ZkClient _zkClient;
+  List<KafkaServer> kafkaBrokerList = new ArrayList<KafkaServer>();
+  List<Integer> kafkaBrokerPortList = new ArrayList<Integer>();
+
+  /**
+   * @param clusterCount number of brokers {@link #startCluster()} will launch
+   */
+  public KafkaClusterTestBase(int clusterCount) throws InterruptedException, RuntimeException {
+    super();
+    this.clusterCount = clusterCount;
+  }
+
+  /** Starts the embedded ZooKeeper, connects a client to it and launches the brokers. */
+  public void startCluster() {
+    // Start Zookeeper.
+    _zkServer = new EmbeddedZookeeper();
+    _zkConnectString = "127.0.0.1:" + _zkServer.port();
+    _zkClient = new ZkClient(_zkConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+    // Start Kafka Cluster.
+    for (int brokerId = 0; brokerId < clusterCount; brokerId++) {
+      kafkaBrokerList.add(createKafkaServer(brokerId, _zkConnectString));
+    }
+  }
+
+  /**
+   * Shuts down every broker, then the ZooKeeper client and the embedded ZooKeeper server.
+   * Safe to call when {@link #startCluster()} was never invoked.
+   */
+  public void stopCluster() {
+    for (KafkaServer server : kafkaBrokerList) {
+      try {
+        server.shutdown();
+      } catch (RuntimeException ignored) {
+        // Best effort: keep going so the remaining brokers and ZooKeeper are still torn down.
+      }
+    }
+    if (_zkClient != null) {
+      _zkClient.close();
+    }
+    if (_zkServer != null) {
+      _zkServer.shutdown();
+    }
+  }
+
+  /** @return the client port of the embedded ZooKeeper; requires a prior {@link #startCluster()} */
+  public int getZookeeperPort() {
+    return _zkServer.port();
+  }
+
+  /** @return the started brokers, in start order */
+  public List<KafkaServer> getBrokerList() {
+    return kafkaBrokerList;
+  }
+
+  /** @return the listening ports of the started brokers, in start order */
+  public List<Integer> getKafkaBrokerPortList() {
+    return kafkaBrokerPortList;
+  }
+
+  /** @return the number of brokers that have been started */
+  public int getClusterCount() {
+    return kafkaBrokerList.size();
+  }
+
+  /** Starts one broker on a free port, registers that port and returns the running server. */
+  private KafkaServer createKafkaServer(int brokerId, String zkConnectString) {
+    int kafkaServerPort = TestUtils.findFreePort();
+    // Only broker id, ZooKeeper address and port are customised; the rest are Kafka's own test defaults.
+    Properties props = kafka.utils.TestUtils.createBrokerConfig(
+        brokerId,
+        zkConnectString,
+        kafka.utils.TestUtils.createBrokerConfig$default$3(),
+        kafka.utils.TestUtils.createBrokerConfig$default$4(),
+        kafkaServerPort,
+        kafka.utils.TestUtils.createBrokerConfig$default$6(),
+        kafka.utils.TestUtils.createBrokerConfig$default$7(),
+        kafka.utils.TestUtils.createBrokerConfig$default$8(),
+        kafka.utils.TestUtils.createBrokerConfig$default$9(),
+        kafka.utils.TestUtils.createBrokerConfig$default$10(),
+        kafka.utils.TestUtils.createBrokerConfig$default$11(),
+        kafka.utils.TestUtils.createBrokerConfig$default$12(),
+        kafka.utils.TestUtils.createBrokerConfig$default$13(),
+        kafka.utils.TestUtils.createBrokerConfig$default$14());
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    KafkaServer kafkaServer = kafka.utils.TestUtils.createServer(config, mock);
+    kafkaBrokerPortList.add(kafkaServerPort);
+    return kafkaServer;
+  }
+
+  /**
+   * @return a comma-separated {@code localhost:port} list suitable for {@code bootstrap.servers};
+   *         empty when no broker has been started
+   */
+  public String getBootServersList() {
+    StringBuilder bootServers = new StringBuilder();
+    for (Integer port : kafkaBrokerPortList) {
+      if (bootServers.length() > 0) {
+        bootServers.append(',');
+      }
+      bootServers.append("localhost:").append(port);
+    }
+    return bootServers.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
new file mode 100644
index 0000000..0c59030
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/ByPassWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class ByPassWatcher implements Watcher { // no-op Watcher; Kafka09TopicProvisionTest passes it to the ZooKeeper constructor
+
+       @Override
+       public void process(WatchedEvent event) {
+               // Intentionally empty: every watched event is ignored.
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
new file mode 100644
index 0000000..d8b7ba0
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/writer/Kafka09TopicProvisionTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.json.JSONObject;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.kafka.KafkaClusterTestBase;
+import org.apache.commons.lang3.StringUtils;
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+
+/**
+ * Checks that constructing a {@link Kafka09DataWriter} creates its topic with the configured partition
+ * count, first against an embedded {@link KafkaClusterTestBase} cluster and optionally against a live
+ * cluster described by the {@code live.*} system properties.
+ */
+@Slf4j
+public class Kafka09TopicProvisionTest {
+
+  private static final int SESSION_TIMEOUT_MS = 10 * 1000;
+  private static final int CONNECTION_TIMEOUT_MS = 8 * 1000;
+
+  private final KafkaClusterTestBase _kafkaTestHelper;
+  private int testClusterCount = 5;
+
+  public Kafka09TopicProvisionTest()
+      throws InterruptedException, RuntimeException {
+    _kafkaTestHelper = new KafkaClusterTestBase(testClusterCount);
+  }
+
+  @BeforeSuite
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    _kafkaTestHelper.startCluster();
+  }
+
+  @AfterSuite
+  public void afterSuite()
+      throws IOException {
+    _kafkaTestHelper.stopCluster();
+  }
+
+  /** The brokers registered in ZooKeeper must be exactly the ones the helper started. */
+  @Test
+  public void testCluster()
+      throws IOException, InterruptedException, KeeperException {
+    Assert.assertEquals(_kafkaTestHelper.getClusterCount(), testClusterCount);
+    log.info("kafkaBrokerPortList : " + _kafkaTestHelper.getKafkaBrokerPortList());
+
+    List<Integer> registeredPorts = new ArrayList<Integer>();
+    ZooKeeper zk = new ZooKeeper("localhost:" + _kafkaTestHelper.getZookeeperPort(), 10000, new ByPassWatcher());
+    try {
+      for (String id : zk.getChildren("/brokers/ids", false)) {
+        byte[] rawBrokerInfo = zk.getData("/brokers/ids/" + id, false, null);
+        // Decode explicitly instead of relying on the platform default charset.
+        String brokerInfo = new String(rawBrokerInfo, java.nio.charset.StandardCharsets.UTF_8);
+        registeredPorts.add(new JSONObject(brokerInfo).getInt("port"));
+      }
+    } finally {
+      zk.close();
+    }
+
+    // ZooKeeper gives no ordering guarantee for children, so compare the ports order-independently.
+    List<Integer> startedPorts = new ArrayList<Integer>(_kafkaTestHelper.getKafkaBrokerPortList());
+    java.util.Collections.sort(startedPorts);
+    java.util.Collections.sort(registeredPorts);
+    Assert.assertEquals(registeredPorts, startedPorts);
+  }
+
+  /** A writer pointed at the embedded cluster must create its topic with the requested partitions. */
+  @Test
+  public void testTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String topic = "topicPartition4";
+    int clusterCount = _kafkaTestHelper.getClusterCount();
+    int partitionCount = clusterCount / 2;
+    String zookeeperConnect = "localhost:" + _kafkaTestHelper.getZookeeperPort();
+    String bootstrapServers = _kafkaTestHelper.getBootServersList();
+    log.info("Bootstrap servers : " + bootstrapServers);
+
+    Properties props = writerProperties(topic, String.valueOf(clusterCount), String.valueOf(partitionCount),
+        zookeeperConnect, bootstrapServers);
+    Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
+    try {
+      Assert.assertEquals(fetchPartitionCount(zookeeperConnect, topic), partitionCount);
+    } finally {
+      kafka09DataWriter.close();
+    }
+  }
+
+  /**
+   * Same check against a live cluster. Does nothing unless {@code live.cluster.count} is set; in that
+   * case {@code live.zookeeper}, {@code live.broker} and {@code live.newtopic} are required, while the
+   * replication and partition counts default to values derived from the cluster size.
+   */
+  @Test
+  public void testLiveTopicPartitionCreationCount()
+      throws IOException, InterruptedException {
+    String liveClusterCount = System.getProperty("live.cluster.count");
+    if (StringUtils.isEmpty(liveClusterCount)) {
+      log.info("live.cluster.count is not set; skipping the live cluster check");
+      return;
+    }
+    String liveZookeeper = System.getProperty("live.zookeeper");
+    String liveBroker = System.getProperty("live.broker");
+    String topic = System.getProperty("live.newtopic");
+    // Fail with a readable message instead of a NullPointerException from Properties.setProperty.
+    Assert.assertFalse(StringUtils.isEmpty(liveZookeeper), "live.zookeeper must be set for the live test");
+    Assert.assertFalse(StringUtils.isEmpty(liveBroker), "live.broker must be set for the live test");
+    Assert.assertFalse(StringUtils.isEmpty(topic), "live.newtopic must be set for the live test");
+
+    // Leave one broker of headroom, but never ask for fewer than one replica or partition.
+    int usableBrokers = Math.max(1, Integer.parseInt(liveClusterCount) - 1);
+    String topicReplicationCount = System.getProperty("live.newtopic.replicationCount");
+    if (StringUtils.isEmpty(topicReplicationCount)) {
+      topicReplicationCount = String.valueOf(usableBrokers);
+    }
+    String topicPartitionCount = System.getProperty("live.newtopic.partitionCount");
+    if (StringUtils.isEmpty(topicPartitionCount)) {
+      topicPartitionCount = String.valueOf(Math.max(1, usableBrokers / 2));
+    }
+
+    Properties props = writerProperties(topic, topicReplicationCount, topicPartitionCount, liveZookeeper, liveBroker);
+    Kafka09DataWriter<String> kafka09DataWriter = new Kafka09DataWriter<String>(props);
+    try {
+      Assert.assertEquals(fetchPartitionCount(liveZookeeper, topic), Integer.parseInt(topicPartitionCount));
+    } finally {
+      kafka09DataWriter.close();
+    }
+  }
+
+  /** Builds the topic-provisioning and producer settings handed to the writer under test. */
+  private static Properties writerProperties(String topic, String replicationCount, String partitionCount,
+      String zookeeperConnect, String bootstrapServers) {
+    Properties props = new Properties();
+    // Setting Topic Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, topic);
+    props.setProperty(KafkaWriterConfigurationKeys.REPLICATION_COUNT, replicationCount);
+    props.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, partitionCount);
+    props.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, zookeeperConnect);
+    // Setting Producer Properties
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        bootstrapServers);
+    props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "value.serializer",
+        "org.apache.kafka.common.serialization.StringSerializer");
+    return props;
+  }
+
+  /** Reads the topic metadata from ZooKeeper and returns its partition count, closing the client afterwards. */
+  private static int fetchPartitionCount(String zookeeperConnect, String topic) {
+    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
+    // createTopic() will only seem to work (it will return without error).  The topic will exist in
+    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
+    // topic.
+    ZkClient zkClient = new ZkClient(zookeeperConnect, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS,
+        ZKStringSerializer$.MODULE$);
+    try {
+      boolean isSecureKafkaCluster = false;
+      ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
+      TopicMetadata metaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
+      return metaData.partitionsMetadata().size();
+    } finally {
+      zkClient.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/603f22de/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index f6776c0..279812e 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -50,4 +50,15 @@ public class KafkaWriterConfigurationKeys {
   static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = 
"kafka.schemaRegistry.switchName";
   static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true";
 
+  public static final String KAFKA_TOPIC_CONFIG = "writer.kafka."; // prefix shared by the topic-provisioning keys below
+  static final String TOPIC_NAME = "topic"; // NOTE(review): not referenced by the provisioning code in this change - confirm it is needed
+  public static final String CLUSTER_ZOOKEEPER = KAFKA_TOPIC_CONFIG + 
"zookeeper"; // "writer.kafka.zookeeper": ZooKeeper connect string; Kafka09DataWriter provisions the topic only when this key is present
+  static final String REPLICATION_COUNT = KAFKA_TOPIC_CONFIG + 
"replicationCount"; // "writer.kafka.replicationCount": replication factor passed to AdminUtils.createTopic
+  static final int REPLICATION_COUNT_DEFAULT = 1; // same value as PARTITION_COUNT_DEFAULT
+  static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + "partitionCount"; // "writer.kafka.partitionCount": partition count passed to AdminUtils.createTopic
+  static final int PARTITION_COUNT_DEFAULT = 1;
+  public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + 
".sto"; // "writer.kafka.zookeeper.sto": session timeout handed to ZkClient
+  static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // milliseconds (10 seconds)
+  public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER 
+ ".cto"; // "writer.kafka.zookeeper.cto": connection timeout handed to ZkClient
+  static final int ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT = 8000; // milliseconds (8 seconds)
 }

Reply via email to