http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java b/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java index f6e5c0e..e0a0462 100644 --- a/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java +++ b/src/test/org/apache/sqoop/manager/oracle/SystemImportTest.java @@ -44,10 +44,10 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.sqoop.manager.oracle.util.*; import org.junit.Test; -import com.cloudera.sqoop.lib.BlobRef; -import com.cloudera.sqoop.lib.ClobRef; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.manager.OracleUtils; +import org.apache.sqoop.lib.BlobRef; +import org.apache.sqoop.lib.ClobRef; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.manager.oracle.util.OracleUtils; /** * OraOop system tests of importing data from oracle to hadoop.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java b/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java index 7d3abfd..e98fdfe 100644 --- a/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java +++ b/src/test/org/apache/sqoop/manager/oracle/TestOraOopDataDrivenDBInputFormat.java @@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.junit.Assert; import org.junit.Test; -import com.cloudera.sqoop.lib.SqoopRecord; +import org.apache.sqoop.lib.SqoopRecord; import org.apache.sqoop.manager.oracle.OraOopConstants. OraOopOracleBlockToSplitAllocationMethod; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java new file mode 100644 index 0000000..6d752aa --- /dev/null +++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleUtils.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.oracle.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.manager.ConnManager; + +/** + * Helper methods for Oracle testing. + */ +public final class OracleUtils { + + public static final Log LOG = LogFactory.getLog(OracleUtils.class.getName()); + + // Express edition hardcoded name. + public static final String ORACLE_DATABASE_NAME = "xe"; + + public static final String CONNECT_STRING = System.getProperty("sqoop.test.oracle.connectstring", "jdbc:oracle:thin:@//localhost/" + ORACLE_DATABASE_NAME); + public static final String ORACLE_USER_NAME = System.getProperty("sqoop.test.oracle.username", "SQOOPTEST"); + public static final String ORACLE_USER_PASS = System.getProperty("sqoop.test.oracle.password", "12345"); + + public static final String ORACLE_SECONDARY_USER_NAME = "SQOOPTEST2"; + public static final String ORACLE_SECONDARY_USER_PASS = "ABCDEF"; + + public static final String ORACLE_INVALID_USER_NAME = "invalidusr"; + public static final String SYSTEMTEST_TABLE_NAME = "ORAOOP_TEST"; + public static final int SYSTEMTEST_NUM_ROWS = 100; + public static final int INTEGRATIONTEST_NUM_ROWS = 10000; + // Number of mappers if wanting to override default setting + public static final int NUM_MAPPERS = 0; + // Oracle degree of parallelism to use when creating table. + // If 0 we will calculate a recommended value + public static final int ORACLE_PARALLEL_DEGREE = 0; + + private OracleUtils() { } + + public static void setOracleAuth(SqoopOptions options) { + options.setUsername(ORACLE_USER_NAME); + options.setPassword(ORACLE_USER_PASS); + } + + public static void setOracleSecondaryUserAuth(SqoopOptions options) { + options.setUsername(ORACLE_SECONDARY_USER_NAME); + options.setPassword(ORACLE_SECONDARY_USER_PASS); + } + + /** + * Drop a table if it exists. + */ + public static void dropTable(String tableName, ConnManager manager) + throws SQLException { + Connection connection = null; + Statement st = null; + + try { + connection = manager.getConnection(); + connection.setAutoCommit(false); + st = connection.createStatement(); + + // create the database table and populate it with data. + st.executeUpdate(getDropTableStatement(tableName)); + + connection.commit(); + } finally { + try { + if (null != st) { + st.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + } + + public static String getDropTableStatement(String tableName) { + return "BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName + "'; " + + "exception when others then null; end;"; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java b/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java new file mode 100644 index 0000000..22b202a --- /dev/null +++ b/src/test/org/apache/sqoop/manager/postgresql/DirectPostgreSQLExportManualTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.postgresql; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.PreparedStatement; +import java.util.Arrays; +import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sqoop.TestExport; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.junit.Ignore; +import org.junit.Test; + + +/** + * Test the DirectPostgresqlManager implementations. + * DirectPostgresqlManager uses JDBC driver to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, + * this class is named in such a way that Hadoop's default QA process does not + * run it. + * + * You need to run this manually with + * -Dtestcase=DirectPostgreSQLExportManualTest. + * + * You need to put Postgresql's JDBC driver library into lib dir. + * + * You need to create a sqooptest superuser and database and tablespace, + * + * $ sudo -u postgres createuser -U postgres -s sqooptest + * $ sudo -u postgres createdb -U sqooptest sqooptest + * $ psql -U sqooptest sqooptest + * + */ +public class DirectPostgreSQLExportManualTest extends TestExport { + + public static final Log LOG = + LogFactory.getLog(DirectPostgreSQLExportManualTest.class.getName()); + private DBConfiguration dbConf; + + static final String HOST_URL = + System.getProperty("sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE = + System.getProperty("sqoop.test.postgresql.database", "sqooptest"); + static final String USERNAME = + System.getProperty("sqoop.test.postgresql.username", "sqooptest"); + static final String PASSWORD = System.getProperty( + "sqoop.test.postgresql.password"); + static final String CONNECT_STRING = HOST_URL + DATABASE; + + public DirectPostgreSQLExportManualTest() { + JobConf conf = new JobConf(getConf()); + DBConfiguration.configureDB(conf, + "org.postgresql.Driver", + getConnectString(), + getUserName(), + PASSWORD, (Integer) null); + dbConf = new DBConfiguration(conf); + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Override + protected String getConnectString() { + return CONNECT_STRING; + } + + protected String getUserName() { + return USERNAME; + } + + @Override + protected String getTablePrefix() { + return super.getTablePrefix().toLowerCase(); + } + + @Override + protected String getTableName() { + return super.getTableName().toLowerCase(); + } + + @Override + public String getStagingTableName() { + return super.getStagingTableName().toLowerCase(); + } + + @Override + protected Connection getConnection() { + try { + Connection conn = dbConf.getConnection(); + conn.setAutoCommit(false); + PreparedStatement stmt = + conn.prepareStatement("SET extra_float_digits TO 0"); + stmt.executeUpdate(); + conn.commit(); + return conn; + } catch (SQLException sqlE) { + LOG.error("Could not get connection to test server: " + sqlE); + return null; + } catch (ClassNotFoundException cnfE) { + LOG.error("Could not find driver class: " + cnfE); + return null; + } + } + + @Override + protected String getDropTableStatement(String tableName) { + return "DROP TABLE IF EXISTS " + tableName; + } + + @Override + protected String[] getArgv(boolean includeHadoopFlags, + int rowsPerStatement, + int statementsPerTx, + String... additionalArgv) { + ArrayList<String> args = + new ArrayList<String>(Arrays.asList(additionalArgv)); + args.add("--username"); + args.add(getUserName()); + args.add("--password"); + args.add(PASSWORD); + args.add("--direct"); + return super.getArgv(includeHadoopFlags, + rowsPerStatement, + statementsPerTx, + args.toArray(new String[0])); + } + + @Override + protected String [] getCodeGenArgv(String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs)); + args.add("--username"); + args.add(getUserName()); + args.add("--password"); + args.add(PASSWORD); + return super.getCodeGenArgv(args.toArray(new String[0])); + } + + @Ignore("Ignoring this test case as direct export does not support --columns option.") + @Override + @Test + public void testColumnsExport() throws IOException, SQLException { + } + + @Ignore("Ignoring this test case as the scenario is not supported with direct export.") + @Override + @Test + public void testLessColumnsInFileThanInTable() throws IOException, SQLException { + } + + @Ignore("Ignoring this test case as the scenario is not supported with direct export.") + @Override + @Test + public void testLessColumnsInFileThanInTableInputNullIntPassed() throws IOException, SQLException { + } + + @Ignore("Ignoring this test case as the scenario is not supported with direct export.") + @Override + @Test + public void testLessColumnsInFileThanInTableInputNullStringPassed() throws IOException, SQLException { + } + + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java b/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java new file mode 100644 index 0000000..8855316 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/postgresql/PGBulkloadManagerManualTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.postgresql; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.junit.Test; + +import org.apache.sqoop.TestExport; +import org.apache.sqoop.mapreduce.db.DBConfiguration; + + +/** + * Test the PGBulkloadManager implementations. + * PGBulkloadManager uses both JDBC driver and pg_bulkload to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, + * this class is named in such a way that Hadoop's default QA process does not + * run it. + * + * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest. + * + * You need to put Postgresql's JDBC driver library into lib dir. + * + * You need to create a sqooptest superuser and database and tablespace, + * and install pg_bulkload for sqooptest database: + * + * $ sudo -u postgres createuser -U postgres -s sqooptest + * $ sudo -u postgres createdb -U sqooptest sqooptest + * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace + * $ psql -U sqooptest + * -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest + * $ psql -U sqooptest sqooptest + * sqooptest=# CREATE USER sqooptest; + * sqooptest=# CREATE DATABASE sqooptest; + * sqooptest=# CREATE TABLESPACE sqooptest + * LOCATION '/var/pgdata/stagingtablespace'; + * sqooptest=# \q + * + */ +public class PGBulkloadManagerManualTest extends TestExport { + + public static final Log LOG = + LogFactory.getLog(PGBulkloadManagerManualTest.class.getName()); + private DBConfiguration dbConf; + static final String HOST_URL = + System.getProperty("sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE = + System.getProperty("sqoop.test.postgresql.database", "sqooptest"); + static final String TABLESPACE = + System.getProperty("sqoop.test.postgresql.tablespace", "sqooptest"); + static final String USERNAME = + System.getProperty("sqoop.test.postgresql.username", "sqooptest"); + static final String PG_BULKLOAD = + System.getProperty("sqoop.test.postgresql.pg_bulkload", "pg_bulkload"); + static final String CONNECT_STRING = HOST_URL + DATABASE; + + public PGBulkloadManagerManualTest() { + JobConf conf = new JobConf(getConf()); + DBConfiguration.configureDB(conf, + "org.postgresql.Driver", + getConnectString(), + getUserName(), + (String) null, (Integer) null); + dbConf = new DBConfiguration(conf); + } + + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + + @Override + protected String getConnectString() { + return CONNECT_STRING; + } + + + protected String getUserName() { + return USERNAME; + } + + + @Override + protected String getTablePrefix() { + return super.getTablePrefix().toLowerCase(); + } + + + @Override + protected String getTableName() { + return super.getTableName().toLowerCase(); + } + + @Override + public String getStagingTableName() { + return super.getStagingTableName().toLowerCase(); + } + + + @Override + protected Connection getConnection() { + try { + Connection conn = dbConf.getConnection(); + conn.setAutoCommit(false); + PreparedStatement stmt = + conn.prepareStatement("SET extra_float_digits TO 0"); + stmt.executeUpdate(); + conn.commit(); + return conn; + } catch (SQLException sqlE) { + LOG.error("Could not get connection to test server: " + sqlE); + return null; + } catch (ClassNotFoundException cnfE) { + LOG.error("Could not find driver class: " + cnfE); + return null; + } + } + + + @Override + protected String getDropTableStatement(String tableName) { + return "DROP TABLE IF EXISTS " + tableName; + } + + + @Override + protected String[] getArgv(boolean includeHadoopFlags, + int rowsPerStatement, + int statementsPerTx, + String... additionalArgv) { + ArrayList<String> args = + new ArrayList<String>(Arrays.asList(additionalArgv)); + args.add("-D"); + args.add("pgbulkload.bin=" + PG_BULKLOAD); + args.add("--username"); + args.add(getUserName()); + args.add("--connection-manager"); + args.add("org.apache.sqoop.manager.PGBulkloadManager"); + args.add("--staging-table"); + args.add("dummy"); + args.add("--clear-staging-table"); + return super.getArgv(includeHadoopFlags, + rowsPerStatement, + statementsPerTx, + args.toArray(new String[0])); + } + + + @Override + protected String [] getCodeGenArgv(String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs)); + args.add("--username"); + args.add(getUserName()); + return super.getCodeGenArgv(args.toArray(new String[0])); + } + + + @Override + public void testColumnsExport() throws IOException, SQLException { + // PGBulkloadManager does not support --columns option. + } + + @Test + public void testMultiReduceExport() throws IOException, SQLException { + multiFileTest(2, 10, 2, "-D", "mapred.reduce.tasks=2"); + } + + @Test + public void testMultiReduceExportWithNewProp() + throws IOException, SQLException { + multiFileTest(2, 10, 2, "-D", "mapreduce.job.reduces=2"); + } + + @Test + public void testExportWithTablespace() throws IOException, SQLException { + multiFileTest(1, 10, 1, + "-D", "pgbulkload.staging.tablespace=" + TABLESPACE); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java new file mode 100644 index 0000000..f86b119 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExportTest.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.manager.postgresql; + +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.ExportJobTestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.manager.PostgresqlManager; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * + */ +public class PostgresqlExportTest extends ExportJobTestCase { + public static final Log LOG = LogFactory.getLog( + PostgresqlExportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE_USER = System.getProperty( + "sqoop.test.postgresql.username", + "sqooptest"); + static final String DATABASE_NAME = System.getProperty( + "sqoop.test.postgresql.database", + "sqooptest"); + static final String PASSWORD = System.getProperty( + "sqoop.test.postgresql.password"); + + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE"; + static final String STAGING_TABLE_NAME = "STAGING"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + + protected Connection connection; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + private String getDropTableStatement(String tableName, String schema) { + return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName); + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up postgresql test: " + CONNECT_STRING); + + try { + connection = DriverManager.getConnection(CONNECT_STRING, DATABASE_USER, PASSWORD); + connection.setAutoCommit(false); + } catch (SQLException ex) { + LOG.error("Can't create connection", ex); + throw new RuntimeException(ex); + } + + createTable(TABLE_NAME, SCHEMA_PUBLIC); + createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC); + createTable(TABLE_NAME, SCHEMA_SPECIAL); + createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL); + createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC); + + LOG.debug("setUp complete."); + } + + @Override + public void tearDown() { + try { + Statement stmt = connection.createStatement(); + stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_SPECIAL)); + stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_SPECIAL)); + } catch(SQLException e) { + LOG.error("Can't clean up the database:", e); + } + + super.tearDown(); + + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Ignoring exception in tearDown", e); + } + } + + private interface CreateIt { + void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException; + } + + private void createTable(String tableName, String schema) { + CreateIt createIt = new CreateIt() { + @Override + public void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException { + st.executeUpdate("CREATE TABLE " + fullName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("salary") + " FLOAT, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + } + }; + create(tableName, "TABLE", schema, createIt); + } + + private void createProcedure(String procedureName, String schema) { + CreateIt createIt = new CreateIt() { + @Override + public void createIt( + Statement st, + String fullName, + ConnManager manager) throws SQLException { + st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " (" + + "IN " + manager.escapeColName("id") + " INT," + + "IN " + manager.escapeColName("name") + " VARCHAR(24)," + + "IN " + manager.escapeColName("start_date") + " DATE," + + "IN " + manager.escapeColName("salary") + " FLOAT," + + "IN " + manager.escapeColName("dept") + " VARCHAR(32)" + + ") " + + "RETURNS VOID " + + "AS $$ " + + "BEGIN " + + "INSERT INTO " + + quoteTableOrSchemaName(SCHEMA_PUBLIC) + + "." + + quoteTableOrSchemaName(TABLE_NAME) + + " (" + + manager.escapeColName("id") + +", " + + manager.escapeColName("name") + +", " + + manager.escapeColName("start_date") + +", " + + manager.escapeColName("salary") + +", " + + manager.escapeColName("dept") + + ") VALUES (" + + manager.escapeColName("id") + +", " + + manager.escapeColName("name") + +", " + + manager.escapeColName("start_date") + +", " + + manager.escapeColName("salary") + +", " + + manager.escapeColName("dept") + + ");" + + "END;" + + "$$ LANGUAGE plpgsql;"); + } + }; + create(procedureName, "FUNCTION", schema, createIt); + } + + private void create( + String name, + String type, + String schema, + CreateIt createIt) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, name); + options.setUsername(DATABASE_USER); + + ConnManager manager = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + quoteTableOrSchemaName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists.", e); + connection.rollback(); + } + + String fullTableName = quoteTableOrSchemaName(schema) + + "." + quoteTableOrSchemaName(name); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP " + type + " " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop " + + type.toLowerCase() + + " " +fullTableName + + " (ok)", + e); + // Now we need to reset the transaction. + connection.rollback(); + } + + createIt.createIt(st, fullTableName, manager); + + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + private String [] getArgv(boolean useTable, + String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + if (useTable) { + args.add("--table"); + args.add(TABLE_NAME); + } else { + args.add("--call"); + args.add(PROCEDURE_NAME); + } + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(PASSWORD); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + protected void createTestFile(String filename, + String[] lines) + throws IOException { + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/" + filename); + Writer output = new BufferedWriter(new FileWriter(file)); + for(String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + } + + @Test + public void testExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + runExport(getArgv(true)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + + @Test + public void testExportUsingProcedure() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + runExport(getArgv(false)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + + @Test + public void testExportStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, }; + + runExport(getArgv(true, extra)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + + @Test + public void testExportDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--direct"}; + + runExport(getArgv(true, extra)); + + assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection); + } + + @Test + public void testExportCustomSchema() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(true, extra)); + + assertRowCount(2, + quoteTableOrSchemaName(SCHEMA_SPECIAL) + + "." + quoteTableOrSchemaName(TABLE_NAME), + connection); + } + + @Test + public void testExportCustomSchemaStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(true, extra)); + + assertRowCount(2, + quoteTableOrSchemaName(SCHEMA_SPECIAL) + + "." + quoteTableOrSchemaName(TABLE_NAME), + connection); + } + + @Test + public void testExportCustomSchemaStagingClear() + throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--clear-staging-table", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(true, extra)); + + assertRowCount(2, + quoteTableOrSchemaName(SCHEMA_SPECIAL) + + "." + quoteTableOrSchemaName(TABLE_NAME), + connection); + } + + @Test + public void testExportCustomSchemaDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--direct", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(true, extra)); + + assertRowCount(2, + quoteTableOrSchemaName(SCHEMA_SPECIAL) + + "." + quoteTableOrSchemaName(TABLE_NAME), + connection); + } + + public static void assertRowCount(long expected, + String tableName, + Connection connection) { + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); + + rs.next(); + + assertEquals(expected, rs.getLong(1)); + } catch (SQLException e) { + LOG.error("Can't verify number of rows", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public String quoteTableOrSchemaName(String tableName) { + return "\"" + tableName + "\""; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java new file mode 100644 index 0000000..dd4cfb4 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.manager.postgresql; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.manager.PostgresqlManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.util.FileListing; + +public class PostgresqlExternalTableImportTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory + .getLog(PostgresqlExternalTableImportTest.class.getName()); + static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE_USER = System.getProperty( + "sqoop.test.postgresql.username", "sqooptest"); + static final String DATABASE_NAME = System.getProperty( + "sqoop.test.postgresql.database", "sqooptest"); + static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password"); + + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG"; + static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's"; + static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + static final String EXTERNAL_TABLE_DIR = "/tmp/external/employees_pg"; + protected Connection connection; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + public String quoteTableOrSchemaName(String tableName) { + return "\"" + tableName + "\""; + } + + private String getDropTableStatement(String tableName, String schema) { + return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + + quoteTableOrSchemaName(tableName); + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up another postgresql test: " + CONNECT_STRING); + + setUpData(TABLE_NAME, SCHEMA_PUBLIC, false); + setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true); + setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false); + setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false); + + LOG.debug("setUp complete."); + } + + @After + public void tearDown() { + try { + Statement stmt = connection.createStatement(); + stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL)); + } catch (SQLException e) { + LOG.error("Can't clean up the database:", e); + } + + super.tearDown(); + + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Ignoring exception in tearDown", e); + } + } + + public void setUpData(String tableName, String schema, boolean nullEntry) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + options.setUsername(DATABASE_USER); + options.setPassword(PASSWORD); + + ConnManager manager = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + connection = manager.getConnection(); + connection.setAutoCommit(false); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists."); + connection.rollback(); + } + + String fullTableName = manager.escapeTableName(schema) + "." + + manager.escapeTableName(tableName); + LOG.info("Creating table: " + fullTableName); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)"); + // Now we need to reset the transaction. + connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + manager.escapeColName("id") + + " INT NOT NULL PRIMARY KEY, " + manager.escapeColName("name") + + " VARCHAR(24) NOT NULL, " + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("Salary") + " FLOAT, " + manager.escapeColName("Fired") + + " BOOL, " + manager.escapeColName("dept") + " VARCHAR(32))"); + + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')"); + if (nullEntry) { + st.executeUpdate("INSERT INTO " + fullTableName + " VALUES(4,'Mike',NULL,NULL,NULL,NULL)"); + + } + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + private String[] getArgv(boolean isDirect, String tableName, String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--external-table-dir"); + args.add(EXTERNAL_TABLE_DIR); + args.add("--hive-import"); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(PASSWORD); + args.add("--where"); + args.add("id > 1"); + args.add("-m"); + args.add("1"); + + if (isDirect) { + args.add("--direct"); + } + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + private void doImportAndVerify(boolean isDirect, String[] expectedResults, String tableName, + String... extraArgs) throws IOException { + + Path tablePath = new Path(EXTERNAL_TABLE_DIR); + + // if importing with merge step, directory should exist and output should be + // from a reducer + boolean isMerge = Arrays.asList(extraArgs).contains("--merge-key"); + Path filePath = new Path(tablePath, isMerge ? "part-r-00000" : "part-m-00000"); + + File tableFile = new File(tablePath.toString()); + if (tableFile.exists() && tableFile.isDirectory() && !isMerge) { + // remove the directory before running the import. + FileListing.recursiveDeleteDir(tableFile); + } + + String[] argv = getArgv(isDirect, tableName, extraArgs); + try { + runImport(argv); + } catch (IOException ioe) { + LOG.error("Got IOException during import: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } + + File f = new File(filePath.toString()); + assertTrue("Could not find imported data file, " + f, f.exists()); + BufferedReader r = null; + try { + // Read through the file and make sure it's all there. + r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); + for (String expectedLine : expectedResults) { + assertEquals(expectedLine, r.readLine()); + } + } catch (IOException ioe) { + LOG.error("Got IOException verifying results: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } finally { + IOUtils.closeStream(r); + } + } + + @Test + public void testJdbcBasedImport() throws IOException { + // separator is different to other tests + // because the CREATE EXTERNAL TABLE DDL is + // ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' + char sep = '\001'; + String[] expectedResults = { + "2" + sep + "Bob" + sep + "2009-04-20" + sep + "400.0" + sep + "true" + sep + "sales", + "3" + sep + "Fred" + sep + "2009-01-23" + sep + "15.0" + sep + "false" + sep + "marketing" }; + doImportAndVerify(false, expectedResults, TABLE_NAME); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java new file mode 100644 index 0000000..846228a --- /dev/null +++ b/src/test/org/apache/sqoop/manager/postgresql/PostgresqlImportTest.java @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager.postgresql; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.FileInputStream; +import java.io.File; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.sqoop.manager.ConnManager; +import org.apache.sqoop.manager.PostgresqlManager; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.testutil.ImportJobTestCase; +import org.apache.sqoop.util.FileListing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test the PostgresqlManager and DirectPostgresqlManager implementations. + * The former uses the postgres JDBC driver to perform an import; + * the latter uses pg_dump to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, + * this class is named in such a way that Hadoop's default QA process does not + * run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or + * -Dthirdparty=true. + * + * You need to put Postgresql's JDBC driver library into a location where + * Hadoop can access it (e.g., $HADOOP_HOME/lib). + * + * To configure a postgresql database to allow local connections, put the + * following in /etc/postgresql/8.3/main/pg_hba.conf: + * local all all trust + * host all all 127.0.0.1/32 trust + * host all all ::1/128 trust + * + * ... and comment out any other lines referencing 127.0.0.1 or ::1. + * + * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment + * the line that starts with listen_addresses and set its value to '*' as + * follows + * listen_addresses = '*' + * + * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead. You may + * need to restart the postgresql service after modifying this file. + * + * You should also create a sqooptest user and database: + * + * $ sudo -u postgres psql -U postgres template1 + * template1=> CREATE USER sqooptest; + * template1=> CREATE DATABASE sqooptest; + * template1=> GRANT ALL ON DATABASE sqooptest TO sqooptest; + * template1=> \q + * + */ +public class PostgresqlImportTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + PostgresqlImportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + static final String DATABASE_USER = System.getProperty( + "sqoop.test.postgresql.username", + "sqooptest"); + static final String DATABASE_NAME = System.getProperty( + "sqoop.test.postgresql.database", + "sqooptest"); + static final String PASSWORD = System.getProperty( + "sqoop.test.postgresql.password"); + + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG"; + static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's"; + static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + + protected Connection connection; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + public String quoteTableOrSchemaName(String tableName) { + return "\"" + tableName + "\""; + } + + private String getDropTableStatement(String tableName, String schema) { + return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName); + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up another postgresql test: " + CONNECT_STRING); + + setUpData(TABLE_NAME, SCHEMA_PUBLIC, false); + setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true); + setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false); + setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false); + + LOG.debug("setUp complete."); + } + + @After + public void tearDown() { + try { + Statement stmt = connection.createStatement(); + stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC)); + stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL)); + } catch (SQLException e) { + LOG.error("Can't clean up the database:", e); + } + + super.tearDown(); + + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Ignoring exception in tearDown", e); + } + } + + + + public void setUpData(String tableName, String schema, boolean nullEntry) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + options.setUsername(DATABASE_USER); + options.setPassword(PASSWORD); + + ConnManager manager = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + connection = manager.getConnection(); + connection.setAutoCommit(false); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists."); + connection.rollback(); + } + + String fullTableName = manager.escapeTableName(schema) + + "." + manager.escapeTableName(tableName); + LOG.info("Creating table: " + fullTableName); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)"); + // Now we need to reset the transaction. + connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("Salary") + " FLOAT, " + + manager.escapeColName("Fired") + " BOOL, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')"); + if (nullEntry) { + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(4,'Mike',NULL,NULL,NULL,NULL)"); + + } + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + + private String [] getArgv(boolean isDirect, String tableName, + String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(PASSWORD); + args.add("--where"); + args.add("id > 1"); + args.add("-m"); + args.add("1"); + + if (isDirect) { + args.add("--direct"); + } + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + private void doImportAndVerify(boolean isDirect, String[] expectedResults, + String tableName, String... extraArgs) throws IOException { + + Path warehousePath = new Path(this.getWarehouseDir()); + Path tablePath = new Path(warehousePath, tableName); + + // if importing with merge step, directory should exist and output should be from a reducer + boolean isMerge = Arrays.asList(extraArgs).contains("--merge-key"); + Path filePath = new Path(tablePath, isMerge ? "part-r-00000" : "part-m-00000"); + + File tableFile = new File(tablePath.toString()); + if (tableFile.exists() && tableFile.isDirectory() && !isMerge) { + // remove the directory before running the import. + FileListing.recursiveDeleteDir(tableFile); + } + + String [] argv = getArgv(isDirect, tableName, extraArgs); + try { + runImport(argv); + } catch (IOException ioe) { + LOG.error("Got IOException during import: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } + + File f = new File(filePath.toString()); + assertTrue("Could not find imported data file, " + f, f.exists()); + BufferedReader r = null; + try { + // Read through the file and make sure it's all there. + r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); + for (String expectedLine : expectedResults) { + assertEquals(expectedLine, r.readLine()); + } + } catch (IOException ioe) { + LOG.error("Got IOException verifying results: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } finally { + IOUtils.closeStream(r); + } + } + + @Test + public void testJdbcBasedImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,true,sales", + "3,Fred,2009-01-23,15.0,false,marketing", + }; + + doImportAndVerify(false, expectedResults, TABLE_NAME); + } + + @Test + public void testDirectImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,TRUE,sales", + "3,Fred,2009-01-23,15,FALSE,marketing", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME); + } + + @Test + public void testListTables() throws IOException { + SqoopOptions options = new SqoopOptions(new Configuration()); + options.setConnectString(CONNECT_STRING); + options.setUsername(DATABASE_USER); + options.setPassword(PASSWORD); + + ConnManager mgr = new PostgresqlManager(options); + String[] tables = mgr.listTables(); + Arrays.sort(tables); + assertTrue(TABLE_NAME + " is not found!", + Arrays.binarySearch(tables, TABLE_NAME) >= 0); + } + + @Test + public void testTableNameWithSpecialCharacter() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,true,sales", + "3,Fred,2009-01-23,15.0,false,marketing", + }; + + doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME); + } + + @Test + public void testIncrementalImport() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); + } + + @Test + public void testDirectIncrementalImport() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + } + + @Test + public void testDirectIncrementalImportMerge() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + + extraArgs = new String[] { "--incremental", "lastmodified", + "--check-column", "start_date", + "--merge-key", "id", + "--last-value", "2009-04-20" + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentSchemaImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,true,sales", + "3,Fred,2009-01-23,15.0,false,marketing", + }; + + String [] extraArgs = { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentSchemaImportDirect() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,TRUE,sales", + "3,Fred,2009-01-23,15,FALSE,marketing", + }; + + String [] extraArgs = { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } + + @Test + public void testNullEscapeCharacters() throws Exception { + String [] expectedResults = { + "2,Bob,2009-04-20,400,TRUE,sales", + "3,Fred,2009-01-23,15,FALSE,marketing", + "4,Mike,\\N,\\N,\\N,\\N", + }; + + String [] extraArgs = { + "--null-string", "\\\\\\\\N", + "--null-non-string", "\\\\\\\\N", + }; + + doImportAndVerify(true, expectedResults, NULL_TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentBooleanValues() throws Exception { + String [] expectedResults = { + "2,Bob,2009-04-20,400,REAL_TRUE,sales", + "3,Fred,2009-01-23,15,REAL_FALSE,marketing", + }; + + String [] extraArgs = { + "--", + "--boolean-true-string", "REAL_TRUE", + "--boolean-false-string", "REAL_FALSE", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME, extraArgs); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java b/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java index be2b22c..15672b1 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/ManagerCompatExport.java @@ -34,10 +34,10 @@ import org.apache.sqoop.manager.sqlserver.MSSQLTestData.KEY_STRINGS; import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES; import org.junit.Before; import org.junit.Test; -import com.cloudera.sqoop.Sqoop; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.testutil.ExportJobTestCase; -import com.cloudera.sqoop.tool.ExportTool; +import org.apache.sqoop.Sqoop; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.ExportJobTestCase; +import org.apache.sqoop.tool.ExportTool; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java index a68ed30..293da00 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeExportSequenceFileTest.java @@ -31,11 +31,11 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.util.ReflectionUtils; import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.lib.RecordParser; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.tool.CodeGenTool; -import com.cloudera.sqoop.util.ClassLoaderStack; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.lib.RecordParser; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.tool.CodeGenTool; +import org.apache.sqoop.util.ClassLoaderStack; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java index a4d1822..520c4ac 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportDelimitedFileTest.java @@ -31,12 +31,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.StringUtils; import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES; -import com.cloudera.sqoop.Sqoop; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.orm.CompilationManager; -import com.cloudera.sqoop.testutil.CommonArgs; -import com.cloudera.sqoop.tool.ImportTool; -import com.cloudera.sqoop.util.ClassLoaderStack; +import org.apache.sqoop.Sqoop; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.orm.CompilationManager; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.tool.ImportTool; +import org.apache.sqoop.util.ClassLoaderStack; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java index 409c4ad..592a78f 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerDatatypeImportSequenceFileTest.java @@ -33,8 +33,8 @@ import org.apache.hadoop.util.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.testutil.ManagerCompatTestCase; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.ManagerCompatTestCase; import org.apache.sqoop.manager.sqlserver.MSSQLTestDataFileParser.DATATYPES; import org.apache.sqoop.manager.sqlserver.MSSQLTestData.KEY_STRINGS; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java index 535e599..e6b0865 100644 --- a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerHiveImportTest.java @@ -26,10 +26,10 @@ import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.hive.TestHiveImport; -import com.cloudera.sqoop.testutil.CommonArgs; -import com.cloudera.sqoop.tool.SqoopTool; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.hive.TestHiveImport; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.sqoop.tool.SqoopTool; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/sqoop/blob/6984a36c/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java new file mode 100644 index 0000000..b7c2b75 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerExportTest.java @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.manager.sqlserver; + +import org.apache.sqoop.ConnFactory; +import org.apache.sqoop.SqoopOptions; +import org.apache.sqoop.testutil.CommonArgs; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sqoop.manager.SQLServerManager; +import org.apache.sqoop.testutil.ExportJobTestCase; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Please see instructions in SQLServerManagerImportTest. + */ +public class SQLServerManagerExportTest extends ExportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + SQLServerManagerExportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.sqlserver.connectstring.host_url", + "jdbc:sqlserver://sqlserverhost:1433"); + static final String DATABASE_NAME = System.getProperty( + "sqoop.test.sqlserver.database", + "sqooptest"); + static final String DATABASE_USER = System.getProperty( + "ms.sqlserver.username", + "sqoopuser"); + static final String DATABASE_PASSWORD = System.getProperty( + "ms.sqlserver.password", + "password"); + + static final String SCHEMA_DBO = "dbo"; + static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL"; + static final String DBO_BINARY_TABLE_NAME = "BINARYTYPE_MSSQL"; + static final String SCHEMA_SCH = "sch"; + static final String SCH_TABLE_NAME = "PRIVATE_TABLE"; + static final String CONNECT_STRING = HOST_URL + + ";databaseName=" + DATABASE_NAME; + + static final String CONNECTOR_FACTORY = System.getProperty( + "sqoop.test.msserver.connector.factory", + ConnFactory.DEFAULT_FACTORY_CLASS_NAMES); + + // instance variables populated during setUp, used during tests + private SQLServerManager manager; + private Configuration conf = new Configuration(); + private Connection conn = null; + + @Override + protected Configuration getConf() { + return conf; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + private String getDropTableStatement(String schema, String tableName) { + return "DROP TABLE IF EXISTS " + manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(tableName); + } + + @Before + public void setUp() { + super.setUp(); + + SqoopOptions options = new SqoopOptions(CONNECT_STRING, + DBO_TABLE_NAME); + options.setUsername(DATABASE_USER); + options.setPassword(DATABASE_PASSWORD); + + manager = new SQLServerManager(options); + + createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME); + createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME); + + // To test with Microsoft SQL server connector, copy the connector jar to + // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory + // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default, + // the built-in SQL server connector is used. + conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY); + } + + public void createTableAndPopulateData(String schema, String table) { + String fulltableName = manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(table); + + Statement stmt = null; + + // Create schema if needed + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("CREATE SCHEMA " + schema); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Can't create schema: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Drop the existing table, if there is one. + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + fulltableName); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Table was not dropped: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); + + // create the database table and populate it with data. + stmt.executeUpdate("CREATE TABLE " + fulltableName + " (" + + "id INT NOT NULL, " + + "name VARCHAR(24) NOT NULL, " + + "salary FLOAT, " + + "dept VARCHAR(32), " + + "PRIMARY KEY (id))"); + conn.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: ", sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing connection/stmt", ex); + } + } + } + + public void createSQLServerBinaryTypeTable(String schema, String table) { + String fulltableName = manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(table); + + Statement stmt = null; + + // Create schema if needed + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("CREATE SCHEMA " + schema); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Can't create schema: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Drop the existing table, if there is one. + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + fulltableName); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Table was not dropped: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); + + // create the database table and populate it with data. + stmt.executeUpdate("CREATE TABLE " + fulltableName + " (" + + "id INT PRIMARY KEY, " + + "b1 BINARY(10), " + + "b2 VARBINARY(10))"); + conn.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: ", sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing connection/stmt", ex); + } + } + } + + @After + public void tearDown() { + try { + Statement stmt = conn.createStatement(); + stmt.executeUpdate(getDropTableStatement(SCHEMA_DBO, DBO_TABLE_NAME)); + stmt.executeUpdate(getDropTableStatement(SCHEMA_SCH, SCH_TABLE_NAME)); + } catch (SQLException e) { + LOG.error("Can't clean up the database:", e); + } + + super.tearDown(); + try { + conn.close(); + manager.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + + private String [] getArgv(String tableName, + String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(DATABASE_PASSWORD); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + protected void createTestFile(String filename, + String[] lines) + throws IOException { + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/" + filename); + Writer output = new BufferedWriter(new FileWriter(file)); + for(String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + } + + @Test + public void testExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + runExport(getArgv(DBO_TABLE_NAME)); + + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + + @Test + public void testExportCustomSchema() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String[] extra = new String[] {"--", + "--schema", + SCHEMA_SCH, + }; + + runExport(getArgv(SCH_TABLE_NAME, extra)); + + assertRowCount( + 2, + escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME), + conn + ); + } + + @Test + public void testExportTableHints() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String []extra = new String[] {"--", "--table-hints", + "ROWLOCK", + }; + runExport(getArgv(DBO_TABLE_NAME, extra)); + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + + @Test + public void testExportTableHintsMultiple() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String []extra = new String[] {"--", "--table-hints", + "ROWLOCK,NOWAIT", + }; + runExport(getArgv(DBO_TABLE_NAME, extra)); + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + + @Test + public void testSQLServerBinaryType() throws IOException, SQLException { + createSQLServerBinaryTypeTable(SCHEMA_DBO, DBO_BINARY_TABLE_NAME); + createTestFile("inputFile", new String[] { + "1,73 65 63 72 65 74 00 00 00 00,73 65 63 72 65 74" + }); + String[] expectedContent = {"73656372657400000000", "736563726574"}; + runExport(getArgv(DBO_BINARY_TABLE_NAME)); + assertRowCount(1, escapeObjectName(DBO_BINARY_TABLE_NAME), conn); + checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn); + } + + /** Make sure mixed update/insert export work correctly. */ + @Test + public void testUpsertTextExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + // first time will be insert. + runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id", + "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH)); + // second time will be update. + runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id", + "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH)); + assertRowCount(2, escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME), conn); + } + + public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){ + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT TOP 1 [b1], [b2] FROM " + tableName); + rs.next(); + assertEquals(expected[0], rs.getString("b1")); + assertEquals(expected[1], rs.getString("b2")); + } catch (SQLException e) { + LOG.error("Can't verify table content", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public static void assertRowCount(long expected, + String tableName, + Connection connection) { + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); + + rs.next(); + + assertEquals(expected, rs.getLong(1)); + } catch (SQLException e) { + LOG.error("Can't verify number of rows", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public String escapeObjectName(String objectName) { + return "[" + objectName + "]"; + } +}
