Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 f98fc2885 -> a9f7b3ddd
SQOOP-2228: Sqoop2: HDFS Connector: Import data to temporary directory before moving them to target directory

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a9f7b3dd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a9f7b3dd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a9f7b3dd

Branch: refs/heads/sqoop2
Commit: a9f7b3ddd0b00557901dd2268b66b799a4f1c684
Parents: f98fc28
Author: Abraham Elmahrek <[email protected]>
Authored: Wed Mar 18 14:36:05 2015 -0700
Committer: Abraham Elmahrek <[email protected]>
Committed: Wed Mar 18 14:36:05 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |   2 +
 .../sqoop/connector/hdfs/HdfsConstants.java     |   3 +
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |   2 +-
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   |  45 ++++++--
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  23 ++--
 .../apache/sqoop/connector/hdfs/TestLoader.java |   4 +-
 .../sqoop/connector/hdfs/TestToDestroyer.java   | 114 +++++++++++++++++++
 .../sqoop/connector/hdfs/TestToInitializer.java |  21 ++++
 8 files changed, 196 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index c85e7fc..6cd66cc 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -36,6 +36,8 @@ public enum HdfsConnectorError implements ErrorCode{
 
   GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
 
+  GENERIC_HDFS_CONNECTOR_0008("Error occurs during destroyer run"),
+
   ;
 
   private final String message;


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index bd74bec..9d20a79 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -30,4 +30,7 @@ public final class HdfsConstants extends Constants {
 
   public static final char DEFAULT_RECORD_DELIMITER = '\n';
 
+  public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
+
+  public static final String WORK_DIRECTORY = PREFIX + "work_dir";
 }
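A note on how the pieces of this change fit together: the new WORK_DIRECTORY constant is the glue. The initializer records a temporary directory under that key in the job context, and the loader and destroyer read it back later in the lifecycle. A minimal sketch of that handshake, using the MutableMapContext type the tests below also use (the sketch class name and the directory value are hypothetical placeholders):

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.hdfs.HdfsConstants;

public class WorkDirectoryHandshakeSketch {
  public static void main(String[] args) {
    // Initializer side: stash the temporary directory in the job context.
    MutableContext context = new MutableMapContext();
    context.setString(HdfsConstants.WORK_DIRECTORY, "/target/dir/.3a1f");  // placeholder value

    // Loader/destroyer side: the framework hands the same context back,
    // so the key round-trips without touching the job configuration.
    String workDir = context.getString(HdfsConstants.WORK_DIRECTORY);
    System.out.println("Writing into: " + workDir);
  }
}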
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 0ced6d0..96913e8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -57,7 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
     HdfsUtils.contextToConfiguration(context.getContext(), conf);
 
     DataReader reader = context.getDataReader();
 
-    String directoryName = toJobConfig.toJobConfig.outputDirectory;
+    String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
     String codecname = getCompressionCodecName(toJobConfig);
 
     CompressionCodec codec = null;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 3c85be8..11b2ae3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -17,22 +17,51 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
+import java.io.IOException;
+
 public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsToDestroyer.class);
+
   /**
-   * Callback to clean up after job execution.
-   *
-   * @param context Destroyer context
-   * @param linkConfig link configuration object
-   * @param jobConfig TO job configuration object
+   * {@inheritDoc}
    */
   @Override
-  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
-      ToJobConfiguration jobConfig) {
-    // do nothing at this point
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+    Configuration configuration = new Configuration();
+    HdfsUtils.contextToConfiguration(context.getContext(), configuration);
+
+    String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY);
+    Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
+
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+
+      // If we succeeded, we need to move all files from working directory
+      if(context.isSuccess()) {
+        FileStatus[] fileStatuses = fs.listStatus(new Path(workingDirectory));
+        for (FileStatus status : fileStatuses) {
+          LOG.info("Committing file: " + status.getPath().toString() + " of size " + status.getLen());
+          fs.rename(status.getPath(), new Path(targetDirectory, status.getPath().getName()));
+        }
+      }
+
+      // Clean up working directory
+      fs.delete(new Path(workingDirectory), true);
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0008, e);
+    }
+  }
+}
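Taken together with the loader change above, the destroyer completes a standard write-to-temp-then-rename commit protocol: partial output never lands in the user-visible directory, and a failed job leaves nothing behind. Stripped of the Sqoop plumbing, the core of the protocol looks roughly like this (a sketch against the plain Hadoop FileSystem API; the class name, paths, and success flag are hypothetical stand-ins for what DestroyerContext provides):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TempDirCommitSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());

    Path workDir = new Path("/data/out/.work-1234");  // hypothetical hidden temp dir
    Path targetDir = new Path("/data/out");           // hypothetical final destination
    boolean jobSucceeded = true;                      // stand-in for context.isSuccess()

    if (jobSucceeded) {
      // Promote every produced file into the target directory. On HDFS a
      // rename within one filesystem is a metadata-only move, so readers of
      // targetDir never observe a half-written file.
      for (FileStatus status : fs.listStatus(workDir)) {
        fs.rename(status.getPath(), new Path(targetDir, status.getPath().getName()));
      }
    }

    // Success leaves the work directory empty; failure leaves partial output
    // in it. Either way, a recursive delete cleans it up.
    fs.delete(workDir, true);
  }
}

One caveat worth knowing: FileSystem.rename() signals most failures (for example, an already-existing destination) through its boolean return value rather than an exception, so a hardened version of this pattern would check that result.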
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 83bac27..05ceb23 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -29,19 +30,22 @@ import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
 import java.io.IOException;
+import java.util.UUID;
 
 public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsToInitializer.class);
+
   /**
-   * Initialize new submission based on given configuration properties. Any
-   * needed temporary values might be saved to context object and they will be
-   * promoted to all other part of the workflow automatically.
-   *
-   * @param context Initializer context object
-   * @param linkConfig link configuration object
-   * @param jobConfig TO job configuration object
+   * {@inheritDoc}
    */
   @Override
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+    assert jobConfig != null;
+    assert linkConfig != null;
+    assert jobConfig.toJobConfig != null;
+    assert jobConfig.toJobConfig.outputDirectory != null;
+
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
@@ -65,5 +69,10 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
     } catch (IOException e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
     }
+
+    // Building working directory
+    String workingDirectory = jobConfig.toJobConfig.outputDirectory + "/." + UUID.randomUUID();
+    LOG.info("Using working directory: " + workingDirectory);
+    context.getContext().setString(HdfsConstants.WORK_DIRECTORY, workingDirectory);
   }
 }
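The initializer places the working directory inside the target directory itself, as a dot-prefixed, UUID-named child. Keeping it on the same filesystem means the destroyer's rename is a cheap move rather than a copy, and by the usual Hadoop convention a leading dot marks the path as hidden (FileInputFormat skips names starting with '.' or '_'), so a concurrent reader of the target directory will not pick up in-flight data. A tiny sketch of the naming scheme (the class name and output directory value are made up):

import java.util.UUID;

public class WorkDirectoryNamingSketch {
  public static void main(String[] args) {
    String outputDirectory = "/data/out";  // hypothetical target directory

    // Mirrors the initializer above: "<target>/.<random-uuid>"
    String workingDirectory = outputDirectory + "/." + UUID.randomUUID();

    // Prints something like: /data/out/.1b4e28ba-2fa1-41d2-883f-0016d3cca427
    System.out.println(workingDirectory);
  }
}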
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b7c81ec..3b81715 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -103,6 +103,7 @@ public class TestLoader extends TestHdfsBase {
         .addColumn(new Text("col3"));
 
     Configuration conf = new Configuration();
+    conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
       private long index = 0L;
@@ -128,7 +129,6 @@ public class TestLoader extends TestHdfsBase {
     }, null);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
-    jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     Path outputPath = new Path(outputDirectory);
@@ -157,6 +157,7 @@ public class TestLoader extends TestHdfsBase {
         .addColumn(new Text("col4"));
 
     Configuration conf = new Configuration();
+    conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
       private long index = 0L;
@@ -187,7 +188,6 @@ public class TestLoader extends TestHdfsBase {
     }, schema);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
-    jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     jobConf.toJobConfig.overrideNullValue = true;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
new file mode 100644
index 0000000..e1f416e
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ */
+public class TestToDestroyer {
+
+  @Test
+  public void testDestroyOnSuccess() throws Exception {
+    File workDir = Files.createTempDir();
+    File targetDir = Files.createTempDir();
+
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+    MutableContext context = new MutableMapContext();
+    context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+    Destroyer destroyer = new HdfsToDestroyer();
+    destroyer.destroy(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+
+    File[] files = targetDir.listFiles();
+
+    // We should see three files in the target directory
+    assertNotNull(files);
+    assertEquals(files.length, 3);
+
+    // With expected file names
+    boolean f1 = false, f2 = false, f3 = false;
+    for(File f : files) {
+      if(f.getName().startsWith("part-01-")) {
+        f1 = true;
+      }
+      if(f.getName().startsWith("part-02-")) {
+        f2 = true;
+      }
+      if(f.getName().startsWith("part-03-")) {
+        f3 = true;
+      }
+    }
+    assertTrue(f1);
+    assertTrue(f2);
+    assertTrue(f3);
+
+    // And target directory should not exists
+    assertFalse(workDir.exists());
+  }
+
+  @Test
+  public void testDestroyOnFailure() throws Exception {
+    File workDir = Files.createTempDir();
+    File targetDir = Files.createTempDir();
+
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+    MutableContext context = new MutableMapContext();
+    context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+    Destroyer destroyer = new HdfsToDestroyer();
+    destroyer.destroy(new DestroyerContext(context, false, null), linkConfig, jobConfig);
+
+    File[] files = targetDir.listFiles();
+
+    // We should see no files in the target directory
+    assertNotNull(files);
+    assertEquals(files.length, 0);
+
+    // And target directory should not exists
+    assertFalse(workDir.exists());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 1daa25a..914c3ca 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -28,11 +28,32 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 /**
  *
  */
 public class TestToInitializer extends TestHdfsBase {
 
+  @Test
+  public void testWorkDirectoryBeingSet() {
+    final String TARGET_DIR = "/target/directory";
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(TARGET_DIR + "/."));
+  }
+
   @Test(expectedExceptions = SqoopException.class)
   public void testOutputDirectoryIsAFile() throws Exception {
     File file = File.createTempFile("MastersOfOrion", ".txt");
