Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 9151d305a -> e2e5aa8bf


SQOOP-1852: Sqoop2: Kafka connector supporting TO direction

(Gwen Shapira via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e2e5aa8b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e2e5aa8b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e2e5aa8b

Branch: refs/heads/sqoop2
Commit: e2e5aa8bf531115f8ce4055184a3b8892ce6beb4
Parents: 9151d30
Author: Abraham Elmahrek <[email protected]>
Authored: Thu Dec 11 17:53:48 2014 -0600
Committer: Abraham Elmahrek <[email protected]>
Committed: Thu Dec 11 17:53:48 2014 -0600

----------------------------------------------------------------------
 common-test/pom.xml                             |  15 ++
 .../sqoop/common/test/kafka/KafkaConsumer.java  |  98 +++++++++++
 .../sqoop/common/test/kafka/KafkaLocal.java     |  57 +++++++
 .../sqoop/common/test/kafka/TestUtil.java       | 162 +++++++++++++++++++
 .../sqoop/common/test/kafka/ZooKeeperLocal.java |  72 +++++++++
 connector/connector-kafka/pom.xml               |  35 ++++
 .../sqoop/connector/kafka/KafkaConnector.java   | 115 +++++++++++++
 .../connector/kafka/KafkaConnectorErrors.java   |  46 ++++++
 .../sqoop/connector/kafka/KafkaConstants.java   |  42 +++++
 .../sqoop/connector/kafka/KafkaLoader.java      | 106 ++++++++++++
 .../sqoop/connector/kafka/KafkaToDestroyer.java |  35 ++++
 .../connector/kafka/KafkaToInitializer.java     |  51 ++++++
 .../kafka/configuration/LinkConfig.java         |  49 ++++++
 .../kafka/configuration/LinkConfiguration.java  |  31 ++++
 .../kafka/configuration/ToJobConfig.java        |  28 ++++
 .../kafka/configuration/ToJobConfiguration.java |  31 ++++
 .../resources/kafka-connector-config.properties |  38 +++++
 .../main/resources/sqoopconnector.properties    |  18 +++
 .../connector/kafka/TestConfigValidator.java    |  59 +++++++
 .../sqoop/connector/kafka/TestKafkaLoader.java  |  96 +++++++++++
 connector/pom.xml                               |   9 +-
 pom.xml                                         |  23 +++
 server/pom.xml                                  |   5 +
 .../apache/sqoop/handler/JobRequestHandler.java |   1 +
 test/pom.xml                                    |   5 +
 .../test/testcases/KafkaConnectorTestCase.java  |  78 +++++++++
 .../connector/kafka/FromRDBMSToKafkaTest.java   |  75 +++++++++
 27 files changed, 1376 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/pom.xml
----------------------------------------------------------------------
diff --git a/common-test/pom.xml b/common-test/pom.xml
index 9fd671c..609a875 100644
--- a/common-test/pom.xml
+++ b/common-test/pom.xml
@@ -60,6 +60,21 @@ limitations under the License.
       <artifactId>postgresql</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..78d651b
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java
@@ -0,0 +1,98 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Kafka Consumer implementation. This uses the current thread to fetch the
+ * next message from the queue and doesn't use a multi threaded implementation.
+ * So this implements a synchronous blocking call.
+ * To avoid infinite waiting, a timeout is implemented to wait only for
+ * 1 second before concluding that the message will not be available.
+ */
+public class KafkaConsumer {
+
+  private static final Logger logger = LoggerFactory.getLogger(
+          KafkaConsumer.class);
+
+  private final ConsumerConnector consumer;
+  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
+
+  public KafkaConsumer() {
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+            createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1"));
+  }
+
+  private static ConsumerConfig createConsumerConfig(String zkUrl,
+                                                     String groupId) {
+    Properties props = new Properties();
+    props.put("zookeeper.connect", zkUrl);
+    props.put("group.id", groupId);
+    props.put("zookeeper.session.timeout.ms", "1000");
+    props.put("zookeeper.sync.time.ms", "200");
+    props.put("auto.commit.interval.ms", "1000");
+    props.put("auto.offset.reset", "smallest");
+    props.put("consumer.timeout.ms","1000");
+    return new ConsumerConfig(props);
+  }
+
+  public void initTopicList(List<String> topics) {
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    for (String topic : topics) {
+      // we need only single threaded consumers
+      topicCountMap.put(topic, new Integer(1));
+    }
+    consumerMap = consumer.createMessageStreams(topicCountMap);
+  }
+
+  public MessageAndMetadata getNextMessage(String topic) {
+    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+    // it has only a single stream, because there is only one consumer
+    KafkaStream stream = streams.get(0);
+    final ConsumerIterator<byte[], byte[]> it = stream.iterator();
+    int counter = 0;
+    try {
+      if (it.hasNext()) {
+        return it.next();
+      } else {
+        return null;
+      }
+    } catch (ConsumerTimeoutException e) {
+      logger.error("0 messages available to fetch for the topic " + topic);
+      return null;
+    }
+  }
+
+  public void shutdown() {
+    consumer.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
new file mode 100644
index 0000000..b90d14e
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java
@@ -0,0 +1,57 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A local Kafka server for running unit tests.
+ * Reference: https://gist.github.com/fjavieralba/7930018/
+ */
+public class KafkaLocal {
+
+  public KafkaServerStartable kafka;
+  public ZooKeeperLocal zookeeper;
+  private KafkaConfig kafkaConfig;
+
+  public KafkaLocal(Properties kafkaProperties) throws IOException,
+          InterruptedException{
+    kafkaConfig = new KafkaConfig(kafkaProperties);
+
+    //start local kafka broker
+    kafka = new KafkaServerStartable(kafkaConfig);
+  }
+
+  public void start() throws Exception{
+    kafka.startup();
+  }
+
+  public void stop() throws IOException {
+    kafka.shutdown();
+    File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile();
+    FileUtils.deleteDirectory(dir);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
new file mode 100644
index 0000000..34b8f1e
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java
@@ -0,0 +1,162 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import kafka.message.MessageAndMetadata;
+import org.apache.sqoop.common.test.utils.NetworkUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A utility class for starting/stopping Kafka Server.
+ */
+public class TestUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
+  private static TestUtil instance = new TestUtil();
+
+  private Random randPortGen = new Random(System.currentTimeMillis());
+  private KafkaLocal kafkaServer;
+  private ZooKeeperLocal zookeeperServer;
+  private KafkaConsumer kafkaConsumer;
+  private String hostname = "localhost";
+  private int kafkaLocalPort = 9022;
+  private int zkLocalPort = 2188;
+
+  private TestUtil() {
+    init();
+  }
+
+  public static TestUtil getInstance() {
+    return instance;
+  }
+
+  private void init() {
+    // get the localhost.
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+
+    } catch (UnknownHostException e) {
+      logger.warn("Error getting the value of localhost. " +
+              "Proceeding with 'localhost'.", e);
+    }
+  }
+
+  private boolean startKafkaServer() throws IOException {
+    kafkaLocalPort = NetworkUtils.findAvailablePort();
+    zkLocalPort = NetworkUtils.findAvailablePort();
+
+    logger.info("Starting kafka server with kafka port " + kafkaLocalPort +
+            " and zookeeper port " + zkLocalPort );
+    try {
+      //start local Zookeeper
+      zookeeperServer = new ZooKeeperLocal(zkLocalPort);
+      logger.info("ZooKeeper instance is successfully started on port " +
+              zkLocalPort);
+
+      Properties kafkaProperties = getKafkaProperties();
+
+      kafkaServer = new KafkaLocal(kafkaProperties);
+      kafkaServer.start();
+
+      logger.info("Kafka Server is successfully started on port " +
+              kafkaLocalPort);
+      return true;
+
+    } catch (Exception e) {
+      logger.error("Error starting the Kafka Server.", e);
+      return false;
+    }
+  }
+
+  Properties getKafkaProperties() {
+    Properties kafkaProps = new Properties();
+    kafkaProps.put("broker.id","0");
+    // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise
+    kafkaProps.put("port",Integer.toString(kafkaLocalPort));
+    kafkaProps.put("log.dirs","target/kafka-logs");
+    kafkaProps.put("num.partitions","1");
+    kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString());
+
+    return kafkaProps;
+  }
+
+  private KafkaConsumer getKafkaConsumer() {
+    synchronized (this) {
+      if (kafkaConsumer == null) {
+        kafkaConsumer = new KafkaConsumer();
+      }
+    }
+    return kafkaConsumer;
+  }
+
+  public void initTopicList(List<String> topics) {
+    getKafkaConsumer().initTopicList(topics);
+  }
+
+  public MessageAndMetadata getNextMessageFromConsumer(String topic) {
+    return getKafkaConsumer().getNextMessage(topic);
+  }
+
+  public void prepare() throws IOException {
+    boolean startStatus = startKafkaServer();
+    if (!startStatus) {
+      throw new RuntimeException("Error starting the server!");
+    }
+    try {
+      Thread.sleep(3 * 1000);   // add this sleep time to
+      // ensure that the server is fully started before proceeding with tests.
+    } catch (InterruptedException e) {
+      // ignore
+    }
+    getKafkaConsumer();
+    logger.info("Completed the prepare phase.");
+  }
+
+  public void tearDown() throws IOException {
+    logger.info("Shutting down the Kafka Consumer.");
+    getKafkaConsumer().shutdown();
+    try {
+      Thread.sleep(3 * 1000);   // add this sleep time to
+      // ensure that the server is fully started before proceeding with tests.
+    } catch (InterruptedException e) {
+      // ignore
+    }
+    logger.info("Shutting down the kafka Server.");
+    kafkaServer.stop();
+    logger.info("Shutting down Zookeeper Server.");
+    zookeeperServer.stopZookeeper();
+    logger.info("Completed the tearDown phase.");
+  }
+
+  public String getZkUrl() {
+    return zookeeperServer.getConnectString();
+  }
+
+  public String getKafkaServerUrl() {
+    return "localhost:"+kafkaLocalPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java
new file mode 100644
index 0000000..27660bf
--- /dev/null
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java
@@ -0,0 +1,72 @@
+/**
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ limitations under the License.
+ */
+
+package org.apache.sqoop.common.test.kafka;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+public class ZooKeeperLocal {
+  private int zkPort;
+  private ZooKeeperServer zookeeper;
+  private NIOServerCnxnFactory factory;
+  File dir;
+
+
+  public ZooKeeperLocal(int zkPort){
+    int numConnections = 5000;
+    int tickTime = 2000;
+
+    this.zkPort = zkPort;
+
+    String dataDirectory = "target";
+    dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
+
+    try {
+      this.zookeeper = new ZooKeeperServer(dir,dir,tickTime);
+      this.factory = new NIOServerCnxnFactory();
+      factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0);
+      factory.startup(zookeeper);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void stopZookeeper() throws IOException {
+    zookeeper.shutdown();
+    factory.shutdown();
+    FileUtils.deleteDirectory(dir);
+  }
+
+  public String getConnectString() {
+    return "127.0.0.1:"+zkPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/pom.xml b/connector/connector-kafka/pom.xml
new file mode 100644
index 0000000..8786bff
--- /dev/null
+++ b/connector/connector-kafka/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>connector</artifactId>
+    <groupId>org.apache.sqoop</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.sqoop.connector</groupId>
+  <artifactId>sqoop-connector-kafka</artifactId>
+  <name>Sqoop Kafka Connector</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-spi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common-test</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java
new file mode 100644
index 0000000..84b4be8
--- /dev/null
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+public class KafkaConnector extends SqoopConnector {
+
+  private static final To TO = new To(
+          KafkaToInitializer.class,
+          KafkaLoader.class,
+          KafkaToDestroyer.class);
+
+  /**
+   * Retrieve connector version.
+   *
+   * @return Version encoded as a string
+   */
+  @Override
+  public String getVersion() {
+    return VersionInfo.getBuildVersion();
+  }
+
+  /**
+   * @param locale
+   * @return the resource bundle associated with the given locale.
+   */
+  @Override
+  public ResourceBundle getBundle(Locale locale) {
+    return ResourceBundle.getBundle(KafkaConstants.RESOURCE_BUNDLE_NAME, locale);
+  }
+
+  /**
+   * @return Get link configuration group class
+   */
+  @Override
+  public Class getLinkConfigurationClass() {
+    return LinkConfiguration.class;
+  }
+
+  /**
+   * @param direction
+   * @return Get job configuration group class per direction type or null if
+   * not supported
+   */
+  @Override
+  public Class getJobConfigurationClass(Direction direction) {
+    return ToJobConfiguration.class;
+  }
+
+  @Override
+  public List<Direction> getSupportedDirections() {
+    // TODO: Remove when we add the FROM part of the connector (SQOOP-1583)
+    return Arrays.asList(Direction.TO);
+  }
+
+  /**
+   * @return an <tt>From</tt> that provides classes for performing import.
+   */
+  @Override
+  public From getFrom() {
+    //TODO: SQOOP-1583
+    return null;
+  }
+
+  /**
+   * @return an <tt>To</tt> that provides classes for performing export.n
+   */
+  @Override
+  public To getTo() {
+    return TO;
+  }
+
+  /**
+   * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the
+   * configs related to the link and job
+   *
+   * @return RespositoryUpgrader object
+   */
+  @Override
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
+    // Nothing to upgrade at this point
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java
new file mode 100644
index 0000000..f94efea
--- /dev/null
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum KafkaConnectorErrors implements ErrorCode {
+
+  KAFKA_CONNECTOR_0000("Unknown error occurred."),
+  KAFKA_CONNECTOR_0001("Error occurred while sending data to Kafka")
+  ;
+
+  private final String message;
+
+  private KafkaConnectorErrors(String message) {
+    this.message = message;
+  }
+
+  @Override
+  public String getCode() {
+    return name();
+  }
+
+  /**
+   * @return the message associated with error code.
+   */
+  @Override
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java
new file mode 100644
index 0000000..9d3877d
--- /dev/null
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.sqoop.job.Constants;
+
+public class KafkaConstants extends Constants {
+  // Resource bundle name
+  public static final String RESOURCE_BUNDLE_NAME = "kafka-connector-config";
+
+  // Kafka properties keys
+  public static final String MESSAGE_SERIALIZER_KEY = "serializer.class";
+  public static final String KEY_SERIALIZER_KEY = "key.serializer.class";
+  public static final String BROKER_LIST_KEY = "metadata.broker.list";
+  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
+  public static final String PRODUCER_TYPE = "producer.type";
+
+  // Kafka properties default values
+  public static final String DEFAULT_MESSAGE_SERIALIZER =
+          "kafka.serializer.StringEncoder";
+  public static final String DEFAULT_KEY_SERIALIZER =
+          "kafka.serializer.StringEncoder";
+  public static final String DEFAULT_REQUIRED_ACKS = "-1";
+  public static final String DEFAULT_PRODUCER_TYPE = "sync";
+  public static final int DEFAULT_BATCH_SIZE = 100;
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
new file mode 100644
index 0000000..5d79516
--- /dev/null
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import kafka.producer.KeyedMessage;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+public class KafkaLoader extends Loader<LinkConfiguration,ToJobConfiguration> {
+  private static final Logger LOG = Logger.getLogger(KafkaLoader.class);
+
+  private List<KeyedMessage<String, String>> messageList =
+          new ArrayList<KeyedMessage<String, String>>(KafkaConstants.DEFAULT_BATCH_SIZE);
+  private Producer producer;
+
+  @Override
+  public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws
+          Exception {
+    producer = getProducer(linkConfiguration);
+    LOG.info("got producer");
+    String topic = jobConfiguration.toJobConfig.topic;
+    LOG.info("topic is:"+topic);
+    String batchUUID = UUID.randomUUID().toString();
+    String record;
+
+    while ((record = context.getDataReader().readTextRecord()) != null) {
+      // create a message and add to buffer
+      KeyedMessage<String, String> data = new KeyedMessage<String, String>
+              (topic, null, batchUUID, record);
+      messageList.add(data);
+      // If we have enough messages, send the batch to Kafka
+      if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) {
+        sendToKafka(messageList);
+      }
+    }
+
+    if (messageList.size() > 0) {
+      sendToKafka(messageList);
+    }
+
+    producer.close();
+  }
+
+  private void sendToKafka(List<KeyedMessage<String,String>> messageList) {
+    try {
+      producer.send(messageList);
+      messageList.clear();
+    } catch (Exception ex) {
+      throw new SqoopException(KafkaConnectorErrors.KAFKA_CONNECTOR_0001);
+    }
+  }
+
+  /**
+   * Initialize a Kafka producer using configs in LinkConfiguration
+   * @param linkConfiguration
+   * @return
+   */
+  Producer getProducer(LinkConfiguration linkConfiguration) {
+    Properties kafkaProps =  generateDefaultKafkaProps();
+    kafkaProps.put(KafkaConstants.BROKER_LIST_KEY, linkConfiguration.linkConfig.brokerList);
+    ProducerConfig config = new ProducerConfig(kafkaProps);
+    return new Producer<String, String>(config);
+  }
+
+  /**
+   * Generate producer properties object with some defaults
+   * @return
+   */
+  private Properties generateDefaultKafkaProps() {
+    Properties props = new Properties();
+    props.put(KafkaConstants.MESSAGE_SERIALIZER_KEY,
+            KafkaConstants.DEFAULT_MESSAGE_SERIALIZER);
+    props.put(KafkaConstants.KEY_SERIALIZER_KEY,
+            KafkaConstants.DEFAULT_KEY_SERIALIZER);
+    props.put(KafkaConstants.REQUIRED_ACKS_KEY,
+            KafkaConstants.DEFAULT_REQUIRED_ACKS);
+    props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java
new file mode 100644
index 0000000..c522d91
--- /dev/null
+++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * Destroyer for the TO direction of the Kafka connector. It has no cleanup
+ * work to do and only logs that it ran.
+ */
+public class KafkaToDestroyer extends Destroyer<LinkConfiguration,ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(KafkaToDestroyer.class);
+
+  /**
+   * Logs a message; nothing is cleaned up.
+   *
+   * @param context destroyer context (unused)
+   * @param linkConfiguration link configuration (unused)
+   * @param jobConfiguration TO job configuration (unused)
+   */
+  @Override
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfiguration,
+                      ToJobConfiguration jobConfiguration) {
+    LOG.info("Running Kafka Connector destroyer. This does nothing except log this message.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
new file mode 100644
index 0000000..e1b065a
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.util.List;
+
+/**
+ * Initializer for the TO direction of the Kafka connector. Initialization
+ * only logs a message; the class also lists the extra jars the job needs.
+ */
+public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(KafkaToInitializer.class);
+
+  /**
+   * Logs a message; no other setup is performed.
+   */
+  @Override
+  public void initialize(InitializerContext context, LinkConfiguration linkConfiguration,
+                         ToJobConfiguration jobConfiguration) {
+    LOG.info("Running Kafka Connector initializer. This does nothing except log this message.");
+  }
+
+  /**
+   * Returns the jars reported by the parent class, followed by the jars that
+   * contain the Kafka producer, Scala and Yammer metrics classes.
+   *
+   * @return the list obtained from the parent with the three jars appended
+   */
+  @Override
+  public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,
+                              ToJobConfiguration toJobConfiguration) {
+    List<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration);
+    // Jars for Kafka, Scala and Yammer (required by Kafka)
+    String[] requiredClasses = {
+        "kafka.javaapi.producer.Producer",
+        "scala.collection.immutable.StringLike",
+        "com.yammer.metrics.Metrics"
+    };
+    for (String className : requiredClasses) {
+      jars.add(ClassUtils.jarForClass(className));
+    }
+    return jars;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java
new file mode 100644
index 0000000..98112e7
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+
+/**
+ * Link-level inputs of the Kafka connector: the broker list and the ZooKeeper
+ * connect string. Both are checked by {@link CSVURIValidator}.
+ */
+@ConfigClass
+public class LinkConfig {
+  @Input(size=1024, validators = { @Validator(CSVURIValidator.class) })
+  public String brokerList;
+
+  @Input(size=255, validators = { @Validator(CSVURIValidator.class) })
+  public String zookeeperConnect;
+
+  /**
+   * Validates that a string is a non-empty, comma-separated list in which
+   * every entry has at least a host part and a port part separated by ':'.
+   */
+  public static class CSVURIValidator extends AbstractValidator<String> {
+
+    /**
+     * Adds an ERROR message when the value is null or empty, or once for each
+     * entry that cannot be split into host and port.
+     *
+     * @param str value to validate; may be null
+     */
+    @Override
+    public void validate(String str) {
+      // Reject null/empty up front; isEmpty() compares content, not references.
+      if (str == null || str.isEmpty()) {
+        addMessage(Status.ERROR, "Can't be null nor empty");
+        return;
+      }
+
+      String[] pairs = str.split("\\s*,\\s*");
+      if (pairs.length == 0) {
+        // Input consisted of separators only (e.g. ","), so there is nothing to parse.
+        addMessage(Status.ERROR,"can't parse into host:port pairs");
+        return;
+      }
+      for (String pair : pairs) {
+        String[] parts = pair.split("\\s*:\\s*");
+        // split() drops trailing empty strings, so ":" yields zero parts.
+        if (parts.length < 2) {
+          addMessage(Status.ERROR,"can't parse into host:port pairs");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..3471307
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Configuration class for a Kafka link; wraps a single {@link LinkConfig}.
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+
+  /** Link inputs; created by the constructor. */
+  @Config
+  public LinkConfig linkConfig;
+
+  /** Creates the configuration with a fresh LinkConfig instance. */
+  public LinkConfiguration() {
+    this.linkConfig = new LinkConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java
new file mode 100644
index 0000000..04387d9
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Job inputs for the TO direction of the Kafka connector.
+ */
+@ConfigClass
+public class ToJobConfig {
+
+  /** Topic input; validated with NotEmpty. */
+  @Input(size=255, validators = { @Validator(NotEmpty.class) })
+  public String topic;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java
 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..d294b27
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Configuration class for a TO job of the Kafka connector; wraps a single
+ * {@link ToJobConfig}.
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+
+  /** TO job inputs; created by the constructor. */
+  @Config
+  public ToJobConfig toJobConfig;
+
+  /** Creates the configuration with a fresh ToJobConfig instance. */
+  public ToJobConfiguration() {
+    this.toJobConfig = new ToJobConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/resources/kafka-connector-config.properties
 
b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties
new file mode 100644
index 0000000..0d6fca3
--- /dev/null
+++ 
b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic Kafka Connector Resources
+
+############################
+
+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Here you supply information necessary to connect to Kafka
+
+linkConfig.brokerList.label = List of Kafka brokers
+linkConfig.brokerList.help = Comma-separated list of Kafka brokers in the form 
of host:port. \
+                             It doesn't need to contain all brokers, but at 
least two are recommended for high availability
+
+linkConfig.zookeeperConnect.label = Zookeeper address
+linkConfig.zookeeperConnect.help = Address of Zookeeper used by the Kafka 
cluster. Usually host:port. \
+                                   Multiple zookeeper nodes are supported. If 
Kafka is stored in its own znode \
+                                   use host:port/kafka
+# To Job Config
+#
+toJobConfig.label = ToJob configuration
+toJobConfig.help = Configuration necessary when writing data to Kafka
+
+toJobConfig.topic.label = Kafka topic
+toJobConfig.topic.help = Name of Kafka topic where we'll send the data

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/main/resources/sqoopconnector.properties 
b/connector/connector-kafka/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..90d5dd6
--- /dev/null
+++ b/connector/connector-kafka/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic Kafka Connector Properties
+org.apache.sqoop.connector.class = 
org.apache.sqoop.connector.kafka.KafkaConnector
+org.apache.sqoop.connector.name = kafka-connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java
 
b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java
new file mode 100644
index 0000000..b61d979
--- /dev/null
+++ 
b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import org.apache.sqoop.connector.kafka.configuration.LinkConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for LinkConfig.CSVURIValidator using accepted and rejected inputs.
+ */
+public class TestConfigValidator {
+
+  /** Runs a fresh validator on the value and reports whether its status can proceed. */
+  private static boolean canProceed(String uri) {
+    LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator();
+    validator.validate(uri);
+    return validator.getStatus().canProceed();
+  }
+
+  /** Every well-formed list must leave the validator in a proceedable state. */
+  @Test
+  public void testValidURI() {
+    String[] accepted = {
+            "broker1:9092",
+            "broker1:9092,broker2:9092",
+            "zk1:2181/kafka",
+            "zk1:2181,zk2:2181/kafka"
+    };
+
+    for (int i = 0; i < accepted.length; i++) {
+      assertTrue(canProceed(accepted[i]));
+    }
+  }
+
+  /** Empty input and entries without a port must be rejected. */
+  @Test
+  public void testInvalidURI() {
+    String[] rejected = {
+            "",
+            "broker",
+            "broker1:9092,broker"
+    };
+
+    for (int i = 0; i < rejected.length; i++) {
+      assertFalse(canProceed(rejected[i]));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
 
b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
new file mode 100644
index 0000000..f896e9e
--- /dev/null
+++ 
b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kafka;
+
+import kafka.message.MessageAndMetadata;
+import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
+import org.apache.sqoop.common.test.kafka.TestUtil;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Feeds generated text records through KafkaLoader and reads them back from
+ * the topic via TestUtil to check content and order.
+ */
+public class TestKafkaLoader {
+
+  private static TestUtil testUtil = TestUtil.getInstance();
+  private static final int NUMBER_OF_ROWS = 1000;
+  private static KafkaLoader loader;
+  private static String TOPIC = "mytopic";
+
+  /** Prepares the test utility, registers the topic and creates the loader. */
+  @BeforeClass
+  public static void setup() throws IOException {
+    testUtil.prepare();
+    List<String> topicList = new ArrayList<String>(1);
+    topicList.add(TOPIC);
+    testUtil.initTopicList(topicList);
+    loader = new KafkaLoader();
+  }
+
+  /** Tears down whatever the test utility set up. */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    testUtil.tearDown();
+  }
+
+  /** Loads NUMBER_OF_ROWS text records and verifies each fetched message. */
+  @Test
+  public void testLoader() throws Exception {
+    // Reader that hands out NUMBER_OF_ROWS text records and then null.
+    DataReader rowSource = new DataReader() {
+      private long index = 0L;
+
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+      }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ >= NUMBER_OF_ROWS) {
+          return null;
+        }
+        return index + "," + (double)index + ",'" + index + "'";
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    };
+    LoaderContext context = new LoaderContext(null, rowSource, null);
+
+    LinkConfiguration linkConf = new LinkConfiguration();
+    linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl();
+    linkConf.linkConfig.zookeeperConnect = testUtil.getZkUrl();
+
+    ToJobConfiguration jobConf = new ToJobConfiguration();
+    jobConf.toJobConfig.topic = TOPIC;
+
+    loader.load(context, linkConf, jobConf);
+
+    int row = 1;
+    while (row <= NUMBER_OF_ROWS) {
+      MessageAndMetadata<byte[],byte[]> fetchedMsg =
+              testUtil.getNextMessageFromConsumer(TOPIC);
+      String expected = row + "," + (double) row + "," + "'" + row + "'";
+      String actual = new String((byte[]) fetchedMsg.message(), "UTF-8");
+      Assert.assertEquals(expected, actual);
+      row++;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index da4ed3e..dfa7e88 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -37,10 +37,11 @@ limitations under the License.
     <module>connector-generic-jdbc</module>
     <module>connector-hdfs</module>
     <module>connector-kite</module>
-      <!-- Uncomment and finish connectors after sqoop framework will become 
stable
-      <module>connector-mysql-jdbc</module>
-      <module>connector-mysql-fastpath</module>
-      -->
+    <module>connector-kafka</module>
+    <!-- Uncomment and finish connectors after sqoop framework will become 
stable
+    <module>connector-mysql-jdbc</module>
+    <module>connector-mysql-fastpath</module>
+    -->
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e182176..efb9659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,8 @@ limitations under the License.
     <joda.version>2.4</joda.version>
     <kitesdk.version>0.17.0</kitesdk.version>
     <slf4j.version>1.6.1</slf4j.version>
+    <zookeeper.version>3.4.6</zookeeper.version>
+    <kafka.version>0.8.1.1</kafka.version>
   </properties>
 
   <dependencies>
@@ -353,6 +355,17 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.sqoop.connector</groupId>
+        <artifactId>sqoop-connector-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sqoop.connector</groupId>
+        <artifactId>sqoop-connector-kafka</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sqoop.connector</groupId>
         <artifactId>sqoop-connector-mysql-jdbc</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -527,6 +540,16 @@ limitations under the License.
         <version>${tomcat.version}</version>
         <scope>provided</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>${zookeeper.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_2.10</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
    </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 1adcca0..77477ee 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -84,6 +84,11 @@ limitations under the License.
       <artifactId>sqoop-connector-kite</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.sqoop.connector</groupId>
+      <artifactId>sqoop-connector-kafka</artifactId>
+    </dependency>
+
 <!--
     <dependency>
       <groupId>org.apache.sqoop.connector</groupId>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java 
b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index dc237b8..c9f3dd7 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -92,6 +92,7 @@ public class JobRequestHandler implements RequestHandler {
 
   @Override
   public JsonBean handleEvent(RequestContext ctx) {
+    LOG.info("Got job request");
     switch (ctx.getMethod()) {
     case GET:
       if (STATUS.equals(ctx.getLastURLElement())) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index eedb545..35d36c1 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -88,6 +88,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop.connector</groupId>
+      <artifactId>sqoop-connector-kafka</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.codehaus.cargo</groupId>
       <artifactId>cargo-core-container-tomcat</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
 
b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
new file mode 100644
index 0000000..41d43c0
--- /dev/null
+++ 
b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.testcases;
+
+import kafka.message.MessageAndMetadata;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.apache.sqoop.common.test.kafka.TestUtil;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for Kafka connector integration tests. It prepares and tears
+ * down the Kafka test utility around the test class and offers helpers to
+ * fill Kafka link/job configs and to check topic content.
+ */
+public class KafkaConnectorTestCase extends ConnectorTestCase {
+  private static TestUtil testUtil = TestUtil.getInstance();
+  private static final String TOPIC = "mytopic";
+
+  @BeforeClass
+  public static void startKafka() throws IOException {
+    // starts Kafka server and its dependent zookeeper
+    testUtil.prepare();
+  }
+
+  @AfterClass
+  public static void stopKafka() throws IOException {
+    testUtil.tearDown();
+  }
+
+  /**
+   * Sets the broker list and ZooKeeper inputs of the link from the test utility.
+   * @param link link whose connector config is filled in
+   */
+  protected void fillKafkaLinkConfig(MLink link) {
+    MConfigList linkInputs = link.getConnectorLinkConfig();
+    String brokerUrl = testUtil.getKafkaServerUrl();
+    linkInputs.getStringInput("linkConfig.brokerList").setValue(brokerUrl);
+    String zkUrl = testUtil.getZkUrl();
+    linkInputs.getStringInput("linkConfig.zookeeperConnect").setValue(zkUrl);
+  }
+
+  /**
+   * Sets the topic input of the TO job config and registers the topic with
+   * the test utility.
+   * @param job job whose TO config is filled in
+   */
+  protected void fillKafkaToConfig(MJob job){
+    MConfigList toInputs = job.getJobConfig(Direction.TO);
+    toInputs.getStringInput("toJobConfig.topic").setValue(TOPIC);
+
+    List<String> topicList = new ArrayList<String>(1);
+    topicList.add(TOPIC);
+    testUtil.initTopicList(topicList);
+  }
+
+  /**
+   * Compares each expected string, in order, with the next message fetched
+   * from the Kafka topic (decoded as UTF-8).
+   * @param content expected message payloads
+   * @throws UnsupportedEncodingException
+   */
+  protected void validateContent(String[] content) throws UnsupportedEncodingException {
+    for (int i = 0; i < content.length; i++) {
+      MessageAndMetadata<byte[],byte[]> fetchedMsg =
+              testUtil.getNextMessageFromConsumer(TOPIC);
+      String actual = new String(fetchedMsg.message(), "UTF-8");
+      Assert.assertEquals(content[i], actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
 
b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
new file mode 100644
index 0000000..93d657c
--- /dev/null
+++ 
b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.kafka;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.KafkaConnectorTestCase;
+import org.junit.Test;
+
+
+/** Integration test that runs a generic-jdbc-connector to kafka-connector job. */
+public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
+
+  /** Records that validateContent is asked to find in the Kafka topic. */
+  private static final String[] INPUT = {
+    "1,'USA','San Francisco'",
+    "2,'USA','Sunnyvale'",
+    "3,'Czech Republic','Brno'",
+    "4,'USA','Palo Alto'"
+  };
+
+  /** Creates both links and the job, executes it, then validates the topic content. */
+  @Test
+  public void testBasic() throws Exception {
+    createAndLoadTableCities();
+
+    // Create and persist the link for the Kafka side.
+    MLink kafkaLink = getClient().createLink("kafka-connector");
+    fillKafkaLinkConfig(kafkaLink);
+    saveLink(kafkaLink);
+
+    // Create and persist the link for the RDBMS side.
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // The job reads FROM the RDBMS link and writes TO the Kafka link.
+    MJob job = getClient().createJob(
+        rdbmsLink.getPersistenceId(), kafkaLink.getPersistenceId());
+    MConfigList rdbmsFromConfig = job.getJobConfig(Direction.FROM);
+    rdbmsFromConfig.getStringInput("fromJobConfig.tableName")
+        .setValue(provider.escapeTableName(getTableName()));
+    rdbmsFromConfig.getStringInput("fromJobConfig.partitionColumn")
+        .setValue(provider.escapeColumnName("id"));
+    fillKafkaToConfig(job);
+
+    // Driver settings: request three extractors, then persist the job.
+    MDriverConfig driver = job.getDriverConfig();
+    driver.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert that the topic content matches the entries of INPUT.
+    validateContent(INPUT);
+  }
+}

Reply via email to