Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 36711af54 -> 31b2a1ae4


SQOOP-2196: Sqoop2: Add Runner factory for Kafka

(Syed A. Hashmi via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/31b2a1ae
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/31b2a1ae
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/31b2a1ae

Branch: refs/heads/sqoop2
Commit: 31b2a1ae45c56041511da4e519bdc7861f323f07
Parents: 36711af
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Thu Apr 16 18:50:00 2015 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Thu Apr 16 18:50:00 2015 -0700

----------------------------------------------------------------------
 .../sqoop/common/test/kafka/KafkaLocal.java     |  57 ----------
 .../common/test/kafka/KafkaLocalRunner.java     | 107 +++++++++++++++++++
 .../common/test/kafka/KafkaRealRunner.java      |  74 +++++++++++++
 .../common/test/kafka/KafkaRunnerBase.java      |  31 ++++++
 .../common/test/kafka/KafkaRunnerFactory.java   |  41 +++++++
 .../sqoop/common/test/kafka/TestUtil.java       |  71 ++----------
 .../sqoop/connector/kafka/TestKafkaLoader.java  |   2 +-
 .../test/testcases/KafkaConnectorTestCase.java  |   2 +-
 8 files changed, 266 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
deleted file mode 100644
index b90d14e..0000000
--- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- limitations under the License.
- */
-
-package org.apache.sqoop.common.test.kafka;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * A local Kafka server for running unit tests.
- * Reference: https://gist.github.com/fjavieralba/7930018/
- */
-public class KafkaLocal {
-
-  public KafkaServerStartable kafka;
-  public ZooKeeperLocal zookeeper;
-  private KafkaConfig kafkaConfig;
-
-  public KafkaLocal(Properties kafkaProperties) throws IOException,
-          InterruptedException{
-    kafkaConfig = new KafkaConfig(kafkaProperties);
-
-    //start local kafka broker
-    kafka = new KafkaServerStartable(kafkaConfig);
-  }
-
-  public void start() throws Exception{
-    kafka.startup();
-  }
-
-  public void stop() throws IOException {
-    kafka.shutdown();
-    File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
-    FileUtils.deleteDirectory(dir);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java
new file mode 100644
index 0000000..f3268f1
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocalRunner.java
@@ -0,0 +1,107 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+import org.apache.sqoop.common.test.utils.NetworkUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Kafka server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class KafkaLocalRunner extends KafkaRunnerBase {
+
+  public KafkaServerStartable kafka;
+  public ZooKeeperLocal zookeeperServer;
+  private KafkaConfig kafkaConfig;
+  private int kafkaLocalPort = 9022;
+  private int zkLocalPort = 2188;
+  private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class);
+
+  public KafkaLocalRunner() throws IOException,
+          InterruptedException{
+    kafkaLocalPort = NetworkUtils.findAvailablePort();
+    zkLocalPort = NetworkUtils.findAvailablePort();
+
+    logger.info("Starting kafka server with kafka port " + kafkaLocalPort +
+        " and zookeeper port " + zkLocalPort );
+    try {
+      //start local Zookeeper
+      zookeeperServer = new ZooKeeperLocal(zkLocalPort);
+      logger.info("ZooKeeper instance is successfully started on port " +
+          zkLocalPort);
+
+      Properties kafkaProperties = getKafkaProperties();
+      kafkaConfig = new KafkaConfig(kafkaProperties);
+
+      //start local kafka broker
+      kafka = new KafkaServerStartable(kafkaConfig);
+      logger.info("Kafka Server is successfully started on port " +
+          kafkaLocalPort);
+
+    } catch (Exception e) {
+      logger.error("Error starting the Kafka Server.", e);
+    }
+
+  }
+
+  Properties getKafkaProperties() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put("broker.id","0");
+    // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
+    kafkaProps.put("port",Integer.toString(kafkaLocalPort));
+    kafkaProps.put("log.dirs","target/kafka-logs");
+    kafkaProps.put("num.partitions","1");
+    kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
+
+    return kafkaProps;
+  }
+
+  @Override
+  public void start() throws Exception {
+    kafka.startup();
+  }
+
+  @Override
+  public void stop() throws IOException {
+    kafka.shutdown();
+    zookeeperServer.stopZookeeper();
+    File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
+    FileUtils.deleteDirectory(dir);
+  }
+
+  @Override
+  public String getZkConnectionString() {
+    return zookeeperServer.getConnectString();
+  }
+
+  @Override
+  public String getKafkaUrl() {
+    return "localhost:"+kafkaLocalPort;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java
new file mode 100644
index 0000000..cc9c4fb
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRealRunner.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common.test.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This class encapsulates a real cluster Kafka service and enables tests
+ * to run kafka tasks against real cluster
+ */
+
+public class KafkaRealRunner extends KafkaRunnerBase {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class);
+  private String kafkaServerUrl;
+  private String zkConnectionString;
+  private final String KAFKA_SERVER_URL_PROPERTY = "sqoop.kafka.server.url";
+  private final String ZK_CONNECTION_STRING_PROPERTY = "sqoop.kafka.zookeeper.url";
+
+  public KafkaRealRunner() {
+    logger.info("Setting up kafka to point to real cluster");
+    kafkaServerUrl = System.getProperty(KAFKA_SERVER_URL_PROPERTY);
+    if(kafkaServerUrl == null) {
+      logger.error("To run against real cluster, sqoop.kafka.server.url must be provided");
+      throw new RuntimeException("To run against real cluster, sqoop.kafka.server.url must be provided");
+    }
+    logger.info("Kafka server url: " + kafkaServerUrl);
+
+    zkConnectionString = System.getProperty(
+        ZK_CONNECTION_STRING_PROPERTY);
+    if(zkConnectionString == null) {
+      logger.error("To run against real cluster, sqoop.kafka.zookeeper.url must be provided");
+      throw new RuntimeException("To run against real cluster, sqoop.kafka.zookeeper.url must be provided");
+    }
+    logger.info("Zookeeper server connection string: " + zkConnectionString);
+  }
+  @Override
+  public void start() throws Exception {
+    // nothing to be done for real server
+  }
+
+  @Override
+  public void stop() throws IOException {
+    // nothing to be done for real server
+  }
+
+  @Override
+  public String getZkConnectionString() {
+    return this.zkConnectionString;
+  }
+
+  @Override
+  public String getKafkaUrl() {
+    return this.kafkaServerUrl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java
new file mode 100644
index 0000000..025faf2
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerBase.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common.test.kafka;
+
+import java.io.IOException;
+
+/**
+ * This class provides basic methods which will be overridden by derived classes
+ * to allow using either local kafka service or a real cluster.
+ */
+public abstract class KafkaRunnerBase {
+  public abstract void start() throws Exception;
+  public abstract void stop() throws IOException;
+  public abstract String getZkConnectionString();
+  public abstract String getKafkaUrl();
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java
new file mode 100644
index 0000000..b26ca74
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaRunnerFactory.java
@@ -0,0 +1,41 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import java.io.IOException;
+
+/**
+ * This class encapsulates logic behind which Kafka class to instantiate
+ */
+public class KafkaRunnerFactory {
+
+  private static final String KAFKA_CLASS_PROPERTY = "sqoop.kafka.runner.class";
+
+  public static KafkaRunnerBase getKafkaRunner() throws ClassNotFoundException, IllegalAccessException,
+                                                        InstantiationException, InterruptedException, IOException {
+    String className = System.getProperty(KAFKA_CLASS_PROPERTY);
+    if(className == null) {
+      return new KafkaLocalRunner();
+    } else {
+      Class<?> klass = Class.forName(className);
+      return (KafkaRunnerBase) klass.newInstance();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
index 34b8f1e..09ddcc7 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
@@ -19,16 +19,11 @@
 package org.apache.sqoop.common.test.kafka;
 
 import kafka.message.MessageAndMetadata;
-import org.apache.sqoop.common.test.utils.NetworkUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.List;
-import java.util.Properties;
-import java.util.Random;
 
 /**
  * A utility class for starting/stopping Kafka Server.
@@ -38,72 +33,30 @@ public class TestUtil {
   private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
   private static TestUtil instance = new TestUtil();
 
-  private Random randPortGen = new Random(System.currentTimeMillis());
-  private KafkaLocal kafkaServer;
-  private ZooKeeperLocal zookeeperServer;
+  private KafkaRunnerBase kafkaServer;
   private KafkaConsumer kafkaConsumer;
-  private String hostname = "localhost";
-  private int kafkaLocalPort = 9022;
-  private int zkLocalPort = 2188;
 
-  private TestUtil() {
-    init();
-  }
+  private TestUtil() {}
 
   public static TestUtil getInstance() {
     return instance;
   }
 
-  private void init() {
-    // get the localhost.
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-
-    } catch (UnknownHostException e) {
-      logger.warn("Error getting the value of localhost. " +
-              "Proceeding with 'localhost'.", e);
-    }
-  }
-
-  private boolean startKafkaServer() throws IOException {
-    kafkaLocalPort = NetworkUtils.findAvailablePort();
-    zkLocalPort = NetworkUtils.findAvailablePort();
+  private boolean startKafkaServer() throws IOException, InterruptedException, ClassNotFoundException,
+                                            IllegalAccessException, InstantiationException {
+    kafkaServer = KafkaRunnerFactory.getKafkaRunner();
 
-    logger.info("Starting kafka server with kafka port " + kafkaLocalPort +
-            " and zookeeper port " + zkLocalPort );
     try {
-      //start local Zookeeper
-      zookeeperServer = new ZooKeeperLocal(zkLocalPort);
-      logger.info("ZooKeeper instance is successfully started on port " +
-              zkLocalPort);
-
-      Properties kafkaProperties = getKafkaProperties();
-
-      kafkaServer = new KafkaLocal(kafkaProperties);
       kafkaServer.start();
-
-      logger.info("Kafka Server is successfully started on port " +
-              kafkaLocalPort);
-      return true;
-
     } catch (Exception e) {
       logger.error("Error starting the Kafka Server.", e);
       return false;
     }
-  }
-
-  Properties getKafkaProperties() {
-    Properties kafkaProps = new Properties();
-    kafkaProps.put("broker.id","0");
-    // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
-    kafkaProps.put("port",Integer.toString(kafkaLocalPort));
-    kafkaProps.put("log.dirs","target/kafka-logs");
-    kafkaProps.put("num.partitions","1");
-    kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
 
-    return kafkaProps;
+    return true;
   }
 
+
   private KafkaConsumer getKafkaConsumer() {
     synchronized (this) {
       if (kafkaConsumer == null) {
@@ -121,7 +74,7 @@ public class TestUtil {
     return getKafkaConsumer().getNextMessage(topic);
   }
 
-  public void prepare() throws IOException {
+  public void prepare() throws Exception {
     boolean startStatus = startKafkaServer();
     if (!startStatus) {
       throw new RuntimeException("Error starting the server!");
@@ -147,16 +100,14 @@ public class TestUtil {
     }
     logger.info("Shutting down the kafka Server.");
     kafkaServer.stop();
-    logger.info("Shutting down Zookeeper Server.");
-    zookeeperServer.stopZookeeper();
     logger.info("Completed the tearDown phase.");
   }
 
   public String getZkUrl() {
-    return zookeeperServer.getConnectString();
+    return kafkaServer.getZkConnectionString();
   }
 
   public String getKafkaServerUrl() {
-    return "localhost:"+kafkaLocalPort;
+    return kafkaServer.getKafkaUrl();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
index 46c1057..da2a708 100644
--- a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
+++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
@@ -41,7 +41,7 @@ public class TestKafkaLoader {
   private static String TOPIC = "mytopic";
 
   @BeforeClass(alwaysRun = true)
-  public static void setup() throws IOException {
+  public static void setup() throws Exception {
     testUtil.prepare();
     List<String> topics = new ArrayList<String>(1);
     topics.add(TOPIC);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/31b2a1ae/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
index 802677c..9aa69ed 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
@@ -42,7 +42,7 @@ public class KafkaConnectorTestCase extends ConnectorTestCase {
   private static final String TOPIC = "mytopic";
 
   @BeforeClass(alwaysRun = true)
-  public static void startKafka() throws IOException {
+  public static void startKafka() throws Exception {
     // starts Kafka server and its dependent zookeeper
     testUtil.prepare();
   }

Reply via email to