Repository: sqoop Updated Branches: refs/heads/sqoop2 9ee44d4f0 -> bf4ae0b3c
SQOOP-2126: Sqoop2: Implement FTP TO Connector Support (Jonathan Seidman via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf4ae0b3 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf4ae0b3 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf4ae0b3 Branch: refs/heads/sqoop2 Commit: bf4ae0b3c77f5ffc037193cb12695a450b30d822 Parents: 9ee44d4 Author: Abraham Elmahrek <[email protected]> Authored: Fri May 15 11:18:35 2015 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Fri May 15 11:18:35 2015 -0700 ---------------------------------------------------------------------- connector/connector-ftp/pom.xml | 92 +++++++++ .../sqoop/connector/ftp/FtpConnector.java | 137 ++++++++++++ .../sqoop/connector/ftp/FtpConnectorError.java | 52 +++++ .../connector/ftp/FtpConnectorUpgrader.java | 46 +++++ .../sqoop/connector/ftp/FtpConstants.java | 37 ++++ .../apache/sqoop/connector/ftp/FtpLoader.java | 74 +++++++ .../sqoop/connector/ftp/FtpToDestroyer.java | 41 ++++ .../sqoop/connector/ftp/FtpToInitializer.java | 44 ++++ .../connector/ftp/configuration/LinkConfig.java | 76 +++++++ .../ftp/configuration/LinkConfiguration.java | 34 +++ .../ftp/configuration/ToJobConfig.java | 36 ++++ .../ftp/configuration/ToJobConfiguration.java | 34 +++ .../ftp/ftpclient/FtpConnectorClient.java | 207 +++++++++++++++++++ .../resources/ftp-connector-config.properties | 51 +++++ .../src/main/resources/log4j.properties | 24 +++ .../main/resources/sqoopconnector.properties | 18 ++ .../sqoop/connector/ftp/TestFtpLoader.java | 125 +++++++++++ .../configuration/TestLinkConfiguration.java | 142 +++++++++++++ .../configuration/TestToJobConfiguration.java | 58 ++++++ .../ftp/ftpclient/TestFtpConnectorClient.java | 148 +++++++++++++ .../src/test/resources/log4j.properties | 24 +++ connector/pom.xml | 1 + docs/src/site/sphinx/Connectors.rst | 63 ++++++ pom.xml | 11 + server/pom.xml | 5 + 25 files changed, 1580 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/pom.xml b/connector/connector-ftp/pom.xml new file mode 100644 index 0000000..9a70dfc --- /dev/null +++ b/connector/connector-ftp/pom.xml @@ -0,0 +1,92 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-ftp</artifactId> + <name>Sqoop FTP Connector</name> + + <properties> + <slf4j.version>1.6.1</slf4j.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk</artifactId> + </dependency> + + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + <version>3.3</version> + </dependency> + + <dependency> + <groupId>org.mockftpserver</groupId> + <artifactId>MockFtpServer</artifactId> + <version>2.4</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + </dependencies> + + <build> + <finalName>sqoop</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java new file mode 100644 index 0000000..ffef1bf --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.VersionInfo; +import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * Implementation of a Sqoop 2 connector to support data movement to/from an + * FTP server. + */ +public class FtpConnector extends SqoopConnector { + + /** + * Define the TO instance. + */ + private static final To TO = new To(FtpToInitializer.class, + FtpLoader.class, + FtpToDestroyer.class); + + /** + * {@inheritDoc} + * + * Since this is a built-in connector it will return the same version as + * the rest of the Sqoop code. + */ + @Override + public String getVersion() { + return VersionInfo.getBuildVersion(); + } + + /** + * Return the configuration resource bundle for this connector. + * + * @param locale The Locale object. + * + * @return The resource bundle associated with the input locale. + */ + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle(FtpConstants.RESOURCE_BUNDLE_NAME, locale); + } + + /** + * Get the class encapsulating link configuration for this connector. + * + * @return The link configuration class for this connector. + */ + @Override + public Class getLinkConfigurationClass() { + return LinkConfiguration.class; + } + + /** + * Get the appropriate job configuration class for the input direction. + * + * @param direction Whether to return TO or FROM configuration class. + * + * @return Job configuration class for given direction. + */ + @Override + public Class getJobConfigurationClass(Direction direction) { + switch (direction) { + case TO: + return ToJobConfiguration.class; + default: + return null; + } + } + + /** + * Get the object which defines classes for performing import jobs. + * + * @return the From object defining classes for performing import. + */ + @Override + public From getFrom() { + return null; + } + + /** + * Get the object which defines classes for performing export jobs. + * + * @return the To object defining classes for performing export. + */ + @Override + public To getTo() { + return TO; + } + + /** + * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} + * object that can upgrade the connection and job configs. + * + * @return configurable upgrader object + */ + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + return new FtpConnectorUpgrader(); + } + + /** + * Return a List of directions supported by this connector. + * + * @return list of enums representing supported directions. + */ + public List<Direction> getSupportedDirections() { + return Arrays.asList(Direction.TO); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java new file mode 100644 index 0000000..77c36ea --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.common.ErrorCode; + +/** + * Error messages for FTP connector. + */ +public enum FtpConnectorError implements ErrorCode { + FTP_CONNECTOR_0000("Unknown error occurred."), + FTP_CONNECTOR_0001("Error occurred connecting to FTP server."), + FTP_CONNECTOR_0002("Error occurred disconnecting from FTP server."), + FTP_CONNECTOR_0003("Error occurred transferring data to FTP server."), + FTP_CONNECTOR_0004("Unknown job type") + ; + + private final String message; + + private FtpConnectorError(String message) { + this.message = message; + } + + /** + * {@inheritDoc} + */ + public String getCode() { + return name(); + } + + /** + * {@inheritDoc} + */ + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java new file mode 100644 index 0000000..159ba4e --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.configurable.ConfigurableUpgradeUtil; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.model.MLinkConfig; +import org.apache.sqoop.model.MToConfig; + +//NOTE: All config types have the similar upgrade path at this point +public class FtpConnectorUpgrader extends ConnectorConfigurableUpgrader { + + /** + * {@inheritDoc} + */ + @Override + public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), + upgradeTarget.getConfigs()); + } + + /** + * {@inheritDoc} + */ + @Override + public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) { + ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), + upgradeTarget.getConfigs()); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java new file mode 100644 index 0000000..8e8b4ad --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.job.Constants; + +/** + * Constants for FTP connector. + */ +public final class FtpConstants extends Constants { + + /** + * Name of resource bundle for configuring this connector. + */ + public static final String RESOURCE_BUNDLE_NAME = "ftp-connector-config"; + + /** + * Default port for FTP. + */ + public static final int DEFAULT_PORT = 21; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java new file mode 100644 index 0000000..0ed1eee --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +import java.util.UUID; + +/** + * Class to receive data from a From instance and load to a To instance. + */ +public class FtpLoader extends Loader<LinkConfiguration, ToJobConfiguration> { + + /** + * Number of records written by last call to load() method. + */ + private long rowsWritten = 0; + + /** + * Load data to target directory on FTP server. + * + * @param context Loader context object + * @param linkConfiguration Link configuration + * @param toJobConfig Job configuration + * @throws Exception Re-thrown from FTP client code. + */ + @Override + public void load(LoaderContext context, LinkConfiguration linkConfiguration, + ToJobConfiguration toJobConfig) throws Exception { + DataReader reader = context.getDataReader(); + String outputDir = toJobConfig.toJobConfig.outputDirectory; + // Create a unique filename for writing records, since this method will + // likely get called multiple times for a single source file/dataset: + String path = outputDir + "/" + UUID.randomUUID() + ".txt"; + + FtpConnectorClient client = + new FtpConnectorClient(linkConfiguration.linkConfig.server, + linkConfiguration.linkConfig.port); + client.connect(linkConfiguration.linkConfig.username, + linkConfiguration.linkConfig.password); + rowsWritten = client.uploadStream(reader, path); + client.disconnect(); + } + + /** + * Return the number of rows witten by the last call to load() method. + * + * @return Number of rows written by call to loader. + */ + @Override + public long getRowsWritten() { + return rowsWritten; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java new file mode 100644 index 0000000..50208ce --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +/** + * Perform any clean up, etc. tasks when the Sqoop execution completes. + */ +public class FtpToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param linkConfig link configuration object + * @param jobConfig TO job configuration object + */ + @Override + public void destroy(DestroyerContext context, LinkConfiguration linkConfig, + ToJobConfiguration jobConfig) { + // do nothing at this point + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java new file mode 100644 index 0000000..96a100d --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; + +/** + * Perform any required initialization before execution of job. + */ +public class FtpToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> { + + /** + * Initialize new submission based on given configuration properties. Any + * needed temporary values might be saved to context object and they will be + * promoted to all other part of the workflow automatically. + * + * @param context Initializer context object + * @param linkConfig link configuration object + * @param jobConfig TO job configuration object + */ + @Override + public void initialize(InitializerContext context, LinkConfiguration linkConfig, + ToJobConfiguration jobConfig) { + // do nothing at this point + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java new file mode 100644 index 0000000..ed9c2cc --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient; +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.Status; +import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Attributes for FTP connector link configuration. + */ +@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)}) +public class LinkConfig { + + /** + * FTP server hostname. + */ + @Input(size = 256, validators = {@Validator(NotEmpty.class)}) + public String server; + + /** + * FTP server port. Default is port 21. + */ + @Input + public Integer port; + + /** + * Username for server login. + */ + @Input(size = 256, validators = {@Validator(NotEmpty.class)}) + public String username; + + /** + * Password for server login. + */ + @Input(size = 256, sensitive = true) + public String password; + + /** + * Validate that we can log into the server using the supplied credentials. + */ + public static class ConfigValidator extends AbstractValidator<LinkConfig> { + @Override + public void validate(LinkConfig linkConfig) { + try { + FtpConnectorClient client = + new FtpConnectorClient(linkConfig.server, linkConfig.port); + client.connect(linkConfig.username, linkConfig.password); + client.disconnect(); + } catch (SqoopException e) { + addMessage(Status.WARNING, "Can't connect to the FTP server " + + linkConfig.server + " error is " + e.getMessage()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java new file mode 100644 index 0000000..60f1836 --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.model.Config; +import org.apache.sqoop.model.ConfigurationClass; + +/** + * Class to encapsulate link attributes for FTP connector. + */ +@ConfigurationClass +public class LinkConfiguration { + @Config + public LinkConfig linkConfig; + + public LinkConfiguration() { + linkConfig = new LinkConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java new file mode 100644 index 0000000..dc46946 --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; +import org.apache.sqoop.model.Validator; +import org.apache.sqoop.validation.validators.NotEmpty; + +/** + * Attributes for FTP connector TO configuration. + */ +@ConfigClass +public class ToJobConfig { + + /** + * Directory on FTP server to write data to. + */ + @Input(size = 260, validators = {@Validator(NotEmpty.class)}) + public String outputDirectory; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java new file mode 100644 index 0000000..636afbb --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Config; + +/** + * Class to encapsulate TO configuration. + */ +@ConfigurationClass +public class ToJobConfiguration { + @Config + public ToJobConfig toJobConfig; + + public ToJobConfiguration() { + toJobConfig = new ToJobConfig(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java new file mode 100644 index 0000000..7a65173 --- /dev/null +++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.connector.ftp.ftpclient; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPReply; +import org.apache.log4j.Logger; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.ftp.FtpConstants; +import org.apache.sqoop.connector.ftp.FtpConnectorError; +import org.apache.sqoop.etl.io.DataReader; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; + +/** + * Class encapsulating functionality to interact with an FTP server. This class + * uses the Apache Commons Net libraries to provide the FTP functionality. See + * http://commons.apache.org/proper/commons-net/. + */ +public class FtpConnectorClient { + + /** + * Apache Commons Net FTP client. + */ + private FTPClient ftpClient = null; + + /** + * Hostname for FTP server. + */ + private String ftpServer = null; + + /** + * Port for FTP server. + */ + private int ftpPort = FtpConstants.DEFAULT_PORT; + + private static final Logger LOG = Logger.getLogger(FtpConnectorClient.class); + + /** + * Constructor to initialize client. + * + * @param server Hostname of FTP server. + * @param port Port number of FTP server. Pass in null to use default port + * of 21. + */ + public FtpConnectorClient(String server, Integer port) { + ftpClient = new FTPClient(); + ftpServer = server; + if (port != null) { + ftpPort = port.intValue(); + } + } + + /** + * Connect to the FTP server. + * + * @param username Username for server login. + * @param pass Password for server login. + * + * @throws SqoopException Thrown if an error occurs while connecting to server. + */ + public void connect(String username, String pass) + throws SqoopException { + + try { + ftpClient.connect(ftpServer, ftpPort); + LOG.info(getServerReplyAsString()); + int replyCode = ftpClient.getReplyCode(); + if (!FTPReply.isPositiveCompletion(replyCode)) { + ftpClient.disconnect(); + LOG.error("Operation failed. Server reply code: " + replyCode); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001, + "Server reply code=" + replyCode); + } + + boolean success = ftpClient.login(username, pass); + LOG.info(getServerReplyAsString()); + if (!success) { + ftpClient.disconnect(); + LOG.error("Could not login to the server" + ftpServer); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001); + } else { + LOG.info("logged into " + ftpServer); + } + } catch (IOException e) { + LOG.error("Caught IOException connecting to FTP server: " + + e.getMessage()); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001, + "Caught IOException: " + e.getMessage(), e); + } + } + + /** + * Log out and disconnect from FTP server. + * + * @throws SqoopException Thrown if an error occurs while disconnecting from + * server. + */ + public void disconnect() + throws SqoopException { + try { + ftpClient.logout(); + ftpClient.disconnect(); + } catch (IOException e) { + LOG.error("Caught IOException disconnecting from FTP server: " + + e.getMessage()); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0002, + "Caught IOException: " + e.getMessage(), e); + } + } + + /** + * Stream records to a file on the FTP server. + * + * @param reader a DataReader object containing data passed from source + * connector. + * @param path directory on FTP server to write files to. + * + * @return Number of records written in call to this method. + * + * @throws SqoopException thrown if error occurs during interaction with + * FTP server. + * @throws Exception thrown if error occurs in DataReader. + */ + public long uploadStream(DataReader reader, String path) + throws SqoopException, Exception { + + OutputStream output = null; + + long recordsWritten = 0; + + try { + output = ftpClient.storeFileStream(path); + if (!FTPReply.isPositivePreliminary(ftpClient.getReplyCode())) { + LOG.error("File transfer failed, server reply=" + + getServerReplyAsString()); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003, + getServerReplyAsString()); + } else { + String record; + while ((record = reader.readTextRecord()) != null) { + LOG.info("Writing record to FTP server:" + record); + output.write(record.getBytes()); + output.write(("\n").getBytes()); + recordsWritten++; + } + + output.close(); + if (!ftpClient.completePendingCommand()) { + LOG.error("File transfer failed, server reply=" + + getServerReplyAsString()); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003, + getServerReplyAsString()); + } + } + } catch (IOException e) { + LOG.error("Caught IOException: " + e.getMessage()); + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003, + "Caught IOException: " + e.getMessage(), e); + } finally { + try { + if (output != null) { + output.close(); + } + } catch (IOException e) { + LOG.error("Caught IOException closing FTP output stream: " + + e.getMessage()); + // Throw this in case there was an underlying issue with closing the stream: + throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003, + "Caught IOException closing output stream to FTP server: " + + e.getMessage(), e); + } + } + + return recordsWritten; + } + + /** + * Turn a collection of reply strings from the FTP server into a single string. + * @return String containing a concatenation of replies from the server. + */ + private String getServerReplyAsString() { + String[] replies = ftpClient.getReplyStrings(); + return StringUtils.join(replies); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/ftp-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/resources/ftp-connector-config.properties b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties new file mode 100644 index 0000000..d84157f --- /dev/null +++ b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# FTP Connector Resources + +############################ + +# Link Config +linkConfig.label = Link configuration +linkConfig.help = Parameters required to connect to an FTP server. + +# FTP server hostname +linkConfig.server.label = FTP server hostname +linkConfig.server.help = Hostname for the FTP server. + +# FTP server port +linkConfig.port.label = FTP server port (21) +linkConfig.port.help = Port for the FTP server. 21 by default. + +# username string +linkConfig.username.label = Username +linkConfig.username.help = Enter the username to be used for connecting to the \ + FTP server. + +# password string +linkConfig.password.label = Password +linkConfig.password.help = Enter the password to be used for connecting to the \ + FTP server. + +# To Job Config +# +toJobConfig.label = To FTP configuration +toJobConfig.help = Parameters required to store data on the FTP server. + +toJobConfig.outputDirectory.label = Output directory +toJobConfig.outputDirectory.help = Directory on the FTP server to write data to. + +toJobConfig.ignored.label = Ignored +toJobConfig.ignored.help = This value is ignored. http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/resources/log4j.properties b/connector/connector-ftp/src/main/resources/log4j.properties new file mode 100644 index 0000000..c62f102 --- /dev/null +++ b/connector/connector-ftp/src/main/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to INFO and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/sqoopconnector.properties ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/main/resources/sqoopconnector.properties b/connector/connector-ftp/src/main/resources/sqoopconnector.properties new file mode 100644 index 0000000..0864f9f --- /dev/null +++ b/connector/connector-ftp/src/main/resources/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# FTP Connector Properties +org.apache.sqoop.connector.class = org.apache.sqoop.connector.ftp.FtpConnector +org.apache.sqoop.connector.name = ftp-connector http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java new file mode 100644 index 0000000..33c808a --- /dev/null +++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp; + +import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration; +import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration; +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.LoaderContext; + +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.mockftpserver.fake.filesystem.DirectoryEntry; +import org.mockftpserver.fake.filesystem.FileSystem; +import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; +import org.mockftpserver.fake.FakeFtpServer; +import org.mockftpserver.fake.UserAccount; + +/** + * Unit tests for {@link org.apache.sqoop.connector.ftp.Loader} class. + * + * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide + * a mock FTP server implementation. + */ +public class TestFtpLoader { + + private FakeFtpServer fakeFtpServer; + private int port; + private String username = "user"; + private String password = "pass"; + private FtpLoader loader; + + @Test + public void testLoader() { + + final int NUMBER_OF_ROWS = 1000; + + DataReader reader = new DataReader() { + private long index = 0L; + @Override + public Object[] readArrayRecord() { + return null; + } + + @Override + public String readTextRecord() { + if (index++ < NUMBER_OF_ROWS) { + return index + "," + (double)index + ",'" + index + "'"; + } else { + return null; + } + } + + @Override + public Object readContent() { + return null; + } + }; + + try { + LoaderContext context = new LoaderContext(null, reader, null); + LinkConfiguration linkConfig = new LinkConfiguration(); + linkConfig.linkConfig.username = username; + linkConfig.linkConfig.password = password; + linkConfig.linkConfig.server = "localhost"; + linkConfig.linkConfig.port = port; + ToJobConfiguration jobConfig = new ToJobConfiguration(); + jobConfig.toJobConfig.outputDirectory = "uploads"; + loader.load(context, linkConfig, jobConfig); + Long rowsWritten = loader.getRowsWritten(); + Assert.assertTrue(rowsWritten == NUMBER_OF_ROWS, + ("actual rows written=" + rowsWritten + " instead of " + + NUMBER_OF_ROWS)); + } catch(Exception e) { + Assert.fail("caught exception: " + e.getMessage()); + } + } + + /** + * Create mock FTP server for testing, and add a user account for testing. + */ + @BeforeClass(alwaysRun = true) + public void setUp() throws Exception { + + loader = new FtpLoader(); + + fakeFtpServer = new FakeFtpServer(); + fakeFtpServer.setServerControlPort(0); + + FileSystem fileSystem = new UnixFakeFileSystem(); + fileSystem.add(new DirectoryEntry("/uploads")); + fakeFtpServer.setFileSystem(fileSystem); + + UserAccount userAccount = new UserAccount(username, password, "/"); + fakeFtpServer.addUserAccount(userAccount); + + fakeFtpServer.start(); + port = fakeFtpServer.getServerControlPort(); + } + + /** + * Stop mock FTP server. + */ + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + fakeFtpServer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java new file mode 100644 index 0000000..b926a1d --- /dev/null +++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.validation.ConfigValidationResult; +import org.apache.sqoop.validation.ConfigValidationRunner; +import org.apache.sqoop.validation.Status; + +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.mockftpserver.fake.filesystem.FileEntry; +import org.mockftpserver.fake.filesystem.FileSystem; +import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; +import org.mockftpserver.fake.FakeFtpServer; +import org.mockftpserver.fake.UserAccount; + +/** + * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.LinkConfiguration}. + * + * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide + * a mock FTP server implementation. + */ +public class TestLinkConfiguration { + + private FakeFtpServer fakeFtpServer; + private int port; + private String username = "user"; + private String password = "pass"; + + /** + * Test valid configuration. + */ + @Test + public void testValidConfig() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + LinkConfiguration config = new LinkConfiguration(); + config.linkConfig.username = username; + config.linkConfig.password = password; + config.linkConfig.server = "localhost"; + config.linkConfig.port = port; + result = runner.validate(config); + Assert.assertTrue(result.getStatus() == Status.OK, + "Test of valid configuration failed"); + } + + /** + * Test empty username. + */ + @Test + public void testEmptyUsername() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + LinkConfiguration config = new LinkConfiguration(); + config.linkConfig.username = ""; + config.linkConfig.password = password; + config.linkConfig.server = "localhost"; + config.linkConfig.port = port; + result = runner.validate(config); + Assert.assertFalse(result.getStatus() == Status.OK, + "Test of empty username failed"); + } + + /** + * Test invalid username. + */ + @Test + public void testInvalidUsername() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + LinkConfiguration config = new LinkConfiguration(); + config.linkConfig.username = "baduser"; + config.linkConfig.password = password; + config.linkConfig.server = "localhost"; + config.linkConfig.port = port; + result = runner.validate(config); + Assert.assertFalse(result.getStatus() == Status.OK, + "Test of invalid username failed"); + } + + /** + * Test empty server. + */ + @Test + public void TestEmptyServer() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + LinkConfiguration config = new LinkConfiguration(); + config.linkConfig.username = username; + config.linkConfig.password = password; + config.linkConfig.server = ""; + config.linkConfig.port = port; + result = runner.validate(config); + Assert.assertFalse(result.getStatus() == Status.OK, + "Test of empty server name failed"); + } + + /** + * Create mock FTP server for testing, and add a user account for testing. + */ + @BeforeClass(alwaysRun = true) + public void setUp() throws Exception { + fakeFtpServer = new FakeFtpServer(); + fakeFtpServer.setServerControlPort(0); + + FileSystem fileSystem = new UnixFakeFileSystem(); + fileSystem.add(new FileEntry("/home/user/file.txt", "abcdef")); + fakeFtpServer.setFileSystem(fileSystem); + + UserAccount userAccount = new UserAccount(username, password, "/"); + fakeFtpServer.addUserAccount(userAccount); + + fakeFtpServer.start(); + port = fakeFtpServer.getServerControlPort(); + } + + /** + * Stop mock FTP server. + */ + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + fakeFtpServer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java new file mode 100644 index 0000000..2870a16 --- /dev/null +++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.configuration; + +import org.apache.sqoop.validation.ConfigValidationResult; +import org.apache.sqoop.validation.ConfigValidationRunner; + +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration} class. + */ +public class TestToJobConfiguration { + + /** + * Test a non-empty directory name. + */ + @Test + public void testValidDirectory() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + ToJobConfiguration config = new ToJobConfiguration(); + config.toJobConfig.outputDirectory = "testdir"; + result = runner.validate(config); + Assert.assertTrue(result.getStatus().canProceed(), + "Test of valid directory failed"); + } + + /** + * Test an invalid, empty directory name. + */ + @Test + public void testEmptyDirectory() { + ConfigValidationRunner runner = new ConfigValidationRunner(); + ConfigValidationResult result; + ToJobConfiguration config = new ToJobConfiguration(); + config.toJobConfig.outputDirectory = ""; + result = runner.validate(config); + Assert.assertFalse(result.getStatus().canProceed(), + "Test of empty directory failed"); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java new file mode 100644 index 0000000..b9430b3 --- /dev/null +++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.ftp.ftpclient; + +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.etl.io.DataReader; + +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.mockftpserver.fake.filesystem.DirectoryEntry; +import org.mockftpserver.fake.filesystem.FileSystem; +import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; +import org.mockftpserver.fake.FakeFtpServer; +import org.mockftpserver.fake.UserAccount; + +/** + * Unit tests for {@link org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient} class. + * + * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide + * a mock FTP server implementation. + * + * Note that this duplicates other tests currently, but leaving for now in case + * additional functionality is added to FtpConnectorClient. + */ +public class TestFtpConnectorClient { + + private FakeFtpServer fakeFtpServer; + private int port; + private String username = "user"; + private String password = "pass"; + + /** + * Test connect and login to FTP server. + */ + @Test + public void testValidLogin() { + try { + FtpConnectorClient client = new FtpConnectorClient("localhost", port); + client.connect(username, password); + client.disconnect(); + } catch (SqoopException e) { + Assert.fail("login failed " + e.getMessage()); + } + } + + /** + * Test invalid login to FTP server. + */ + @Test + public void testInvalidLogin() { + try { + FtpConnectorClient client = new FtpConnectorClient("localhost", port); + client.connect("baduser", "badpass"); + client.disconnect(); + Assert.fail("expected exception for invalid login"); + } catch (SqoopException e) { + // Expected + } + } + + /** + * Test streaming upload. + */ + @Test + public void testUploadStream() { + + final int NUMBER_OF_ROWS = 1000; + + DataReader reader = new DataReader() { + private long index = 0L; + @Override + public Object[] readArrayRecord() { + return null; + } + + @Override + public String readTextRecord() { + if (index++ < NUMBER_OF_ROWS) { + return index + "," + (double)index + ",'" + index + "'"; + } else { + return null; + } + } + + @Override + public Object readContent() { + return null; + } + }; + + try { + FtpConnectorClient client = new FtpConnectorClient("localhost", port); + client.connect(username, password); + long rowsWritten = client.uploadStream(reader, "/uploads/test.txt"); + client.disconnect(); + Assert.assertTrue(rowsWritten == NUMBER_OF_ROWS, + ("actual rows written=" + rowsWritten + " instead of " + + NUMBER_OF_ROWS)); + } catch(Exception e) { + Assert.fail("caught exception: " + e.getMessage()); + } + } + + /** + * Create mock FTP server for testing, and add a user account for testing. + */ + @BeforeClass(alwaysRun = true) + public void setUp() throws Exception { + fakeFtpServer = new FakeFtpServer(); + fakeFtpServer.setServerControlPort(0); + + FileSystem fileSystem = new UnixFakeFileSystem(); + fileSystem.add(new DirectoryEntry("/uploads")); + fakeFtpServer.setFileSystem(fileSystem); + + UserAccount userAccount = new UserAccount(username, password, "/"); + fakeFtpServer.addUserAccount(userAccount); + + fakeFtpServer.start(); + port = fakeFtpServer.getServerControlPort(); + } + + /** + * Stop mock FTP server. + */ + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + fakeFtpServer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/connector/connector-ftp/src/test/resources/log4j.properties b/connector/connector-ftp/src/test/resources/log4j.properties new file mode 100644 index 0000000..44ffced --- /dev/null +++ b/connector/connector-ftp/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/pom.xml ---------------------------------------------------------------------- diff --git a/connector/pom.xml b/connector/pom.xml index c999061..1b69180 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -38,6 +38,7 @@ limitations under the License. <module>connector-hdfs</module> <module>connector-kite</module> <module>connector-kafka</module> + <module>connector-ftp</module> <module>connector-sftp</module> <!-- Uncomment and finish connectors after sqoop framework will become stable <module>connector-mysql-jdbc</module> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/docs/src/site/sphinx/Connectors.rst ---------------------------------------------------------------------- diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst index 4e24793..721e92a 100644 --- a/docs/src/site/sphinx/Connectors.rst +++ b/docs/src/site/sphinx/Connectors.rst @@ -459,3 +459,66 @@ Loader ------ During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector. + +++++++++++++++ +FTP Connector +++++++++++++++ + +The FTP connector supports moving data between an FTP server and other supported Sqoop2 connectors. + +Currently only the TO direction is supported to write records to an FTP server. A FROM connector is pending (SQOOP-2127). + +----- +Usage +----- + +To use the FTP Connector, create a link for the connector and a job that uses the link. + +**Link Configuration** +++++++++++++++++++++++ + +Inputs associated with the link configuration include: + ++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+ +| Input | Type | Description | Example | ++=============================+=========+=======================================================================+============================+ +| FTP server hostname | String | Hostname for the FTP server. | ftp.example.com | +| | | *Required*. | | ++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+ +| FTP server port | Integer | Port number for the FTP server. Defaults to 21. | 2100 | +| | | *Optional*. | | ++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+ +| Username | String | The username to provide when connecting to the FTP server. | sqoop | +| | | *Required*. | | ++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+ +| Password | String | The password to provide when connecting to the FTP server. | sqoop | +| | | *Required* | | ++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+ + +**Notes** +========= + +1. The FTP connector will attempt to connect to the FTP server as part of the link validation process. If for some reason a connection can not be established, you'll see a corresponding warning message. + +**TO Job Configuration** +++++++++++++++++++++++++ + +Inputs associated with the Job configuration for the TO direction include: + ++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+ +| Input | Type | Description | Example | ++=============================+=========+=========================================================================+===================================+ +| Output directory | String | The location on the FTP server that the connector will write files to. | uploads | +| | | *Required* | | ++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+ + +**Notes** +========= + +1. The *output directory* value needs to be an existing directory on the FTP server. + +------ +Loader +------ + +During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector. http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 955276a..531009a 100644 --- a/pom.xml +++ b/pom.xml @@ -319,6 +319,17 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-ftp</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-ftp</artifactId> + <type>test-jar</type> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sqoop.connector</groupId> <artifactId>sqoop-connector-kite</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 416b5c0..aabefc0 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -99,6 +99,11 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.sqoop.connector</groupId> + <artifactId>sqoop-connector-ftp</artifactId> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope>
