Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 6ca31c505 -> f98fc2885
SQOOP-1738: Sqoop2: HDFS Connector: Check for output directory

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f98fc288
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f98fc288
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f98fc288

Branch: refs/heads/sqoop2
Commit: f98fc28859fa58037f44017fb9934a403567442d
Parents: 6ca31c5
Author: Abraham Elmahrek <[email protected]>
Authored: Tue Mar 17 12:39:30 2015 -0700
Committer: Abraham Elmahrek <[email protected]>
Committed: Tue Mar 17 12:39:30 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |   4 +-
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  28 +++
 .../sqoop/connector/hdfs/TestToInitializer.java |  67 +++++++
 .../connector/hdfs/OutputDirectoryTest.java     | 177 +++++++++++++++++++
 4 files changed, 275 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index 8514541..c85e7fc 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -32,7 +32,9 @@ public enum HdfsConnectorError implements ErrorCode{
   GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
   /** Error occurs during loader run */
   GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
-  GENERIC_HDFS_CONNECTOR_0006("Unknown job type")
+  GENERIC_HDFS_CONNECTOR_0006("Unknown job type"),
+
+  GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
 
   ;
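The new GENERIC_HDFS_CONNECTOR_0007 code is surfaced through SqoopException, and the integration test later in this commit matches the rendered message against both the code identifier and the detail string passed by the caller. A minimal sketch of that assumption (the exact message rendering belongs to SqoopException and is not shown in this diff):

    import org.apache.sqoop.common.SqoopException;
    import org.apache.sqoop.error.code.HdfsConnectorError;

    public class ErrorCodeSketch {
      public static void main(String[] args) {
        SqoopException e = new SqoopException(
            HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
        // Assumed: the rendered message carries both the code identifier and the
        // detail string, which is what OutputDirectoryTest below matches on.
        System.out.println(e.getMessage());
      }
    }
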
http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index ad500c2..83bac27 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -18,11 +18,18 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
+import java.io.IOException;
+
 public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
@@ -37,5 +44,26 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
+
+    // Verify that the given HDFS directory either does not exist or is empty
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+      Path path = new Path(jobConfig.toJobConfig.outputDirectory);
+
+      if(fs.exists(path)) {
+        if(fs.isFile(path)) {
+          throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file");
+        }
+
+        if(fs.isDirectory(path)) {
+          FileStatus[] fileStatuses = fs.listStatus(path);
+          if(fileStatuses.length != 0) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
+    }
   }
 }
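The check runs against whatever FileSystem the link configuration resolves to, so the same exists/isFile/listStatus sequence can be exercised standalone against the local filesystem. A minimal sketch under that assumption (LocalCheckSketch and the /tmp path are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class LocalCheckSketch {
      public static void main(String[] args) throws IOException {
        // The local filesystem stands in for HDFS; the call sequence is the same.
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path path = new Path("/tmp/sqoop-output-sketch");

        if (!fs.exists(path)) {
          System.out.println("OK: path does not exist yet");
        } else if (fs.isFile(path)) {
          System.out.println("FAIL: path exists and is a file");
        } else {
          FileStatus[] children = fs.listStatus(path);
          System.out.println(children.length == 0 ? "OK: directory is empty"
                                                  : "FAIL: directory is not empty");
        }
      }
    }
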
http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
new file mode 100644
index 0000000..1daa25a
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+/**
+ *
+ */
+public class TestToInitializer extends TestHdfsBase {
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsAFile() throws Exception {
+    File file = File.createTempFile("MastersOfOrion", ".txt");
+    file.createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsNotEmpty() throws Exception {
+    File dir = Files.createTempDir();
+    File file = File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+}
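These two unit tests cover only the failure paths; the success path (an empty or missing directory) is exercised end-to-end by OutputDirectoryTest below. A complementary positive-path unit test might look like the following hedged sketch, which would slot into the class above; it is not part of this commit and assumes, as the tests above do, that the default Configuration resolves to the local filesystem in the unit-test environment:

    @Test
    public void testOutputDirectoryIsEmpty() throws Exception {
      // Hypothetical positive path: an empty directory should pass the
      // verification in HdfsToInitializer without throwing.
      File dir = Files.createTempDir();

      LinkConfiguration linkConfig = new LinkConfiguration();
      ToJobConfiguration jobConfig = new ToJobConfiguration();
      jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();

      InitializerContext initializerContext = new InitializerContext(new MutableMapContext());

      new HdfsToInitializer().initialize(initializerContext, linkConfig, jobConfig);
    }
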
http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
new file mode 100644
index 0000000..b454263
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.client.ClientError;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.error.code.HdfsConnectorError;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ */
+public class OutputDirectoryTest extends ConnectorTestCase {
+  @Test
+  public void testOutputDirectoryIsAFile() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    assertJobSubmissionFailure(job,
+      HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
+      "is a file"
+    );
+
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsNotEmpty() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    assertJobSubmissionFailure(job,
+      HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
+      "is not empty"
+    );
+
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsEmpty() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    dropTable();
+  }
+
+  public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
+    // Try to execute the job and verify that it was not successful
+    try {
+      executeJob(job);
+      fail("Expected failure in the job submission.");
+    } catch (SqoopException ex) {
+      // Top level exception should be CLIENT_0001
+      assertEquals(ClientError.CLIENT_0001, ex.getErrorCode());
+
+      // We can't directly verify the ErrorCode on the cause as the client side
+      // is not rebuilding SqoopExceptions for missing ErrorCodes - e.g. the cause
+      // will be a generic Throwable and not a SqoopException instance.
+      Throwable cause = ex.getCause();
+      assertNotNull(cause);
+
+      for(String fragment : fragments) {
+        assertTrue(cause.getMessage().contains(fragment), "Expected fragment " + fragment + " in error message " + cause.getMessage());
+      }
+    }
+  }
+}
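Outside the integration harness, where the original SqoopException is available rather than a rebuilt cause, the failure can also be matched on its error code directly. A minimal hedged sketch (DirectCheckSketch, the temp-file setup, and local-filesystem resolution are assumptions, not part of this commit):

    import java.io.File;

    import org.apache.sqoop.common.MutableMapContext;
    import org.apache.sqoop.common.SqoopException;
    import org.apache.sqoop.connector.hdfs.HdfsToInitializer;
    import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
    import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
    import org.apache.sqoop.error.code.HdfsConnectorError;
    import org.apache.sqoop.job.etl.InitializerContext;

    public class DirectCheckSketch {
      public static void main(String[] args) throws Exception {
        // Point the job at an existing *file* so the verification must fail.
        File file = File.createTempFile("output", ".txt");

        LinkConfiguration linkConfig = new LinkConfiguration();
        ToJobConfiguration jobConfig = new ToJobConfiguration();
        jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();

        try {
          new HdfsToInitializer().initialize(
              new InitializerContext(new MutableMapContext()), linkConfig, jobConfig);
        } catch (SqoopException e) {
          // All three validation failures surface as GENERIC_HDFS_CONNECTOR_0007.
          System.out.println(e.getErrorCode() == HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007);
        }
      }
    }
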
