Updated Branches: refs/heads/sqoop2 9addddfe3 -> 39eb1e56d
SQOOP-927: Sqoop2: Integration: Mapreduce specific tests should be running on MiniCluster (Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/39eb1e56 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/39eb1e56 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/39eb1e56 Branch: refs/heads/sqoop2 Commit: 39eb1e56d3898ed5663a58b319e5055109d3b7d2 Parents: 9addddf Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 22 16:41:34 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 22 16:41:34 2013 -0700 ---------------------------------------------------------------------- pom.xml | 7 +- .../apache/sqoop/test/asserts/HdfsAsserts.java | 20 ++- .../sqoop/test/hadoop/HadoopLocalRunner.java | 50 ++++++++ .../test/hadoop/HadoopMiniClusterRunner.java | 93 ++++++++++++++ .../apache/sqoop/test/hadoop/HadoopRunner.java | 128 +++++++++++++++++++ .../sqoop/test/hadoop/HadoopRunnerFactory.java | 38 ++++++ .../sqoop/test/testcases/ConnectorTestCase.java | 17 +++ .../sqoop/test/testcases/TomcatTestCase.java | 63 +++++++-- .../org/apache/sqoop/test/utils/HdfsUtils.java | 58 ++++++--- 9 files changed, 432 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 20d5e4e..4470331 100644 --- a/pom.xml +++ b/pom.xml @@ -164,6 +164,11 @@ limitations under the License. <version>${hadoop.1.version}</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.1.version}</version> + </dependency> </dependencies> </dependencyManagement> </profile> @@ -485,7 +490,7 @@ limitations under the License. 
<version>2.12</version> <configuration> <forkMode>always</forkMode> - <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds> + <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds> <redirectTestOutputToFile>true</redirectTestOutputToFile> <argLine>-Xms256m -Xmx1g</argLine> </configuration> http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java index c175272..d8f2b8d 100644 --- a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java +++ b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java @@ -18,12 +18,14 @@ package org.apache.sqoop.test.asserts; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.sqoop.test.utils.HdfsUtils; import java.io.BufferedReader; -import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; import java.util.Arrays; import java.util.HashSet; import java.util.LinkedList; @@ -35,8 +37,6 @@ import static org.junit.Assert.fail; /** * Assert methods suitable for checking HDFS files and directories. - * - * TODO: This module will require clean up to work on MiniCluster/Real cluster. */ public class HdfsAsserts { @@ -49,15 +49,13 @@ public class HdfsAsserts { * @param lines Expected lines * @throws IOException */ - public static void assertMapreduceOutput(String directory, String... lines) throws IOException { + public static void assertMapreduceOutput(FileSystem fs, String directory, String... 
lines) throws IOException { Set<String> setLines = new HashSet<String>(Arrays.asList(lines)); List<String> notFound = new LinkedList<String>(); - String []files = HdfsUtils.getOutputMapreduceFiles(directory); - - for(String file : files) { - String filePath = directory + "/" + file; - BufferedReader br = new BufferedReader(new FileReader((filePath))); + Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory); + for(Path file : files) { + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file))); String line; while ((line = br.readLine()) != null) { @@ -83,8 +81,8 @@ public class HdfsAsserts { * @param directory Mapreduce output directory * @param expectedFiles Expected number of files */ - public static void assertMapreduceOutputFiles(String directory, int expectedFiles) { - String []files = HdfsUtils.getOutputMapreduceFiles(directory); + public static void assertMapreduceOutputFiles(FileSystem fs, String directory, int expectedFiles) throws IOException { + Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory); assertEquals(expectedFiles, files.length); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java new file mode 100644 index 0000000..44465b4 --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.test.utils.HdfsUtils; + +/** + * Represents a local cluster. + * It uses an unchanged Configuration object. + * HadoopRunner implementation that is using LocalJobRunner for executing mapreduce jobs and local filesystem instead of HDFS. + */ +public class HadoopLocalRunner extends HadoopRunner { + + @Override + public Configuration prepareConfiguration(Configuration conf) { + return conf; + } + + @Override + public void start() throws Exception { + // Do nothing! + } + + @Override + public void stop() throws Exception { + // Do nothing! 
+ } + + @Override + public String getTestDirectory() { + return HdfsUtils.joinPathFragments(getTemporaryPath(), "/mapreduce-job-io"); + } + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java new file mode 100644 index 0000000..b06dcab --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.log4j.Logger; + +/** + * Represents a minicluster setup. + * It creates a configuration object and mutates it. + * Clients that need to connect to the miniclusters should use + * the provided configuration object. 
+ */ +public class HadoopMiniClusterRunner extends HadoopRunner { + private static final Logger LOG = Logger.getLogger(HadoopMiniClusterRunner.class); + + /** + * Hadoop HDFS cluster + */ + protected MiniDFSCluster dfsCluster; + + /** + * Hadoop MR cluster + */ + protected MiniMRCluster mrCluster; + + @Override + public Configuration prepareConfiguration(Configuration config) { + config.set("dfs.block.access.token.enable", "false"); + config.set("dfs.permissions", "true"); + config.set("hadoop.security.authentication", "simple"); + config.set("mapred.tasktracker.map.tasks.maximum", "1"); + config.set("mapred.tasktracker.reduce.tasks.maximum", "1"); + config.set("mapred.submit.replication", "1"); + config.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + config.set("yarn.application.classpath", System.getProperty("java.class.path")); + return config; + } + + @SuppressWarnings("deprecation") + @Override + public void start() throws Exception { + System.setProperty("test.build.data", getDataDir()); + LOG.info("test.build.data set to: " + getDataDir()); + + System.setProperty("hadoop.log.dir", getLogDir()); + LOG.info("log dir set to: " + getLogDir()); + + // Start DFS server + LOG.info("Starting DFS cluster..."); + dfsCluster = new MiniDFSCluster(config, 1, true, null); + if (dfsCluster.isClusterUp()) { + LOG.info("Started DFS cluster on port: " + dfsCluster.getNameNodePort()); + } else { + LOG.error("Could not start DFS cluster"); + } + + // Start MR server + LOG.info("Starting MR cluster"); + mrCluster = new MiniMRCluster(0, 0, 1, dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, new JobConf(config)); + LOG.info("Started MR cluster"); + config = prepareConfiguration(mrCluster.createJobConf()); + } + + @Override + public void stop() throws Exception { + LOG.info("Stopping MR cluster"); + mrCluster.shutdown(); + LOG.info("Stopped MR cluster"); + + LOG.info("Stopping DFS 
cluster"); + dfsCluster.shutdown(); + LOG.info("Stopped DFS cluster"); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java new file mode 100644 index 0000000..2516ff1 --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.test.utils.HdfsUtils; + +/** + * Hadoop cluster runner for testing purposes. + * + * The runner provides methods for bootstrapping and using a Hadoop cluster. + * This abstract implementation is agnostic about the mode in which Hadoop is running. + * Each mode has its own concrete implementation (for example LocalJobRunner, MiniCluster or a real existing cluster).
+ */ +public abstract class HadoopRunner { + + /** + * Temporary path that can be used as a root for other directories storing various data like logs or stored HDFS files. + */ + private String temporaryPath; + + /** + * Configuration object for Hadoop. + */ + protected Configuration config = null; + + /** + * Prepare configuration object. + * This method should be called once before the start method is called. + * + * @param config is the configuration object to prepare. + */ + abstract public Configuration prepareConfiguration(Configuration config); + + /** + * Start hadoop cluster. + * + * @throws Exception + */ + abstract public void start() throws Exception; + + /** + * Stop hadoop cluster. + * + * @throws Exception + */ + abstract public void stop() throws Exception; + + /** + * Return working directory on HDFS instance that this HadoopRunner is using. + * + * This directory might be on local filesystem in case of local mode. + */ + public String getTestDirectory() { + return "/mapreduce-job-io"; + } + + /** + * Get temporary path. + * + * @return + */ + public String getTemporaryPath() { + return temporaryPath; + } + + /** + * Set temporary path. + * + * @param temporaryPath + */ + public void setTemporaryPath(String temporaryPath) { + this.temporaryPath = temporaryPath; + } + + /** + * Return directory on local filesystem where data (other than logs) + * generated by the Hadoop cluster should be stored. + * + * @return + */ + public String getDataDir() { + return HdfsUtils.joinPathFragments(temporaryPath, "data"); + } + + /** + * Return directory on local filesystem where logs + * generated by the Hadoop cluster should be stored. + * + * @return + */ + public String getLogDir() { + return HdfsUtils.joinPathFragments(temporaryPath, "log"); + } + + /** + * Get hadoop configuration. + * + * @return + */ + public Configuration getConfiguration() { + return config; + } + + /** + * Set the Hadoop configuration object that this runner should use.
+ * + * @param config + */ + public void setConfiguration(Configuration config) { + this.config = config; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java new file mode 100644 index 0000000..020fa3f --- /dev/null +++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.test.hadoop; + +import java.util.Properties; + +/** + * Factory that instantiates the HadoopRunner named by the sqoop.hadoop.runner.class property. + */ +public class HadoopRunnerFactory { + + public static final String CLUSTER_CLASS_PROPERTY = "sqoop.hadoop.runner.class"; + + public static HadoopRunner getHadoopCluster(Properties properties, Class<?
extends HadoopRunner> defaultClusterClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + String className = properties.getProperty(CLUSTER_CLASS_PROPERTY); + if(className == null) { + return defaultClusterClass.newInstance(); + } + + Class<?> klass = Class.forName(className); + return (HadoopRunner)klass.newInstance(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index d10b942..5ec4fa4 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.test.testcases; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.Logger; import org.apache.sqoop.client.SubmissionCallback; import org.apache.sqoop.framework.configuration.OutputFormat; @@ -31,6 +33,8 @@ import org.apache.sqoop.test.data.Cities; import org.apache.sqoop.test.data.UbuntuReleases; import org.apache.sqoop.test.db.DatabaseProvider; import org.apache.sqoop.test.db.DatabaseProviderFactory; +import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner; +import org.apache.sqoop.test.hadoop.HadoopRunnerFactory; import org.apache.sqoop.validation.Status; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -71,6 +75,19 @@ abstract public class ConnectorTestCase extends TomcatTestCase { }; @BeforeClass + public static void startHadoop() throws Exception { + // Start Hadoop Clusters + hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopMiniClusterRunner.class); + hadoopCluster.setTemporaryPath(TMP_PATH_BASE); + 
hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) ); + hadoopCluster.start(); + + // Initialize Hdfs Client + hdfsClient = FileSystem.get(hadoopCluster.getConfiguration()); + LOG.debug("HDFS Client: " + hdfsClient); + } + + @BeforeClass public static void startProvider() throws Exception { provider = DatabaseProviderFactory.getProvider(System.getProperties()); LOG.info("Starting database provider: " + provider.getClass().getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java index efdfed4..7e2558f 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java @@ -17,24 +17,30 @@ */ package org.apache.sqoop.test.testcases; +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; import org.apache.log4j.Logger; import org.apache.sqoop.client.SqoopClient; import org.apache.sqoop.test.asserts.HdfsAsserts; +import org.apache.sqoop.test.hadoop.HadoopRunner; +import org.apache.sqoop.test.hadoop.HadoopRunnerFactory; +import org.apache.sqoop.test.hadoop.HadoopLocalRunner; import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster; import org.apache.sqoop.test.utils.HdfsUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.rules.TestName; -import java.io.IOException; - /** * Basic test case that will bootstrap Sqoop server running in external Tomcat * process. 
*/ abstract public class TomcatTestCase { - private static final Logger LOG = Logger.getLogger(TomcatTestCase.class); @Rule public TestName name = new TestName(); @@ -47,7 +53,7 @@ abstract public class TomcatTestCase { * pick up configured java.io.tmpdir value. The last results is /tmp/ directory * in case that no property is set. */ - private static final String TMP_PATH_BASE = + protected static final String TMP_PATH_BASE = System.getProperty("sqoop.integration.tmpdir", System.getProperty("java.io.tmpdir", "/tmp")) + "/sqoop-cargo-tests/"; /** @@ -62,6 +68,16 @@ abstract public class TomcatTestCase { private String tmpPath; /** + * Hadoop cluster + */ + protected static HadoopRunner hadoopCluster; + + /** + * Hadoop client + */ + protected static FileSystem hdfsClient; + + /** * Tomcat based Sqoop mini cluster */ private TomcatSqoopMiniCluster cluster; @@ -71,13 +87,27 @@ abstract public class TomcatTestCase { */ private SqoopClient client; + @BeforeClass + public static void startHadoop() throws Exception { + // Start Hadoop Clusters + hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopLocalRunner.class); + hadoopCluster.setTemporaryPath(TMP_PATH_BASE); + hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) ); + hadoopCluster.start(); + + // Initialize Hdfs Client + hdfsClient = FileSystem.get(hadoopCluster.getConfiguration()); + LOG.debug("HDFS Client: " + hdfsClient); + } + @Before public void startServer() throws Exception { - // Set up the temporary path - tmpPath = TMP_PATH_BASE + getClass().getName() + "/" + name.getMethodName() + "/"; + // Get and set temporary path in hadoop cluster. 
+ tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, getClass().getName(), name.getMethodName()); + LOG.debug("Temporary Directory: " + tmpPath); - // Set up and start server - cluster = new TomcatSqoopMiniCluster(getTemporaryPath()); + // Start server + cluster = new TomcatSqoopMiniCluster(tmpPath, hadoopCluster.getConfiguration()); cluster.start(); // Initialize Sqoop Client API @@ -89,6 +119,11 @@ abstract public class TomcatTestCase { cluster.stop(); } + @AfterClass + public static void stopHadoop() throws Exception { + hadoopCluster.stop(); + } + /** * Return SqoopClient configured to talk to testing server. * @@ -112,12 +147,12 @@ abstract public class TomcatTestCase { } /** - * Get input/output directory for mapreduce job. + * Return mapreduce base directory. * * @return */ public String getMapreduceDirectory() { - return getTemporaryPath() + "/mapreduce-job-io"; + return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), name.getMethodName()); } /** @@ -130,7 +165,7 @@ abstract public class TomcatTestCase { * @throws IOException */ protected void assertMapreduceOutput(String... 
lines) throws IOException { - HdfsAsserts.assertMapreduceOutput(getMapreduceDirectory(), lines); + HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines); } /** @@ -138,8 +173,8 @@ abstract public class TomcatTestCase { * * @param expectedFiles Expected number of files */ - protected void assertMapreduceOutputFiles(int expectedFiles) { - HdfsAsserts.assertMapreduceOutputFiles(getMapreduceDirectory(), expectedFiles); + protected void assertMapreduceOutputFiles(int expectedFiles) throws IOException { + HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles); } /** @@ -150,6 +185,6 @@ abstract public class TomcatTestCase { * @throws IOException */ protected void createInputMapreduceFile(String filename, String...lines) throws IOException { - HdfsUtils.createFile(getMapreduceDirectory(), filename, lines); + HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java index 95dd177..59c5f15 100644 --- a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java +++ b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java @@ -17,50 +17,76 @@ */ package org.apache.sqoop.test.utils; -import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; -import java.io.File; -import java.io.FilenameFilter; +import java.io.BufferedWriter; +import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Arrays; +import java.io.OutputStreamWriter; +import java.util.LinkedList; /** * Handy utilities to 
work with HDFS - * - * TODO: This module will require clean up to work on MiniCluster/Real cluster. */ public class HdfsUtils { + @SuppressWarnings("unused") private static final Logger LOG = Logger.getLogger(HdfsUtils.class); + private static final char PATH_SEPARATOR = '/'; + /** * Get list of mapreduce output files from given directory. * * @param directory Directory to be searched for files generated by MR * @return + * @throws IOException + * @throws FileNotFoundException */ - public static String [] getOutputMapreduceFiles(String directory) { - File dir = new File(directory); - return dir.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith("part-"); + public static Path [] getOutputMapreduceFiles(FileSystem fs, String directory) throws FileNotFoundException, IOException { + LinkedList<Path> files = new LinkedList<Path>(); + for (FileStatus fileStatus : fs.listStatus(new Path(directory))) { + if (fileStatus.getPath().getName().startsWith("part-")) { + files.add(fileStatus.getPath()); } - }); + } + return files.toArray(new Path[files.size()]); } /** * Create HDFS file with given content. * + * @param fs filesystem object * @param directory Directory where the file should be created * @param filename File name * @param lines Individual lines that should be written into the file * @throws IOException */ - public static void createFile(String directory, String filename, String ...lines) throws IOException { - File outputFile = new File(directory, filename); - FileUtils.writeLines(outputFile, Arrays.asList(lines)); + public static void createFile(FileSystem fs, String path, String ...lines) throws IOException { + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path), true))); + for (String line : lines) { + writer.write(line); + writer.newLine(); + } + writer.close(); + } + + /** + * Join several path fragments together. 
+ * @param paths + */ + public static String joinPathFragments(String ...paths){ + StringBuilder builder = new StringBuilder(); + for (String path : paths) { + builder.append(path); + if (path.charAt(path.length() - 1) != PATH_SEPARATOR) { + builder.append(PATH_SEPARATOR); + } + } + return builder.toString(); } private HdfsUtils() {
