Repository: sqoop Updated Branches: refs/heads/sqoop2 9151d305a -> e2e5aa8bf
SQOOP-1852: Sqoop2: Kafka connector supporting TO direction (Gwen Shapira via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e2e5aa8b Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e2e5aa8b Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e2e5aa8b Branch: refs/heads/sqoop2 Commit: e2e5aa8bf531115f8ce4055184a3b8892ce6beb4 Parents: 9151d30 Author: Abraham Elmahrek <[email protected]> Authored: Thu Dec 11 17:53:48 2014 -0600 Committer: Abraham Elmahrek <[email protected]> Committed: Thu Dec 11 17:53:48 2014 -0600 ---------------------------------------------------------------------- common-test/pom.xml | 15 ++ .../sqoop/common/test/kafka/KafkaConsumer.java | 98 +++++++++++ .../sqoop/common/test/kafka/KafkaLocal.java | 57 +++++++ .../sqoop/common/test/kafka/TestUtil.java | 162 +++++++++++++++++++ .../sqoop/common/test/kafka/ZooKeeperLocal.java | 72 +++++++++ connector/connector-kafka/pom.xml | 35 ++++ .../sqoop/connector/kafka/KafkaConnector.java | 115 +++++++++++++ .../connector/kafka/KafkaConnectorErrors.java | 46 ++++++ .../sqoop/connector/kafka/KafkaConstants.java | 42 +++++ .../sqoop/connector/kafka/KafkaLoader.java | 106 ++++++++++++ .../sqoop/connector/kafka/KafkaToDestroyer.java | 35 ++++ .../connector/kafka/KafkaToInitializer.java | 51 ++++++ .../kafka/configuration/LinkConfig.java | 49 ++++++ .../kafka/configuration/LinkConfiguration.java | 31 ++++ .../kafka/configuration/ToJobConfig.java | 28 ++++ .../kafka/configuration/ToJobConfiguration.java | 31 ++++ .../resources/kafka-connector-config.properties | 38 +++++ .../main/resources/sqoopconnector.properties | 18 +++ .../connector/kafka/TestConfigValidator.java | 59 +++++++ .../sqoop/connector/kafka/TestKafkaLoader.java | 96 +++++++++++ connector/pom.xml | 9 +- pom.xml | 23 +++ server/pom.xml | 5 + .../apache/sqoop/handler/JobRequestHandler.java | 1 + test/pom.xml | 5 + 
.../test/testcases/KafkaConnectorTestCase.java | 78 +++++++++ .../connector/kafka/FromRDBMSToKafkaTest.java | 75 +++++++++ 27 files changed, 1376 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/pom.xml ---------------------------------------------------------------------- diff --git a/common-test/pom.xml b/common-test/pom.xml index 9fd671c..609a875 100644 --- a/common-test/pom.xml +++ b/common-test/pom.xml @@ -60,6 +60,21 @@ limitations under the License. <artifactId>postgresql</artifactId> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java new file mode 100644 index 0000000..78d651b --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaConsumer.java @@ -0,0 +1,98 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Kafka Consumer implementation. This uses the current thread to fetch the + * next message from the queue and doesn't use a multi threaded implementation. + * So this implements a synchronous blocking call. + * To avoid infinite waiting, a timeout is implemented to wait only for + * 1 second before concluding that the message will not be available. 
+ */ +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger( + KafkaConsumer.class); + + private final ConsumerConnector consumer; + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap; + + public KafkaConsumer() { + consumer = kafka.consumer.Consumer.createJavaConsumerConnector( + createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1")); + } + + private static ConsumerConfig createConsumerConfig(String zkUrl, + String groupId) { + Properties props = new Properties(); + props.put("zookeeper.connect", zkUrl); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "1000"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + props.put("auto.offset.reset", "smallest"); + props.put("consumer.timeout.ms","1000"); + return new ConsumerConfig(props); + } + + public void initTopicList(List<String> topics) { + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + for (String topic : topics) { + // we need only single threaded consumers + topicCountMap.put(topic, new Integer(1)); + } + consumerMap = consumer.createMessageStreams(topicCountMap); + } + + public MessageAndMetadata getNextMessage(String topic) { + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + // it has only a single stream, because there is only one consumer + KafkaStream stream = streams.get(0); + final ConsumerIterator<byte[], byte[]> it = stream.iterator(); + int counter = 0; + try { + if (it.hasNext()) { + return it.next(); + } else { + return null; + } + } catch (ConsumerTimeoutException e) { + logger.error("0 messages available to fetch for the topic " + topic); + return null; + } + } + + public void shutdown() { + consumer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java 
---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java new file mode 100644 index 0000000..b90d14e --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/KafkaLocal.java @@ -0,0 +1,57 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * A local Kafka server for running unit tests. 
+ * Reference: https://gist.github.com/fjavieralba/7930018/ + */ +public class KafkaLocal { + + public KafkaServerStartable kafka; + public ZooKeeperLocal zookeeper; + private KafkaConfig kafkaConfig; + + public KafkaLocal(Properties kafkaProperties) throws IOException, + InterruptedException{ + kafkaConfig = new KafkaConfig(kafkaProperties); + + //start local kafka broker + kafka = new KafkaServerStartable(kafkaConfig); + } + + public void start() throws Exception{ + kafka.startup(); + } + + public void stop() throws IOException { + kafka.shutdown(); + File dir = new File(kafkaConfig.logDirs().head()).getAbsoluteFile(); + FileUtils.deleteDirectory(dir); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java new file mode 100644 index 0000000..34b8f1e --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/TestUtil.java @@ -0,0 +1,162 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import kafka.message.MessageAndMetadata; +import org.apache.sqoop.common.test.utils.NetworkUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +/** + * A utility class for starting/stopping Kafka Server. + */ +public class TestUtil { + + private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); + private static TestUtil instance = new TestUtil(); + + private Random randPortGen = new Random(System.currentTimeMillis()); + private KafkaLocal kafkaServer; + private ZooKeeperLocal zookeeperServer; + private KafkaConsumer kafkaConsumer; + private String hostname = "localhost"; + private int kafkaLocalPort = 9022; + private int zkLocalPort = 2188; + + private TestUtil() { + init(); + } + + public static TestUtil getInstance() { + return instance; + } + + private void init() { + // get the localhost. + try { + hostname = InetAddress.getLocalHost().getHostName(); + + } catch (UnknownHostException e) { + logger.warn("Error getting the value of localhost. 
" + + "Proceeding with 'localhost'.", e); + } + } + + private boolean startKafkaServer() throws IOException { + kafkaLocalPort = NetworkUtils.findAvailablePort(); + zkLocalPort = NetworkUtils.findAvailablePort(); + + logger.info("Starting kafka server with kafka port " + kafkaLocalPort + + " and zookeeper port " + zkLocalPort ); + try { + //start local Zookeeper + zookeeperServer = new ZooKeeperLocal(zkLocalPort); + logger.info("ZooKeeper instance is successfully started on port " + + zkLocalPort); + + Properties kafkaProperties = getKafkaProperties(); + + kafkaServer = new KafkaLocal(kafkaProperties); + kafkaServer.start(); + + logger.info("Kafka Server is successfully started on port " + + kafkaLocalPort); + return true; + + } catch (Exception e) { + logger.error("Error starting the Kafka Server.", e); + return false; + } + } + + Properties getKafkaProperties() { + Properties kafkaProps = new Properties(); + kafkaProps.put("broker.id","0"); + // Kafka expects strings for all properties and KafkaConfig will throw an exception otherwise + kafkaProps.put("port",Integer.toString(kafkaLocalPort)); + kafkaProps.put("log.dirs","target/kafka-logs"); + kafkaProps.put("num.partitions","1"); + kafkaProps.put("zookeeper.connect",zookeeperServer.getConnectString()); + + return kafkaProps; + } + + private KafkaConsumer getKafkaConsumer() { + synchronized (this) { + if (kafkaConsumer == null) { + kafkaConsumer = new KafkaConsumer(); + } + } + return kafkaConsumer; + } + + public void initTopicList(List<String> topics) { + getKafkaConsumer().initTopicList(topics); + } + + public MessageAndMetadata getNextMessageFromConsumer(String topic) { + return getKafkaConsumer().getNextMessage(topic); + } + + public void prepare() throws IOException { + boolean startStatus = startKafkaServer(); + if (!startStatus) { + throw new RuntimeException("Error starting the server!"); + } + try { + Thread.sleep(3 * 1000); // add this sleep time to + // ensure that the server is fully started before 
proceeding with tests. + } catch (InterruptedException e) { + // ignore + } + getKafkaConsumer(); + logger.info("Completed the prepare phase."); + } + + public void tearDown() throws IOException { + logger.info("Shutting down the Kafka Consumer."); + getKafkaConsumer().shutdown(); + try { + Thread.sleep(3 * 1000); // add this sleep time after + // the consumer shutdown and before stopping the Kafka server. + } catch (InterruptedException e) { + // ignore + } + logger.info("Shutting down the kafka Server."); + kafkaServer.stop(); + logger.info("Shutting down Zookeeper Server."); + zookeeperServer.stopZookeeper(); + logger.info("Completed the tearDown phase."); + } + + public String getZkUrl() { + return zookeeperServer.getConnectString(); + } + + public String getKafkaServerUrl() { + return "localhost:"+kafkaLocalPort; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java ---------------------------------------------------------------------- diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java new file mode 100644 index 0000000..27660bf --- /dev/null +++ b/common-test/src/main/java/org/apache/sqoop/common/test/kafka/ZooKeeperLocal.java @@ -0,0 +1,72 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.sqoop.common.test.kafka; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Properties; + +public class ZooKeeperLocal { + private int zkPort; + private ZooKeeperServer zookeeper; + private NIOServerCnxnFactory factory; + File dir; + + + public ZooKeeperLocal(int zkPort){ + int numConnections = 5000; + int tickTime = 2000; + + this.zkPort = zkPort; + + String dataDirectory = "target"; + dir = new File(dataDirectory, "zookeeper").getAbsoluteFile(); + + try { + this.zookeeper = new ZooKeeperServer(dir,dir,tickTime); + this.factory = new NIOServerCnxnFactory(); + factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0); + factory.startup(zookeeper); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void stopZookeeper() throws IOException { + zookeeper.shutdown(); + factory.shutdown(); + FileUtils.deleteDirectory(dir); + } + + public String getConnectString() { + return "127.0.0.1:"+zkPort; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/pom.xml 
---------------------------------------------------------------------- diff --git a/connector/connector-kafka/pom.xml b/connector/connector-kafka/pom.xml new file mode 100644 index 0000000..8786bff --- /dev/null +++ b/connector/connector-kafka/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>connector</artifactId> + <groupId>org.apache.sqoop</groupId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kafka</artifactId> + <name>Sqoop Kafka Connector</name> + + <dependencies> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-spi</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-common-test</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java new file mode 100644 index 0000000..84b4be8 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +public class KafkaConnector extends SqoopConnector { + + private static final To TO = new To( + KafkaToInitializer.class, + KafkaLoader.class, + KafkaToDestroyer.class); + + /** + * Retrieve connector version. + * + * @return Version encoded as a string + */ + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + /** + * @param locale + * @return the resource bundle associated with the given locale. 
+ */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle(KafkaConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * @return Get link configuration group class + */ + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + /** + * @param direction + * @return Get job configuration group class per direction type or null if + * not supported + */ + @Override + public Class getJobConfigurationClass(Direction direction) { + return ToJobConfiguration.class; + } + + @Override + public List<Direction> getSupportedDirections() { + // TODO: Remove when we add the FROM part of the connector (SQOOP-1583) + return Arrays.asList(Direction.TO); + } + + /** + * @return a <tt>From</tt> that provides classes for performing import. + */ + @Override + public From getFrom() { + //TODO: SQOOP-1583 + return null; + } + + /** + * @return a <tt>To</tt> that provides classes for performing export. + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the + * configs related to the link and job + * + * @return ConnectorConfigurableUpgrader object + */ + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + // Nothing to upgrade at this point + return null; + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java new file mode 100644 index 0000000..f94efea --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnectorErrors.java @@ -0,0 
+1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.common.ErrorCode; + +public enum KafkaConnectorErrors implements ErrorCode { + + KAFKA_CONNECTOR_0000("Unknown error occurred."), + KAFKA_CONNECTOR_0001("Error occurred while sending data to Kafka") + ; + + private final String message; + + private KafkaConnectorErrors(String message) { + this.message = message; + } + + @Override + public String getCode() { + return name(); + } + + /** + * @return the message associated with error code. 
+ */ + @Override + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java new file mode 100644 index 0000000..9d3877d --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.job.Constants; + +public class KafkaConstants extends Constants { + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = "kafka-connector-config"; + + // Kafka properties keys + public static final String MESSAGE_SERIALIZER_KEY = "serializer.class"; + public static final String KEY_SERIALIZER_KEY = "key.serializer.class"; + public static final String BROKER_LIST_KEY = "metadata.broker.list"; + public static final String REQUIRED_ACKS_KEY = "request.required.acks"; + public static final String PRODUCER_TYPE = "producer.type"; + + // Kafka properties default values + public static final String DEFAULT_MESSAGE_SERIALIZER = + "kafka.serializer.StringEncoder"; + public static final String DEFAULT_KEY_SERIALIZER = + "kafka.serializer.StringEncoder"; + public static final String DEFAULT_REQUIRED_ACKS = "-1"; + public static final String DEFAULT_PRODUCER_TYPE = "sync"; + public static final int DEFAULT_BATCH_SIZE = 100; + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java new file mode 100644 index 0000000..5d79516 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaLoader.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import kafka.producer.KeyedMessage; +import kafka.javaapi.producer.Producer; +import kafka.producer.ProducerConfig; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +public class KafkaLoader extends Loader<LinkConfiguration,ToJobConfiguration> { + private static final Logger LOG = Logger.getLogger(KafkaLoader.class); + + private List<KeyedMessage<String, String>> messageList = + new ArrayList<KeyedMessage<String, String>>(KafkaConstants.DEFAULT_BATCH_SIZE); + private Producer producer; + + @Override + public void load(LoaderContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) throws + Exception { + producer = getProducer(linkConfiguration); + LOG.info("got producer"); + String topic = jobConfiguration.toJobConfig.topic; + LOG.info("topic is:"+topic); + String batchUUID = UUID.randomUUID().toString(); + String record; + + while ((record = context.getDataReader().readTextRecord()) != null) { + // create a message and add to buffer 
+ KeyedMessage<String, String> data = new KeyedMessage<String, String> + (topic, null, batchUUID, record); + messageList.add(data); + // If we have enough messages, send the batch to Kafka + if (messageList.size() >= KafkaConstants.DEFAULT_BATCH_SIZE) { + sendToKafka(messageList); + } + } + + if (messageList.size() > 0) { + sendToKafka(messageList); + } + + producer.close(); + } + + private void sendToKafka(List<KeyedMessage<String,String>> messageList) { + try { + producer.send(messageList); + messageList.clear(); + } catch (Exception ex) { + throw new SqoopException(KafkaConnectorErrors.KAFKA_CONNECTOR_0001); + } + } + + /** + * Initialize a Kafka producer using configs in LinkConfiguration + * @param linkConfiguration + * @return + */ + Producer getProducer(LinkConfiguration linkConfiguration) { + Properties kafkaProps = generateDefaultKafkaProps(); + kafkaProps.put(KafkaConstants.BROKER_LIST_KEY, linkConfiguration.linkConfig.brokerList); + ProducerConfig config = new ProducerConfig(kafkaProps); + return new Producer<String, String>(config); + } + + /** + * Generate producer properties object with some defaults + * @return + */ + private Properties generateDefaultKafkaProps() { + Properties props = new Properties(); + props.put(KafkaConstants.MESSAGE_SERIALIZER_KEY, + KafkaConstants.DEFAULT_MESSAGE_SERIALIZER); + props.put(KafkaConstants.KEY_SERIALIZER_KEY, + KafkaConstants.DEFAULT_KEY_SERIALIZER); + props.put(KafkaConstants.REQUIRED_ACKS_KEY, + KafkaConstants.DEFAULT_REQUIRED_ACKS); + props.put(KafkaConstants.PRODUCER_TYPE,KafkaConstants.DEFAULT_PRODUCER_TYPE); + return props; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java 
b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java new file mode 100644 index 0000000..c522d91 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToDestroyer.java @@ -0,0 +1,35 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.kafka;

import org.apache.log4j.Logger;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;

/**
 * Destroyer for the TO direction of the Kafka connector.
 * It performs no cleanup; it only writes a log message.
 */
public class KafkaToDestroyer extends Destroyer<LinkConfiguration,ToJobConfiguration> {

  private static final Logger LOG = Logger.getLogger(KafkaToDestroyer.class);


  /**
   * Logs that the destroyer ran. None of the arguments are used.
   *
   * @param context destroyer context (unused)
   * @param linkConfiguration link configuration (unused)
   * @param jobConfiguration job configuration (unused)
   */
  public void destroy(DestroyerContext context, LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) {
    LOG.info("Running Kafka Connector destroyer. This does nothing except log this message.");

  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java new file mode 100644 index 0000000..e1b065a --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaToInitializer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kafka; + +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.utils.ClassUtils; + +import java.util.List; + +public class KafkaToInitializer extends Initializer<LinkConfiguration,ToJobConfiguration> { + + private static final Logger LOG = Logger.getLogger(KafkaToInitializer.class); + + @Override + public void initialize(InitializerContext context,LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration) { + LOG.info("Running Kafka Connector initializer. This does nothing except log this message."); + } + + + @Override + public List<String> getJars(InitializerContext context, LinkConfiguration + linkConfiguration, ToJobConfiguration toJobConfiguration) { + List<String> jars = super.getJars(context, linkConfiguration, toJobConfiguration); + // Jars for Kafka, Scala and Yammer (required by Kafka) + jars.add(ClassUtils.jarForClass("kafka.javaapi.producer.Producer")); + jars.add(ClassUtils.jarForClass("scala.collection.immutable.StringLike")); + jars.add(ClassUtils.jarForClass("com.yammer.metrics.Metrics")); + return jars; + } + + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java new file mode 100644 index 0000000..98112e7 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfig.java @@ -0,0 +1,49 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; + +@ConfigClass +public class LinkConfig { + @Input(size=1024, validators = { @Validator(CSVURIValidator.class) }) public String brokerList; + @Input(size=255, validators = { @Validator(CSVURIValidator.class) }) public String zookeeperConnect; + + public static class CSVURIValidator extends AbstractValidator<String> { + + // validate that given string is a comma-separated list of host:port + @Override + public void validate(String str) { + if(str == null || str !="") { + String[] pairs = str.split("\\s*,\\s*"); + for (String pair: pairs) { + String[] parts = pair.split("\\s*:\\s*"); + if (parts.length == 1) { + addMessage(Status.ERROR,"can't parse into host:port pairs"); + } + } + } else { + addMessage(Status.ERROR, "Can't be null nor empty"); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java new file mode 100644 index 0000000..3471307 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/LinkConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.sqoop.connector.kafka.configuration;

import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;

/**
 * Top-level link configuration of the Kafka connector.
 * It wraps a single {@link LinkConfig} section.
 */
@ConfigurationClass
public class LinkConfiguration {
  @Config
  public LinkConfig linkConfig;

  /** Creates the configuration with an empty {@link LinkConfig} section. */
  public LinkConfiguration() {
    linkConfig = new LinkConfig();
  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java new file mode 100644 index 0000000..04387d9 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfig.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.sqoop.connector.kafka.configuration;

import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.validators.NotEmpty;

/**
 * Job-level inputs for the TO direction of the Kafka connector.
 */
@ConfigClass
public class ToJobConfig {
  // Name of the Kafka topic to write to; validated with NotEmpty.
  @Input(size=255, validators = { @Validator(NotEmpty.class) }) public String topic;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java new file mode 100644 index 0000000..d294b27 --- /dev/null +++ b/connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/ToJobConfiguration.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.sqoop.connector.kafka.configuration;

import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigurationClass;

/**
 * Top-level job configuration for the TO direction of the Kafka connector.
 * It wraps a single {@link ToJobConfig} section.
 */
@ConfigurationClass
public class ToJobConfiguration {
  @Config
  public ToJobConfig toJobConfig;

  /** Creates the configuration with an empty {@link ToJobConfig} section. */
  public ToJobConfiguration() {
    toJobConfig = new ToJobConfig();
  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/resources/kafka-connector-config.properties b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties new file mode 100644 index 0000000..0d6fca3 --- /dev/null +++ b/connector/connector-kafka/src/main/resources/kafka-connector-config.properties @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Generic Kafka Connector Resources + +############################ + +# Link Config +linkConfig.label = Link configuration +linkConfig.help = Here you supply information necessary to connect to Kafka + +linkConfig.brokerList.label = List of Kafka brokers +linkConfig.brokerList.help = Comma-separated list of Kafka brokers in the form of host:port. \ + It doesn't need to contain all brokers, but at least two are recommended for high availability + +linkConfig.zookeeperConnect.label = Zookeeper address +linkConfig.zookeeperConnect.help = Address of Zookeeper used by the Kafka cluster. Usually host:port. \ + Multiple zookeeper nodes are supported. If Kafka is stored in its own znode \ + use host:port/kafka +# To Job Config +# +toJobConfig.label = ToJob configuration +toJobConfig.help = Configuration necessary when writing data to Kafka + +toJobConfig.topic.label = Kafka topic +toJobConfig.topic.help = Name of Kafka topic where we'll send the data http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/main/resources/sqoopconnector.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/main/resources/sqoopconnector.properties b/connector/connector-kafka/src/main/resources/sqoopconnector.properties new file mode 100644 index 0000000..90d5dd6 --- /dev/null +++ b/connector/connector-kafka/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generic Kafka Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.kafka.KafkaConnector +org.apache.sqoop.connector.name = kafka-connector \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java new file mode 100644 index 0000000..b61d979 --- /dev/null +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestConfigValidator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kafka; + +import org.apache.sqoop.connector.kafka.configuration.LinkConfig; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestConfigValidator { + @Test + public void testValidURI() { + String[] URI = { + "broker1:9092", + "broker1:9092,broker2:9092", + "zk1:2181/kafka", + "zk1:2181,zk2:2181/kafka" + }; + + for (String uri: URI) { + LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator(); + validator.validate(uri); + assertTrue(validator.getStatus().canProceed()); + } + } + + @Test + public void testInvalidURI() { + String[] URI = { + "", + "broker", + "broker1:9092,broker" + }; + for (String uri: URI) { + LinkConfig.CSVURIValidator validator = new LinkConfig.CSVURIValidator(); + validator.validate(uri); + assertFalse(validator.getStatus().canProceed()); + } + + + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java new file mode 100644 index 0000000..f896e9e --- /dev/null +++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.connector.kafka;

import kafka.message.MessageAndMetadata;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Runs {@link KafkaLoader} against the local Kafka test fixture and checks
 * that every generated row arrives on the topic, in order.
 */
public class TestKafkaLoader {

  private static TestUtil testUtil = TestUtil.getInstance();
  private static final int NUMBER_OF_ROWS = 1000;
  private static KafkaLoader loader;
  private static String TOPIC = "mytopic";

  /**
   * Data reader that produces NUMBER_OF_ROWS text records numbered from 1
   * and then signals the end of input with null.
   */
  private static class GeneratedRowReader extends DataReader {
    private long index = 0L;

    @Override
    public Object[] readArrayRecord() {
      return null;
    }

    @Override
    public String readTextRecord() {
      // index is incremented before the record is built, so rows start at 1.
      if (index++ < NUMBER_OF_ROWS) {
        return index + "," + (double)index + ",'" + index + "'";
      }
      return null;
    }

    @Override
    public Object readContent() {
      return null;
    }
  }

  /** Starts the test fixture, registers the topic and creates the loader. */
  @BeforeClass
  public static void setup() throws IOException {
    testUtil.prepare();
    List<String> topicNames = new ArrayList<String>(1);
    topicNames.add(TOPIC);
    testUtil.initTopicList(topicNames);
    loader = new KafkaLoader();
  }

  /** Shuts the test fixture down. */
  @AfterClass
  public static void tearDown() throws IOException {
    testUtil.tearDown();
  }

  /** Loads the generated rows and compares each consumed message with its row. */
  @Test
  public void testLoader() throws Exception {
    LoaderContext context = new LoaderContext(null, new GeneratedRowReader(), null);

    LinkConfiguration linkConf = new LinkConfiguration();
    linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl();
    linkConf.linkConfig.zookeeperConnect = testUtil.getZkUrl();

    ToJobConfiguration jobConf = new ToJobConfiguration();
    jobConf.toJobConfig.topic = TOPIC;

    loader.load(context, linkConf, jobConf);

    int row = 1;
    while (row <= NUMBER_OF_ROWS) {
      MessageAndMetadata<byte[],byte[]> fetchedMsg =
              testUtil.getNextMessageFromConsumer(TOPIC);
      String expected = row + "," + (double) row + "," + "'" + row + "'";
      Assert.assertEquals(expected,
              new String((byte[]) fetchedMsg.message(), "UTF-8"));
      row++;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/connector/pom.xml ---------------------------------------------------------------------- diff --git a/connector/pom.xml b/connector/pom.xml index da4ed3e..dfa7e88 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -37,10 +37,11 @@ limitations under the License. <module>connector-generic-jdbc</module> <module>connector-hdfs</module> <module>connector-kite</module> - <!-- Uncomment and finish connectors after sqoop framework will become stable - <module>connector-mysql-jdbc</module> - <module>connector-mysql-fastpath</module> - --> + <module>connector-kafka</module> + <!-- Uncomment and finish connectors after sqoop framework will become stable + <module>connector-mysql-jdbc</module> + <module>connector-mysql-fastpath</module> + --> </modules> </project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e182176..efb9659 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,8 @@ limitations under the License. 
<joda.version>2.4</joda.version> <kitesdk.version>0.17.0</kitesdk.version> <slf4j.version>1.6.1</slf4j.version> + <zookeeper.version>3.4.6</zookeeper.version> + <kafka.version>0.8.1.1</kafka.version> </properties> <dependencies> @@ -353,6 +355,17 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kafka</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kafka</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> <artifactId>sqoop-connector-mysql-jdbc</artifactId> <version>${project.version}</version> </dependency> @@ -527,6 +540,16 @@ limitations under the License. <version>${tomcat.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zookeeper.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 1adcca0..77477ee 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ limitations under the License. 
<artifactId>sqoop-connector-kite</artifactId> </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kafka</artifactId> + </dependency> + <!-- <dependency> <groupId>org.apache.sqoop.connector</groupId> http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index dc237b8..c9f3dd7 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -92,6 +92,7 @@ public class JobRequestHandler implements RequestHandler { @Override public JsonBean handleEvent(RequestContext ctx) { + LOG.info("Got job request"); switch (ctx.getMethod()) { case GET: if (STATUS.equals(ctx.getLastURLElement())) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index eedb545..35d36c1 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -88,6 +88,11 @@ limitations under the License. 
</dependency> <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kafka</artifactId> + </dependency> + + <dependency> <groupId>org.codehaus.cargo</groupId> <artifactId>cargo-core-container-tomcat</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java new file mode 100644 index 0000000..41d43c0 --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/testcases/KafkaConnectorTestCase.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.test.testcases; + +import kafka.message.MessageAndMetadata; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.apache.sqoop.common.test.kafka.TestUtil; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + +public class KafkaConnectorTestCase extends ConnectorTestCase { + private static TestUtil testUtil = TestUtil.getInstance(); + private static final String TOPIC = "mytopic"; + + @BeforeClass + public static void startKafka() throws IOException { + // starts Kafka server and its dependent zookeeper + testUtil.prepare(); + } + + @AfterClass + public static void stopKafka() throws IOException { + testUtil.tearDown(); + } + + protected void fillKafkaLinkConfig(MLink link) { + MConfigList configs = link.getConnectorLinkConfig(); + configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl()); + configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl()); + + } + + protected void fillKafkaToConfig(MJob job){ + MConfigList toConfig = job.getJobConfig(Direction.TO); + toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC); + List<String> topics = new ArrayList<String>(1); + topics.add(TOPIC); + testUtil.initTopicList(topics); + } + + /** + * Compare strings in content to the messages in Kafka topic + * @param content + * @throws UnsupportedEncodingException + */ + protected void validateContent(String[] content) throws UnsupportedEncodingException { + for(String str: content) { + MessageAndMetadata<byte[],byte[]> fetchedMsg = + testUtil.getNextMessageFromConsumer(TOPIC); + Assert.assertEquals(str, + new String(fetchedMsg.message(), "UTF-8")); + } + } +} 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e2e5aa8b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java new file mode 100644 index 0000000..93d657c --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.integration.connector.kafka; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MDriverConfig; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.testcases.KafkaConnectorTestCase; +import org.junit.Test; + + +public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase { + + private static final String[] input = { + "1,'USA','San Francisco'", + "2,'USA','Sunnyvale'", + "3,'Czech Republic','Brno'", + "4,'USA','Palo Alto'" + }; + + @Test + public void testBasic() throws Exception { + createAndLoadTableCities(); + + // Kafka link + MLink kafkaLink = getClient().createLink("kafka-connector"); + fillKafkaLinkConfig(kafkaLink); + saveLink(kafkaLink); + + // RDBMS link + MLink rdbmsLink = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsLink); + saveLink(rdbmsLink); + + // Job creation + MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), kafkaLink.getPersistenceId()); + + // set rdbms "FROM" job config + MConfigList fromConfig = job.getJobConfig(Direction.FROM); + fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName())); + fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id")); + + // set Kafka "TO" job config + fillKafkaToConfig(job); + + // driver config + MDriverConfig driverConfig = job.getDriverConfig(); + driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3); + saveJob(job); + + executeJob(job); + + // this will assert the content of the array matches the content of the topic + validateContent(input); + } + + +}
