Repository: sqoop Updated Branches: refs/heads/sqoop2 51dd8375f -> 59ffd8802
SQOOP-1588: Sqoop2: TO-side: Write data to HDFS (Qian Xu via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/59ffd880 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/59ffd880 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/59ffd880 Branch: refs/heads/sqoop2 Commit: 59ffd880258e63775d58905e6059da1c4287aa44 Parents: 51dd837 Author: Abraham Elmahrek <[email protected]> Authored: Mon Nov 17 21:43:21 2014 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Nov 17 22:03:17 2014 -0800 ---------------------------------------------------------------------- connector/connector-kite/pom.xml | 103 ++++++++++++ .../sqoop/connector/kite/KiteConnector.java | 99 +++++++++++ .../connector/kite/KiteConnectorConstants.java | 32 ++++ .../connector/kite/KiteConnectorError.java | 46 +++++ .../connector/kite/KiteDatasetExecutor.java | 167 +++++++++++++++++++ .../apache/sqoop/connector/kite/KiteLoader.java | 79 +++++++++ .../sqoop/connector/kite/KiteToDestroyer.java | 69 ++++++++ .../sqoop/connector/kite/KiteToInitializer.java | 72 ++++++++ .../kite/configuration/LinkConfig.java | 30 ++++ .../kite/configuration/LinkConfiguration.java | 33 ++++ .../kite/configuration/ToJobConfig.java | 46 +++++ .../kite/configuration/ToJobConfiguration.java | 33 ++++ .../connector/kite/util/InputValidation.java | 40 +++++ .../connector/kite/util/KiteDataTypeUtil.java | 160 ++++++++++++++++++ .../main/resources/connector-configs.properties | 48 ++++++ .../resources/kite-connector-config.properties | 40 +++++ .../main/resources/sqoopconnector.properties | 18 ++ .../sqoop/connector/kite/TestKiteExecutor.java | 106 ++++++++++++ .../sqoop/connector/kite/TestKiteLoader.java | 97 +++++++++++ .../connector/kite/TestKiteToDestroyer.java | 110 ++++++++++++ .../connector/kite/TestKiteToInitializer.java | 88 ++++++++++ .../src/test/resources/log4j.properties | 24 +++ 
.../sqoop/connector/common/FileFormat.java | 40 +++++ .../apache/sqoop/connector/common/JarUtil.java | 57 +++++++ connector/pom.xml | 1 + pom.xml | 41 +++++ server/pom.xml | 5 + test/pom.xml | 5 + 28 files changed, 1689 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml new file mode 100644 index 0000000..10ed099 --- /dev/null +++ b/connector/connector-kite/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kite</artifactId> + <name>Sqoop Kite Connector</name> + + <dependencies> + <!-- Common modules --> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>sqoop-spi</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + + <!-- Testing specified modules --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <scope>test</scope> + </dependency> + + <!-- Kite required modules --> + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + </dependencies> + + <build> + <finalName>sqoop</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + 
<goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java new file mode 100644 index 0000000..c864882 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * Kite connector enables access to data in HDFS or HBase in diverse file + * formats (CSV, Avro and Parquet). The power behind the scenes is + * <a href="http://kitesdk.org/">Kite Library</a>. + */ +public class KiteConnector extends SqoopConnector { + + private static final To TO = new To( + KiteToInitializer.class, + KiteLoader.class, + KiteToDestroyer.class); + + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle( + KiteConnectorConstants.RESOURCE_BUNDLE_NAME, locale); + } + + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + @Override + public Class getJobConfigurationClass(Direction jobType) { + switch (jobType) { + case FROM: + // TODO: SQOOP-1647 + return null; + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + @Override + public List<Direction> getSupportedDirections() { + // TODO: No need to override, when SQOOP-1647 is done + return Arrays.asList(Direction.TO); + } + + @Override + public From getFrom() { + // TODO: SQOOP-1647 + return null; + } + + @Override + public To getTo() { + return TO; + } + + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + // TODO: SQOOP-1624 + return null; + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorConstants.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorConstants.java new file mode 100644 index 0000000..d16b85e --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorConstants.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.job.Constants; + +public final class KiteConnectorConstants extends Constants { + + // Resource bundle name + public static final String RESOURCE_BUNDLE_NAME = "kite-connector-config"; + + private KiteConnectorConstants() { + // Disable explicit object creation + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java new file mode 100644 index 0000000..d67c8de --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.common.ErrorCode; + +public enum KiteConnectorError implements ErrorCode { + + /** Unsupported dataset URI scheme */ + GENERIC_KITE_CONNECTOR_0000("Unsupported dataset URI scheme"), + + /** Destination is not empty */ + GENERIC_KITE_CONNECTOR_0001("Dataset is not empty"), + + ; + + private final String message; + + private KiteConnectorError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java new file mode 100644 index 0000000..9432e4b --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kite; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Closeables; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.kite.util.KiteDataTypeUtil; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.DatasetWriter; +import org.kitesdk.data.Datasets; +import org.kitesdk.data.Format; +import org.kitesdk.data.URIBuilder; +import org.kitesdk.data.spi.filesystem.FileSystemDataset; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * The class arranges to perform dataset operations (without thread safety + * guarantee). + */ +public class KiteDatasetExecutor { + + private final Dataset<GenericRecord> dataset; + + private DatasetWriter<GenericRecord> writer; + + /** + * Creates a new dataset. + */ + public KiteDatasetExecutor(String uri, org.apache.sqoop.schema.Schema schema, + FileFormat format) { + Schema datasetSchema = KiteDataTypeUtil.createAvroSchema(schema); + Format datasetFormat = KiteDataTypeUtil.toFormat(format); + DatasetDescriptor descriptor = new DatasetDescriptor.Builder() + .property("kite.allow.csv", "true") + .schema(datasetSchema) + .format(datasetFormat) + .build(); + dataset = Datasets.create(uri, descriptor); + } + + @VisibleForTesting + protected KiteDatasetExecutor(Dataset<GenericRecord> dataset) { + this.dataset = dataset; + } + + /** + * Writes a data record into dataset. + * + * Note that `closeWriter()` should be called explicitly, when no more data is + * going to be written. 
+ */ + public void writeRecord(Object[] data) { + Schema schema = dataset.getDescriptor().getSchema(); + GenericRecord record = KiteDataTypeUtil.createGenericRecord(data, schema); + getOrNewWriter().write(record); + } + + private DatasetWriter<GenericRecord> getOrNewWriter() { + if (writer == null) { + writer = dataset.newWriter(); + } + return writer; + } + + @VisibleForTesting + protected boolean isWriterClosed() { + return writer == null || !writer.isOpen(); + } + + /** + * Closes the writer and releases any system resources. + */ + public void closeWriter() { + if (writer != null) { + Closeables.closeQuietly(writer); + writer = null; + } + } + + /** + * Checks the existence by a specified dataset URI. + */ + public static boolean datasetExists(String uri) { + return Datasets.exists(uri); + } + + /** + * Deletes current dataset physically. + */ + public void deleteDataset() { + deleteDataset(dataset.getUri().toString()); + } + + /** + * Deletes particular dataset physically. + */ + public static boolean deleteDataset(String uri) { + return Datasets.delete(uri); + } + + /** + * Merges a dataset into this. + */ + public void mergeDataset(String uri) { + FileSystemDataset<GenericRecord> update = Datasets.load(uri); + if (dataset instanceof FileSystemDataset) { + ((FileSystemDataset<GenericRecord>) dataset).merge(update); + } else { + throw new SqoopException( + KiteConnectorError.GENERIC_KITE_CONNECTOR_0000, uri); + } + } + + private static final String TEMPORARY_DATASET_PREFIX = "/temp_"; + + /** + * Workaround for managing temporary datasets. + */ + public static String suggestTemporaryDatasetUri(String uri) { + if (uri.startsWith("dataset:hdfs:")) { + return uri + TEMPORARY_DATASET_PREFIX + UUID.randomUUID(); + } else { + throw new SqoopException( + KiteConnectorError.GENERIC_KITE_CONNECTOR_0000, uri); + } + } + + /** + * Workaround for managing temporary datasets. 
+ */ + public static String[] listTemporaryDatasetUris(String uri) { + String repo = URIBuilder.REPO_SCHEME + + uri.substring(URIBuilder.DATASET_SCHEME.length()); + List<String> result = new ArrayList<String>(); + for (URI match : Datasets.list(repo)) { + if (match.toString().contains(TEMPORARY_DATASET_PREFIX)) { + result.add(match.toString()); + } + } + return result.toArray(new String[result.size()]); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java new file mode 100644 index 0000000..709fd94 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.schema.Schema; + +/** + * This class allows Kite connector to load data into a target system. + */ +public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> { + + private static final Logger LOG = Logger.getLogger(KiteLoader.class); + + @VisibleForTesting + protected KiteDatasetExecutor getExecutor(String uri, Schema schema, + FileFormat format) { + // Note that instead of creating a dataset at destination, we create a + // temporary dataset by every KiteLoader instance. They will be merged when + // all data portions are written successfully. Unfortunately, KiteLoader is + // not able to pass the temporary dataset uri to KiteToDestroyer. So we + // delegate KiteDatasetExecutor to manage name convention for datasets. 
+ uri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri); + + return new KiteDatasetExecutor(uri, schema, format); + } + + @Override + public void load(LoaderContext context, LinkConfiguration linkConfig, + ToJobConfiguration jobConfig) throws Exception { + KiteDatasetExecutor executor = getExecutor(jobConfig.toJobConfig.uri, + context.getSchema(), linkConfig.linkConfig.fileFormat); + LOG.info("Temporary dataset created."); + + DataReader reader = context.getDataReader(); + Object[] array; + boolean success = false; + long count = 0L; + + try { + while ((array = reader.readArrayRecord()) != null) { + executor.writeRecord(array); + count++; + } + LOG.info(count + " data record(s) have been written into dataset."); + success = true; + } finally { + executor.closeWriter(); + + if (!success) { + LOG.error("Fail to write data, dataset will be removed."); + executor.deleteDataset(); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java new file mode 100644 index 0000000..25912b4 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kite; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.log4j.Logger; +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; +import org.apache.sqoop.schema.Schema; + +/** + * This classes allows connector to define work to complete execution. + * + * When import is done successfully, temporary created datasets will be merged. + * In case of errors, they will be removed physically. + */ +public class KiteToDestroyer extends Destroyer<LinkConfiguration, + ToJobConfiguration> { + + private static final Logger LOG = Logger.getLogger(KiteToDestroyer.class); + + @Override + public void destroy(DestroyerContext context, + LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { + LOG.info("Running Kite connector destroyer"); + String[] uris = KiteDatasetExecutor.listTemporaryDatasetUris( + jobConfig.toJobConfig.uri); + if (context.isSuccess()) { + KiteDatasetExecutor executor = getExecutor( + jobConfig.toJobConfig.uri, context.getSchema(), + linkConfig.linkConfig.fileFormat); + for (String uri : uris) { + executor.mergeDataset(uri); + LOG.info(String.format("Temporary dataset %s has been merged", uri)); + } + } else { + for (String uri : uris) { + KiteDatasetExecutor.deleteDataset(uri); + LOG.warn(String.format("Failed to import. 
" + + "Temporary dataset %s has been deleted", uri)); + } + } + } + + @VisibleForTesting + protected KiteDatasetExecutor getExecutor(String uri, Schema schema, + FileFormat format) { + return new KiteDatasetExecutor(uri, schema, format); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java new file mode 100644 index 0000000..f78786d --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite; + +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.common.JarUtil; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; + +import java.util.List; +import java.util.regex.Pattern; + +/** + * This class allows connector to define initialization work for execution. + * + * It will check whether dataset exists in destination already. + */ +public class KiteToInitializer extends Initializer<LinkConfiguration, + ToJobConfiguration> { + + private static final Logger LOG = Logger.getLogger(KiteToInitializer.class); + + // Minimal dependencies for the MR job + private static final Pattern[] JAR_NAME_PATTERNS = { + Pattern.compile("/kite-"), + Pattern.compile("/jackson-(annotations|core|databind)-\\d+"), + }; + + @Override + public void initialize(InitializerContext context, + LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { + if (KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) { + LOG.error("Overwrite an existing dataset is not expected in new create mode."); + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001); + } + } + + @Override + public List<String> getJars(InitializerContext context, + LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { + List<String> jars = super.getJars(context, linkConfig, jobConfig); + jars.addAll(JarUtil.getMatchedJars(JAR_NAME_PATTERNS)); + return jars; + } + + @Override + public Schema getSchema(InitializerContext context, + LinkConfiguration linkConfig, ToJobConfiguration jobConfig) { + // TO-direction does not have a schema, so return a dummy schema. 
+ return new Schema("Kite dataset"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java new file mode 100644 index 0000000..89bd9b3 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite.configuration; + +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; + +@ConfigClass +public class LinkConfig { + + @Input + public FileFormat fileFormat = FileFormat.AVRO; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfiguration.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfiguration.java new file mode 100644 index 0000000..573ff44 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class LinkConfiguration { + + @Config + public LinkConfig linkConfig; + + public LinkConfiguration() { + linkConfig = new LinkConfig(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java new file mode 100644 index 0000000..70b7dc3 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite.configuration; + +import org.apache.sqoop.connector.kite.util.InputValidation; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; + +@ConfigClass(validators = {@Validator(ToJobConfig.ConfigValidator.class)}) +public class ToJobConfig { + + @Input(size = 255) + public String uri; + + public static class ConfigValidator extends AbstractValidator<ToJobConfig> { + + @Override + public void validate(ToJobConfig config) { + try { + InputValidation.validateDatasetUriScheme(config.uri); + } catch (IllegalArgumentException ex) { + addMessage(Status.ERROR, ex.toString()); + } + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfiguration.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfiguration.java new file mode 100644 index 0000000..ca9596c --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfiguration.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kite.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class ToJobConfiguration { + + @Config + public ToJobConfig toJobConfig; + + public ToJobConfiguration() { + toJobConfig = new ToJobConfig(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java new file mode 100644 index 0000000..53fab02 --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kite.util; + +import java.util.regex.Pattern; + +/** + * The helper class arranges to validate user inputs. + */ +public class InputValidation { + + private static Pattern DATASET_URI_PATTERN = Pattern + .compile("^dataset:(hive|hdfs|file):.*$"); + + /** + * Validates the correctness of user input dataset uri. + */ + public static void validateDatasetUriScheme(String uri) + throws IllegalArgumentException { + if (!DATASET_URI_PATTERN.matcher(uri).matches()) { + throw new IllegalArgumentException("Invalid dataset URI: " + uri); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/KiteDataTypeUtil.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/KiteDataTypeUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/KiteDataTypeUtil.java new file mode 100644 index 0000000..23a715f --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/KiteDataTypeUtil.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.kite.util; + +import com.google.common.collect.ImmutableMap; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.schema.type.Column; +import org.apache.sqoop.schema.type.ColumnType; +import org.joda.time.LocalDate; +import org.joda.time.LocalDateTime; +import org.kitesdk.data.Format; +import org.kitesdk.data.Formats; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +/** + * The helper class provides methods to convert Sqoop data types to Kite + * supported data types. + */ +public class KiteDataTypeUtil { + + public static final String SQOOP_TYPE = "SqoopType"; + public static final String DEFAULT_SQOOP_SCHEMA_NAMESPACE = "sqoop"; + private static final ImmutableMap<FileFormat, Format> TO_FORMAT_LOOKUP = + ImmutableMap.<FileFormat, Format>builder() + .put(FileFormat.CSV, Formats.CSV) + .put(FileFormat.AVRO, Formats.AVRO) + .put(FileFormat.PARQUET, Formats.PARQUET) + .build(); + + /** + * Creates an Avro schema from a Sqoop schema. 
+ */ + public static Schema createAvroSchema( + org.apache.sqoop.schema.Schema sqoopSchema) { + String name = sqoopSchema.getName(); + String doc = sqoopSchema.getNote(); + String namespace = DEFAULT_SQOOP_SCHEMA_NAMESPACE; + Schema schema = Schema.createRecord(name, doc, namespace, false); + + List<Schema.Field> fields = new ArrayList<Schema.Field>(); + for (Column column : sqoopSchema.getColumns()) { + Schema.Field field = new Schema.Field(column.getName(), + createAvroFieldSchema(column), null, null); + field.addProp(SQOOP_TYPE, column.getType().toString()); + fields.add(field); + } + schema.setFields(fields); + return schema; + } + + private static Schema createAvroFieldSchema(Column column) { + Schema.Type type = toAvroType(column.getType()); + if (!column.getNullable()) { + return Schema.create(type); + } else { + List<Schema> union = new ArrayList<Schema>(); + union.add(Schema.create(type)); + union.add(Schema.create(Schema.Type.NULL)); + return Schema.createUnion(union); + } + } + + private static Schema.Type toAvroType(ColumnType type) + throws IllegalArgumentException { + switch (type) { + case ARRAY: + return Schema.Type.ARRAY; + case BINARY: + return Schema.Type.BYTES; + case BIT: + return Schema.Type.BOOLEAN; + case DATE: + case DATE_TIME: + case TIME: + // TODO: SQOOP-1616 + return Schema.Type.LONG; + case DECIMAL: + // TODO: SQOOP-1616 + return Schema.Type.STRING; + case ENUM: + case SET: + return Schema.Type.ENUM; + case FIXED_POINT: + return Schema.Type.LONG; + case FLOATING_POINT: + return Schema.Type.DOUBLE; + case MAP: + return Schema.Type.MAP; + case TEXT: + return Schema.Type.STRING; + case UNKNOWN: + return Schema.Type.NULL; + default: + throw new IllegalArgumentException( + "Unsupported Sqoop Data Type " + type); + } + } + + /** + * Creates a GenericRecord instance from a Sqoop record. 
+ */ + public static GenericRecord createGenericRecord(Object[] array, + Schema schema) { + GenericRecord record = new GenericData.Record(schema); + List<Schema.Field> fields = schema.getFields(); + + assert array.length == fields.size(); + for (int i = 0; i < array.length; i++) { + String key = fields.get(i).name(); + Object value = toAvro(array[i]); + record.put(key, value); + } + return record; + } + + private static Object toAvro(Object o) { + if (o instanceof BigDecimal) { + return ((BigDecimal) o).toPlainString(); + } else if (o instanceof LocalDate) { + return ((LocalDate) o).toDate().getTime(); + } else if (o instanceof LocalDateTime) { + return ((LocalDateTime) o).toDate().getTime(); + } + return o; + } + + /** + * Converts Sqoop (user input) FileFormat to Kite supported file format. + */ + public static Format toFormat(FileFormat format) + throws IllegalArgumentException { + if (!TO_FORMAT_LOOKUP.containsKey(format)) { + throw new IllegalArgumentException( + "Unsupported File Output Format " + format); + } + return TO_FORMAT_LOOKUP.get(format); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/resources/connector-configs.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/resources/connector-configs.properties b/connector/connector-kite/src/main/resources/connector-configs.properties new file mode 100644 index 0000000..0e37916 --- /dev/null +++ b/connector/connector-kite/src/main/resources/connector-configs.properties @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Kite Connector Resources + +############################ +# Link Config +# +linkConfig.label = Link Configuration +linkConfig.help = You must supply the information requested in order to create a \ + connection object. + +linkConfig.fileFormat.label = File format +linkConfig.fileFormat.help = Format in which data should be serialized + +linkConfig.compression.label = Compression format +linkConfig.compression.help = Compression that should be used for the data + +# To Job Config +# +toDataset.label = To Kite Dataset Configuration +toDataset.help = You must supply the information requested in order to \ + get information where you want to store your data. + +toDataset.uri.label = Dataset URI +toDataset.uri.help = Location to store dataset (i.e. 
\ + "dataset:hdfs://host:port/user/me/job", \ + "dataset:hive://host:port/table") + +# From Job Config +# +fromDataset.label = From Kite Dataset Configuration +fromDataset.help = Specifies information required to get data from Hadoop ecosystem + +fromDataset.uri.label = Dataset URI +fromDataset.uri.help = Dataset URI that should be exported \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/resources/kite-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties new file mode 100644 index 0000000..27c77b4 --- /dev/null +++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Kite Connector Resources + +############################ +# Link Config +# +linkConfig.label = Link Configuration +linkConfig.help = You must supply the information requested in order to create a \ + connection object. 
+ +linkConfig.fileFormat.label = File format +linkConfig.fileFormat.help = Format in which data should be serialized + +linkConfig.compression.label = Compression format +linkConfig.compression.help = Compression that should be used for the data + +# To Job Config +# +toJobConfig.label = To Kite Dataset Configuration +toJobConfig.help = You must supply the information requested in order to \ + get information where you want to store your data. + +toJobConfig.uri.label = Dataset URI +toJobConfig.uri.help = Location to store dataset (i.e. \ + "dataset:hdfs://host:port/user/me/job", \ + "dataset:hive://host:port/table") http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/main/resources/sqoopconnector.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/resources/sqoopconnector.properties b/connector/connector-kite/src/main/resources/sqoopconnector.properties new file mode 100644 index 0000000..75d900e --- /dev/null +++ b/connector/connector-kite/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Kite Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.kite.KiteConnector +org.apache.sqoop.connector.name = kite-connector http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java new file mode 100644 index 0000000..5e4edc5 --- /dev/null +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.kite; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.DatasetWriter; + +import static junit.framework.TestCase.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.Mock; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestKiteExecutor { + + @Mock + private Dataset<GenericRecord> datasetMock; + + @Mock + private DatasetDescriptor descriptorMock; + + @Mock + private DatasetWriter<GenericRecord> writerMock; + + private KiteDatasetExecutor executor; + + @Before + public void setUp() { + initMocks(this); + when(datasetMock.newWriter()).thenReturn(writerMock); + when(datasetMock.getDescriptor()).thenReturn(descriptorMock); + when(descriptorMock.getSchema()).thenReturn( + new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\"," + + "\"fields\":[]}")); + + executor = new KiteDatasetExecutor(datasetMock); + } + + @After + public void tearDown() { + executor.closeWriter(); + assertTrue(executor.isWriterClosed()); + } + + @Test + public void testWriteRecord() { + // setup + final int NUMBER_OF_ROWS = 10; + when(descriptorMock.getSchema()).thenReturn( + new Schema.Parser().parse("{" + + "\"name\":\"test\",\"type\":\"record\"," + + "\"fields\":[" + + "{\"name\":\"f1\",\"type\":\"int\"}," + + "{\"name\":\"f2\",\"type\":\"string\"}" + + "]}")); + + // exercise + for (int i = 0; i < NUMBER_OF_ROWS; i++) { + executor.writeRecord(new Object[]{42, "foo"}); + } + + // verify + verify(writerMock, times(NUMBER_OF_ROWS)).write(any(GenericRecord.class)); + 
verifyNoMoreInteractions(writerMock); + } + + @Test + public void testCloseWriter() { + // setup + when(writerMock.isOpen()).thenReturn(true); + executor.writeRecord(new Object[]{}); + assertTrue(!executor.isWriterClosed()); + + // exercise + executor.closeWriter(); + + // verify + verify(writerMock, times(1)).close(); + assertTrue(executor.isWriterClosed()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java new file mode 100644 index 0000000..a1016a0 --- /dev/null +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.LoaderContext; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.Text; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.Mock; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestKiteLoader { + + private KiteLoader loader; + + @Mock + private KiteDatasetExecutor executorMock; + + @Before + public void setUp() { + initMocks(this); + + loader = new KiteLoader() { + @Override + protected KiteDatasetExecutor getExecutor(String uri, Schema schema, + FileFormat format) { + return executorMock; + } + }; + } + + @Test + public void testLoader() throws Exception { + // setup + final int NUMBER_OF_ROWS = 1000; + org.apache.sqoop.schema.Schema schema = + new org.apache.sqoop.schema.Schema("TestLoader"); + schema.addColumn(new Text("TextCol")); + DataReader reader = new DataReader() { + private long index = 0L; + @Override + public Object[] readArrayRecord() { + if (index++ < NUMBER_OF_ROWS) { + return new Object[]{ + Long.toString(index), + }; + } else { + return null; + } + } + @Override + public String readTextRecord() { + return null; + } + @Override + public Object readContent() { + return null; + } + }; + LoaderContext context = new LoaderContext(null, reader, schema); + LinkConfiguration linkConfig = new LinkConfiguration(); + ToJobConfiguration jobConfig = new ToJobConfiguration(); + + // exercise + loader.load(context, linkConfig, jobConfig); + + // verify + verify(executorMock, times(NUMBER_OF_ROWS)).writeRecord( + 
any(Object[].class)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java new file mode 100644 index 0000000..4051fda --- /dev/null +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.DestroyerContext; +import org.apache.sqoop.schema.Schema; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.Mock; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KiteDatasetExecutor.class) +public class TestKiteToDestroyer { + + private KiteToDestroyer destroyer; + + private LinkConfiguration linkConfig; + + private ToJobConfiguration jobConfig; + + private final String[] expectedUris = new String[]{"a", "b"}; + + @Mock + private KiteDatasetExecutor executorMock; + + @Before + public void setUp() { + initMocks(this); + mockStatic(KiteDatasetExecutor.class); + + destroyer = new KiteToDestroyer() { + @Override + protected KiteDatasetExecutor getExecutor(String uri, Schema schema, + FileFormat format) { + return executorMock; + } + }; + + linkConfig = new LinkConfiguration(); + linkConfig.linkConfig.fileFormat = FileFormat.AVRO; + jobConfig = new ToJobConfiguration(); + jobConfig.toJobConfig.uri = "dataset:file:/foo/bar"; + } + + @Test + public void testDestroyForSuccessfulJob() { + // setup + DestroyerContext context = new DestroyerContext(null, true, null); + when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri)) + .thenReturn(expectedUris); + + // 
exercise + destroyer.destroy(context, linkConfig, jobConfig); + + // verify + for (String uri : expectedUris) { + verify(executorMock, times(1)).mergeDataset(uri); + } + } + + @Test + public void testDestroyForFailedJob() { + // setup + DestroyerContext context = new DestroyerContext(null, false, null); + when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri)) + .thenReturn(expectedUris); + for (String uri : expectedUris) { + when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true); + } + + // exercise + destroyer.destroy(context, linkConfig, jobConfig); + + // verify + for (String uri : expectedUris) { + verifyStatic(times(1)); + KiteDatasetExecutor.deleteDataset(uri); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java new file mode 100644 index 0000000..5f0525d --- /dev/null +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.kite; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.schema.Schema; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.Mock; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(KiteDatasetExecutor.class) +public class TestKiteToInitializer { + + private KiteToInitializer initializer; + + @Mock + private KiteDatasetExecutor executorMock; + + @Before + public void setUp() { + initMocks(this); + mockStatic(KiteDatasetExecutor.class); + + initializer = new KiteToInitializer(); + } + + @Test + public void testInitializePassed() { + // setup + ToJobConfiguration jobConfig = new ToJobConfiguration(); + jobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist"; + when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) + .thenReturn(false); + + // exercise + initializer.initialize(null, null, jobConfig); + } + + @Test(expected=SqoopException.class) + public void testInitializeFailed() { + // setup + ToJobConfiguration jobConfig = new ToJobConfiguration(); + 
jobConfig.toJobConfig.uri = "dataset:file:/ds/exist"; + when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) + .thenReturn(true); + + // exercise + initializer.initialize(null, null, jobConfig); + } + + @Test + public void testGetSchema() { + // exercise + Schema schema = initializer.getSchema(null, null, null); + + // verify + assertNotNull(schema); + assertTrue(schema.isEmpty()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-kite/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/resources/log4j.properties b/connector/connector-kite/src/test/resources/log4j.properties new file mode 100644 index 0000000..44ffced --- /dev/null +++ b/connector/connector-kite/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/FileFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/FileFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/FileFormat.java new file mode 100644 index 0000000..0625fce --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/FileFormat.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * The file formats that are supported for writing.
 */
public enum FileFormat {

  /** Text file with comma separated values. */
  CSV,

  /** Avro data file. */
  AVRO,

  /** Parquet file. */
  PARQUET

}
/**
 * The helper class provides methods for loading jars.
 */
public class JarUtil {

  /**
   * Returns the jars visible to the current thread's context class loader
   * whose path matches at least one of the given patterns.
   *
   * Each jar is reported once, as the string form of its URL, in the order
   * the class loader lists it. An empty list is returned when nothing matches
   * or when the context class loader is not a {@link URLClassLoader} (for
   * example when it is {@code null}), because its URLs cannot be enumerated.
   *
   * @param patterns regular expressions searched for within each jar path
   * @return the matched jar URLs as strings, never {@code null}
   */
  public static List<String> getMatchedJars(Pattern[] patterns) {
    List<String> jars = new LinkedList<String>();

    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (!(loader instanceof URLClassLoader)) {
      // Nothing to enumerate; honour the "or an empty list" contract instead
      // of failing with ClassCastException / NullPointerException.
      return jars;
    }

    for (URL url : ((URLClassLoader) loader).getURLs()) {
      String jar = url.toString();
      // The duplicate check must use the same string form that is stored;
      // comparing against url.getFile() never matched a stored entry.
      if (isDesiredJar(url.getPath(), patterns) && !jars.contains(jar)) {
        jars.add(jar);
      }
    }
    return jars;
  }

  /**
   * Tells whether {@code path} names a jar file (ends with ".jar") and
   * contains a match for at least one of the patterns.
   */
  private static boolean isDesiredJar(String path, Pattern[] patterns) {
    if (!path.endsWith(".jar")) {
      return false;
    }
    for (Pattern pattern : patterns) {
      if (pattern.matcher(path).find()) {
        return true;
      }
    }
    return false;
  }

}
<jdbc.teradata.version>14.00.00.21</jdbc.teradata.version> <jdbc.netezza.version>6.0</jdbc.netezza.version> <joda.version>2.4</joda.version> + <kitesdk.version>0.17.0</kitesdk.version> + <slf4j.version>1.6.1</slf4j.version> </properties> <dependencies> @@ -339,6 +342,17 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kite</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kite</artifactId> + <type>test-jar</type> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> <artifactId>sqoop-connector-mysql-jdbc</artifactId> <version>${project.version}</version> </dependency> @@ -420,6 +434,16 @@ limitations under the License. <version>${log4j.version}</version> </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> <version>${derby.version}</version> @@ -486,6 +510,23 @@ limitations under the License. 
<scope>test</scope> </dependency> <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + <version>${kitesdk.version}</version> + </dependency> + <dependency> <groupId>org.apache.tomcat</groupId> <artifactId>catalina</artifactId> <version>${tomcat.version}</version> http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 559e389..1adcca0 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -79,6 +79,11 @@ limitations under the License. <artifactId>sqoop-connector-hdfs</artifactId> </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kite</artifactId> + </dependency> + <!-- <dependency> <groupId>org.apache.sqoop.connector</groupId> http://git-wip-us.apache.org/repos/asf/sqoop/blob/59ffd880/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index 2dbb8c5..cafa250 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -83,6 +83,11 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-kite</artifactId> + </dependency> + + <dependency> <groupId>org.codehaus.cargo</groupId> <artifactId>cargo-core-container-tomcat</artifactId> </dependency>
