Author: tomwhite
Date: Wed Jun 3 10:22:56 2009
New Revision: 781330
URL: http://svn.apache.org/viewvc?rev=781330&view=rev
Log:
HADOOP-5844. Use mysqldump when connecting to local mysql instance in Sqoop.
Contributed by Aaron Kimball.
Added:
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun 3 10:22:56 2009
@@ -131,6 +131,9 @@
HADOOP-4861. Add disk usage with human-readable size (-duh).
(Todd Lipcon via tomwhite)
+ HADOOP-5844. Use mysqldump when connecting to local mysql instance in Sqoop.
+ (Aaron Kimball via tomwhite)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Wed Jun 3 10:22:56 2009
@@ -21,6 +21,7 @@
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
import org.apache.hadoop.sqoop.manager.HsqldbManager;
+import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
import org.apache.hadoop.sqoop.manager.MySQLManager;
import java.io.IOException;
@@ -70,7 +71,11 @@
}
if (scheme.equals("jdbc:mysql:")) {
- return new MySQLManager(opts);
+ if (opts.isLocal()) {
+ return new LocalMySQLManager(opts);
+ } else {
+ return new MySQLManager(opts);
+ }
} else if (scheme.equals("jdbc:hsqldb:hsql:")) {
return new HsqldbManager(opts);
} else {
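
For reference, a self-contained sketch of the dispatch this hunk adds (the wrapper class and method name below are illustrative, not part of the patch):

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
    import org.apache.hadoop.sqoop.manager.MySQLManager;

    public class MySQLDispatchSketch {
      // Mirrors the branch above: --local selects the mysqldump-based manager,
      // otherwise the ordinary JDBC-backed MySQLManager is used.
      static ConnManager mysqlManagerFor(ImportOptions opts) {
        return opts.isLocal() ? new LocalMySQLManager(opts) : new MySQLManager(opts);
      }
    }
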
Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Jun 3 10:22:56 2009
@@ -92,6 +92,9 @@
private String driverClassName;
private String warehouseDir;
private FileLayout layout;
+ private boolean local; // if true and conn is mysql, use mysqldump.
+
+ private String tmpDir; // where temp data goes; usually /tmp
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -136,6 +139,10 @@
this.driverClassName = props.getProperty("jdbc.driver",
this.driverClassName);
this.warehouseDir = props.getProperty("hdfs.warehouse.dir",
this.warehouseDir);
+ String localImport = props.getProperty("local.import",
+ Boolean.toString(this.local)).toLowerCase();
+ this.local = "true".equals(localImport) || "yes".equals(localImport)
+ || "1".equals(localImport);
} catch (IOException ioe) {
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": "
+ ioe.toString());
} finally {
@@ -156,11 +163,12 @@
this.hadoopHome = System.getenv("HADOOP_HOME");
this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");
- String tmpDir = System.getProperty("test.build.data", "/tmp/");
- if (!tmpDir.endsWith(File.separator)) {
- tmpDir = tmpDir + File.separator;
+ String myTmpDir = System.getProperty("test.build.data", "/tmp/");
+ if (!myTmpDir.endsWith(File.separator)) {
+ myTmpDir = myTmpDir + File.separator;
}
+ this.tmpDir = myTmpDir;
this.jarOutputDir = tmpDir + "sqoop/compile";
this.layout = FileLayout.TextFile;
@@ -178,6 +186,7 @@
System.out.println("--driver (class-name) Manually specify JDBC
driver class to use");
System.out.println("--username (username) Set authentication
username");
System.out.println("--password (password) Set authentication
password");
+ System.out.println("--local Use local import fast
path (mysql only)");
System.out.println("");
System.out.println("Import control options:");
System.out.println("--table (tablename) Table to read");
@@ -232,6 +241,8 @@
this.action = ControlAction.ListTables;
} else if (args[i].equals("--all-tables")) {
this.allTables = true;
+ } else if (args[i].equals("--local")) {
+ this.local = true;
} else if (args[i].equals("--username")) {
this.username = args[++i];
if (null == this.password) {
@@ -300,6 +311,13 @@
}
}
+ /** get the temporary directory; guaranteed to end in File.separator
+ * (e.g., '/')
+ */
+ public String getTmpDir() {
+ return tmpDir;
+ }
+
public String getConnectString() {
return connectString;
}
@@ -336,6 +354,10 @@
return password;
}
+ public boolean isLocal() {
+ return local;
+ }
+
/**
* @return location where .java files go; guaranteed to end with '/'
*/
@@ -393,4 +415,8 @@
public FileLayout getFileLayout() {
return this.layout;
}
+
+ public void setUsername(String name) {
+ this.username = name;
+ }
}
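
For reference, an example invocation exercising the new option (the --local string comes from this patch; the other values are illustrative only):

    // Example argv for an import that takes the mysqldump fast path.
    String [] argv = {
      "--connect", "jdbc:mysql://localhost/sqooptestdb",
      "--table", "EMPLOYEES",
      "--username", "someuser",
      "--local"   // parsed by the loop above; sets ImportOptions.local = true
    };

The same effect is available from sqoop.properties via the new local.import key, which the parsing above accepts as "true", "yes", or "1" (case-insensitive).
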
Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=781330&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Jun 3 10:22:56 2009
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.util.ImportError;
+
+/**
+ * Manages connections to MySQL databases that are local to this
+ * machine -- so we can use mysqldump to get really fast dumps.
+ */
+public class LocalMySQLManager extends MySQLManager {
+
+ public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
+
+ public LocalMySQLManager(final ImportOptions options) {
+ super(options, false);
+ }
+
+ private static final String MYSQL_DUMP_CMD = "mysqldump";
+
+ /**
+ * Import the table into HDFS by using mysqldump to pull out the data from
+ * the database and upload the files directly to HDFS.
+ */
+ public void importTable(String tableName, String jarFile, Configuration conf)
+ throws IOException, ImportError {
+
+ LOG.info("Beginning mysqldump fast path import");
+
+ if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
+ // TODO(aaron): Support SequenceFile-based load-in
+ LOG.warn("File import layout " + options.getFileLayout()
+ + " is not supported by");
+ LOG.warn("MySQL local import; import will proceed as text files.");
+ }
+
+ ArrayList<String> args = new ArrayList<String>();
+
+ // We need to parse the connect string URI to determine the database
+ // name. Using java.net.URL directly on the connect string will fail because
+ // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
+ // (everything before '://') and replace it with 'http', which we know will work.
+ String connectString = options.getConnectString();
+ String databaseName = null;
+ try {
+ String sanitizedString = null;
+ int schemeEndOffset = connectString.indexOf("://");
+ if (-1 == schemeEndOffset) {
+ // couldn't find one? try our best here.
+ sanitizedString = "http://" + connectString;
+ LOG.warn("Could not find database access scheme in connect string " +
connectString);
+ } else {
+ sanitizedString = "http" + connectString.substring(schemeEndOffset);
+ }
+
+ URL connectUrl = new URL(sanitizedString);
+ databaseName = connectUrl.getPath();
+ } catch (MalformedURLException mue) {
+ LOG.error("Malformed connect string URL: " + connectString
+ + "; reason is " + mue.toString());
+ }
+
+ if (null == databaseName) {
+ throw new ImportError("Could not determine database name");
+ }
+
+ // database name was found from the 'path' part of the URL; trim leading '/'
+ while (databaseName.startsWith("/")) {
+ databaseName = databaseName.substring(1);
+ }
+
+ LOG.info("Performing import of table " + tableName + " from database " +
databaseName);
+
+ args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
+ args.add("--skip-opt");
+ args.add("--compact");
+ args.add("--no-create-db");
+ args.add("--no-create-info");
+
+ String username = options.getUsername();
+ if (null != username) {
+ args.add("--user=" + username);
+ }
+
+ String password = options.getPassword();
+ if (null != password) {
+ // TODO(aaron): This is really insecure.
+ args.add("--password=" + password);
+ }
+
+ args.add("--quick"); // no buffering
+ // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
+ args.add("--single-transaction");
+
+ args.add(databaseName);
+ args.add(tableName);
+
+ Process p = null;
+ try {
+ // begin the import in an external process.
+ LOG.debug("Starting mysqldump with arguments:");
+ for (String arg : args) {
+ LOG.debug(" " + arg);
+ }
+
+ p = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+ // read from the pipe, into HDFS.
+ InputStream is = p.getInputStream();
+ OutputStream os = null;
+
+ BufferedReader r = null;
+ BufferedWriter w = null;
+
+ try {
+ r = new BufferedReader(new InputStreamReader(is));
+
+ // create the paths/files in HDFS
+ FileSystem fs = FileSystem.get(conf);
+ String warehouseDir = options.getWarehouseDir();
+ Path destDir = null;
+ if (null != warehouseDir) {
+ destDir = new Path(new Path(warehouseDir), tableName);
+ } else {
+ destDir = new Path(tableName);
+ }
+
+ LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
+ LOG.debug("Creating destination directory " + destDir);
+ fs.mkdirs(destDir);
+ Path destFile = new Path(destDir, "data-00000");
+ LOG.debug("Opening output file: " + destFile);
+ if (fs.exists(destFile)) {
+ Path canonicalDest = destFile.makeQualified(fs);
+ throw new IOException("Destination file " + canonicalDest + "
already exists");
+ }
+
+ os = fs.create(destFile);
+ w = new BufferedWriter(new OutputStreamWriter(os));
+
+ // Actually do the read/write transfer loop here.
+ int preambleLen = -1; // set to this for "undefined"
+ while (true) {
+ String inLine = r.readLine();
+ if (null == inLine) {
+ break; // EOF.
+ }
+
+ // this line is of the form "INSERT .. VALUES ( actual value text );"
+ // strip the leading preamble up to the '(' and the trailing ');'.
+ if (preambleLen == -1) {
+ // we haven't determined how long the preamble is. It's constant
+ // across all lines, so just figure this out once.
+ String recordStartMark = "VALUES (";
+ preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
+ }
+
+ // chop off the leading and trailing text as we write the
+ // output to HDFS.
+ w.write(inLine, preambleLen, inLine.length() - 2 - preambleLen);
+ w.newLine();
+ }
+ } finally {
+ LOG.info("Transfer loop complete.");
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing FIFO stream: " + ioe.toString());
+ }
+ }
+
+ if (null != w) {
+ try {
+ w.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing HDFS stream: " + ioe.toString());
+ }
+ }
+ }
+ } finally {
+ int result = 0;
+ if (null != p) {
+ while (true) {
+ try {
+ result = p.waitFor();
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ if (0 != result) {
+ throw new IOException("mysqldump terminated with status "
+ + Integer.toString(result));
+ }
+ }
+ }
+}
+
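
To make the transfer loop concrete: each line of mysqldump --skip-opt --compact output is a full INSERT statement, and the loop keeps only the value list. A worked example (row values illustrative):

    // One input line from mysqldump:
    String inLine =
        "INSERT INTO `EMPLOYEES` VALUES (1,'Aaron','2009-05-14',1e+06,'engineering');";
    String recordStartMark = "VALUES (";
    // Computed once, since the preamble length is constant across lines:
    int preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
    // Equivalent to the w.write(inLine, preambleLen, ...) call above; drops the
    // leading "INSERT ... VALUES (" and the trailing ");".
    String record = inLine.substring(preambleLen, inLine.length() - 2);
    // record is now: 1,'Aaron','2009-05-14',1e+06,'engineering'
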
Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Wed Jun 3 10:22:56 2009
@@ -41,6 +41,19 @@
public MySQLManager(final ImportOptions opts) {
super(DRIVER_CLASS, opts);
+
+ String connectString = opts.getConnectString();
+ if (null != connectString && connectString.indexOf("//localhost") != -1) {
+ // if we're not doing a remote connection, they should have a LocalMySQLManager.
+ LOG.warn("It looks like you are importing from mysql on localhost.");
+ LOG.warn("This transfer can be faster! Use the --local option to exercise a");
+ LOG.warn("MySQL-specific fast path.");
+ }
+ }
+
+ protected MySQLManager(final ImportOptions opts, boolean ignored) {
+ // constructor used by subclasses to avoid the --local warning.
+ super(DRIVER_CLASS, opts);
}
@Override
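
For reference, the hint above is a plain substring test, not a URL parse; a minimal demonstration of what it does and does not catch:

    public class LocalhostHintSketch {
      public static void main(String [] args) {
        // Fires the warning:
        System.out.println("jdbc:mysql://localhost/db".indexOf("//localhost") != -1);      // true
        // Does not fire it:
        System.out.println("jdbc:mysql://db.example.com/db".indexOf("//localhost") != -1); // false
        // Loopback by IP is also not detected by this simple check:
        System.out.println("jdbc:mysql://127.0.0.1/db".indexOf("//localhost") != -1);      // false
      }
    }
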
Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Wed Jun 3 10:22:56 2009
@@ -43,9 +43,6 @@
/**
* Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
- *
- *
- *
*/
public class ImportJob {
Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Wed Jun 3 10:22:56 2009
@@ -130,6 +130,15 @@
}
}
+ // find sqoop jar for compilation classpath
+ String sqoopJar = findThisJar();
+ if (null != sqoopJar) {
+ sqoopJar = File.pathSeparator + sqoopJar;
+ } else {
+ LOG.warn("Could not find sqoop jar; child compilation may fail");
+ sqoopJar = "";
+ }
+
String curClasspath = System.getProperty("java.class.path");
args.add("-sourcepath");
@@ -140,7 +149,7 @@
args.add(jarOutDir);
args.add("-classpath");
- args.add(curClasspath + File.pathSeparator + coreJar);
+ args.add(curClasspath + File.pathSeparator + coreJar + sqoopJar);
// add all the source files
for (String srcfile : sources) {
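
findThisJar() itself is outside this hunk; for reviewers, one common way such a lookup is implemented in Java (a sketch under that assumption, not necessarily the method used here):

    import java.net.URL;
    import java.security.CodeSource;

    public class JarLocatorSketch {
      // Returns the path of the jar (or directory) a class was loaded from,
      // or null if the classloader does not expose a code source.
      public static String findJarFor(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        if (null == src) {
          return null;
        }
        URL location = src.getLocation();
        return null == location ? null : location.getPath();
      }
    }
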
Modified: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Wed Jun 3 10:22:56 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.sqoop;
+import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
@@ -44,6 +45,7 @@
suite.addTestSuite(TestColumnTypes.class);
suite.addTestSuite(TestMultiCols.class);
suite.addTestSuite(TestOrderBy.class);
+ suite.addTestSuite(LocalMySQLTest.class);
return suite;
}
Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=781330&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Wed Jun 3 10:22:56 2009
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test the LocalMySQLManager implementation.
+ * This differs from MySQLManager only in its importTable() method, which
+ * uses mysqldump instead of mapreduce+DBInputFormat.
+ *
+ * Since this requires a MySQL installation on your local machine, this
+ * class is named in such a way that Hadoop's default QA process does not run
+ * it. You need to run this manually with -Dtestcase=LocalMySQLTest.
+ *
+ * You need to put MySQL's Connector/J JDBC driver library into a location
+ * where Hadoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons).
+ *
+ * You should also create a database named 'sqooptestdb' and authorize yourself:
+ *
+ * CREATE DATABASE sqooptestdb;
+ * use mysql;
+ * GRANT ALL PRIVILEGES ON sqooptestdb.* TO 'yourusername'@'localhost';
+ * GRANT FILE ON *.* TO 'yourusername'@'localhost';
+ * flush privileges;
+ *
+ * The above will authorize you to use file-level access to the database.
+ * This privilege is global and cannot be applied on a per-schema basis
+ * (e.g., just to sqooptestdb).
+ */
+public class LocalMySQLTest extends ImportJobTestCase {
+
+ public static final Log LOG = LogFactory.getLog(LocalMySQLTest.class.getName());
+
+ static final String MYSQL_DATABASE_NAME = "sqooptestdb";
+ static final String TABLE_NAME = "EMPLOYEES";
+ static final String CONNECT_STRING = "jdbc:mysql://localhost/" + MYSQL_DATABASE_NAME;
+
+ // instance variables populated during setUp, used during tests
+ private LocalMySQLManager manager;
+
+ @Before
+ public void setUp() {
+ ImportOptions options = new ImportOptions(CONNECT_STRING, TABLE_NAME);
+ options.setUsername(getCurrentUser());
+ manager = new LocalMySQLManager(options);
+
+ Connection connection = null;
+ Statement st = null;
+
+ try {
+ connection = manager.getConnection();
+ connection.setAutoCommit(false);
+ st = connection.createStatement();
+
+ // create the database table and populate it with data.
+ st.executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME);
+ st.executeUpdate("CREATE TABLE " + TABLE_NAME + " ("
+ + "id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+ + "name VARCHAR(24) NOT NULL, "
+ + "start_date DATE, "
+ + "salary FLOAT, "
+ + "dept VARCHAR(32))");
+
+ st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ + "NULL,'Aaron','2009-05-14',1000000.00,'engineering')");
+ st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ + "NULL,'Bob','2009-04-20',400.00,'sales')");
+ st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+ + "NULL,'Fred','2009-01-23',15.00,'marketing')");
+ connection.commit();
+ } catch (SQLException sqlE) {
+ LOG.error("Encountered SQL Exception: " + sqlE);
+ sqlE.printStackTrace();
+ fail("SQLException when running test setUp(): " + sqlE);
+ } finally {
+ try {
+ if (null != st) {
+ st.close();
+ }
+
+ if (null != connection) {
+ connection.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.warn("Got SQLException when closing connection: " + sqlE);
+ }
+ }
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ manager.close();
+ } catch (SQLException sqlE) {
+ LOG.error("Got SQLException: " + sqlE.toString());
+ fail("Got SQLException: " + sqlE.toString());
+ }
+ }
+
+ /** @return the current username. */
+ private String getCurrentUser() {
+ // First, check the $USER environment variable.
+ String envUser = System.getenv("USER");
+ if (null != envUser) {
+ return envUser;
+ }
+
+ // Try `whoami`
+ String [] whoamiArgs = new String[1];
+ whoamiArgs[0] = "whoami";
+ Process p = null;
+ BufferedReader r = null;
+ try {
+ p = Runtime.getRuntime().exec(whoamiArgs);
+ InputStream is = p.getInputStream();
+ r = new BufferedReader(new InputStreamReader(is));
+ return r.readLine();
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from `whoami`: " + ioe.toString());
+ return null;
+ } finally {
+ // close our stream.
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.warn("IOException closing input stream from `whoami`: " +
ioe.toString());
+ }
+ }
+
+ // wait for whoami to exit.
+ while (p != null) {
+ try {
+ int ret = p.waitFor();
+ if (0 != ret) {
+ LOG.error("whoami exited with error status " + ret);
+ // suppress original return value from this method.
+ return null;
+ }
+ break; // process exited normally; stop waiting.
+ } catch (InterruptedException ie) {
+ continue; // loop around.
+ }
+ }
+ }
+ }
+
+ private String [] getArgv(boolean includeHadoopFlags) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ args.add("-D");
+ args.add("fs.default.name=file:///");
+ }
+
+ args.add("--table");
+ args.add(TABLE_NAME);
+ args.add("--warehouse-dir");
+ args.add(getWarehouseDir());
+ args.add("--connect");
+ args.add(CONNECT_STRING);
+ args.add("--local");
+ args.add("--username");
+ args.add(getCurrentUser());
+
+ return args.toArray(new String[0]);
+ }
+
+ @Test
+ public void testLocalBulkImport() {
+ String [] argv = getArgv(true);
+ try {
+ runImport(argv);
+ } catch (IOException ioe) {
+ LOG.error("Got IOException during import: " + ioe.toString());
+ ioe.printStackTrace();
+ fail(ioe.toString());
+ }
+
+ Path warehousePath = new Path(this.getWarehouseDir());
+ Path tablePath = new Path(warehousePath, TABLE_NAME);
+ Path filePath = new Path(tablePath, "data-00000");
+
+ File f = new File(filePath.toString());
+ assertTrue("Could not find imported data file", f.exists());
+ BufferedReader r = null;
+ try {
+ // Read through the file and make sure it's all there.
+ r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
+ assertEquals("1,'Aaron','2009-05-14',1e+06,'engineering'", r.readLine());
+ assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
+ assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
+ } catch (IOException ioe) {
+ LOG.error("Got IOException verifying results: " + ioe.toString());
+ ioe.printStackTrace();
+ fail(ioe.toString());
+ } finally {
+ IOUtils.closeStream(r);
+ }
+ }
+}
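
One note on getCurrentUser() above: when $USER is unset and `whoami` is unavailable, the JVM's standard user.name property is a simpler portable fallback (sketch only; not part of the patch):

    public class CurrentUserSketch {
      public static void main(String [] args) {
        // user.name is a standard system property set by the JVM at startup.
        System.out.println(System.getProperty("user.name"));
      }
    }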