Updated Branches: refs/heads/trunk 0b2a688d3 -> f11c3091c
SQOOP-601 Support custom schemas in PostgreSQL Connector (Jarek Jarcec Cecho via Cheolsoo Park) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f11c3091 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f11c3091 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f11c3091 Branch: refs/heads/trunk Commit: f11c3091c2ef3e52717acbc40c88def238b1cc3a Parents: 0b2a688 Author: Cheolsoo Park <[email protected]> Authored: Mon Sep 17 11:04:18 2012 -0700 Committer: Cheolsoo Park <[email protected]> Committed: Mon Sep 17 11:04:18 2012 -0700 ---------------------------------------------------------------------- src/docs/user/connectors.txt | 32 ++ .../cloudera/sqoop/manager/PostgresqlManager.java | 5 - .../apache/sqoop/manager/CatalogQueryManager.java | 5 +- .../sqoop/manager/DirectPostgresqlManager.java | 4 +- .../apache/sqoop/manager/PGBulkloadManager.java | 2 +- .../apache/sqoop/manager/PostgresqlManager.java | 85 +++- src/java/org/apache/sqoop/manager/SqlManager.java | 13 + .../org/apache/sqoop/mapreduce/JdbcExportJob.java | 2 +- .../sqoop/mapreduce/JdbcUpdateExportJob.java | 3 +- .../sqoop/mapreduce/JdbcUpsertExportJob.java | 2 +- .../mapreduce/db/DataDrivenDBRecordReader.java | 8 +- src/test/com/cloudera/sqoop/ThirdPartyTests.java | 6 +- .../sqoop/manager/PostgresqlExportTest.java | 362 +++++++++++++++ .../sqoop/manager/PostgresqlImportTest.java | 350 ++++++++++++++ .../com/cloudera/sqoop/manager/PostgresqlTest.java | 306 ------------ 15 files changed, 852 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/docs/user/connectors.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index a93f14e..930a499 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -21,6 
+21,38 @@ Notes for specific connectors ----------------------------- +PostgreSQL Connector +~~~~~~~~~~~~~~~~~~~~~ + +Extra arguments +^^^^^^^^^^^^^^^ + +The list of all extra arguments supported by the PostgreSQL Connector is shown in the table +below: + +.Supported PostgreSQL extra arguments: +[grid="all"] +`----------------------------------------`--------------------------------------- +Argument Description +--------------------------------------------------------------------------------- ++\--schema <name>+ Schema name that sqoop should use. \ + Default is "public". +--------------------------------------------------------------------------------- + +Schema support +^^^^^^^^^^^^^^ + +If you need to work with a table that is located in a schema other than the default one, +you need to specify extra argument +\--schema+. Custom schemas are supported for +both import and export jobs (the optional staging table, however, must be present in the +same schema as the target table). Example invocation: + +---- +$ sqoop import ... 
--table custom_table -- --schema custom_schema +---- + + + pg_bulkload connector ~~~~~~~~~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java index 16adeb2..354d260 100644 --- a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java +++ b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java @@ -29,10 +29,5 @@ public class PostgresqlManager public PostgresqlManager(final SqoopOptions opts) { super(opts); } - - protected PostgresqlManager(final SqoopOptions opts, boolean ignored) { - super(opts, ignored); - } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/CatalogQueryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java index 5f2f89f..fa7661e 100644 --- a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java +++ b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java @@ -142,10 +142,11 @@ public abstract class CatalogQueryManager Statement s = null; ResultSet rs = null; List<String> columns = new ArrayList<String>(); + String listColumnsQuery = getListColumnsQuery(tableName); try { c = getConnection(); s = c.createStatement(); - rs = s.executeQuery(getListColumnsQuery(tableName)); + rs = s.executeQuery(listColumnsQuery); while (rs.next()) { columns.add(rs.getString(1)); } @@ -158,7 +159,7 @@ public abstract class CatalogQueryManager } catch (SQLException ce) { LOG.error("Failed to rollback transaction", ce); } - LOG.error("Failed to list columns", sqle); + LOG.error("Failed to list columns from query: " + listColumnsQuery, sqle); throw new 
RuntimeException(sqle); } finally { if (rs != null) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java index a557aa1..ea91fc6 100644 --- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java @@ -56,9 +56,7 @@ public class DirectPostgresqlManager DirectPostgresqlManager.class.getName()); public DirectPostgresqlManager(final SqoopOptions opts) { - // Inform superclass that we're overriding import method via alt. - // constructor. - super(opts, true); + super(opts); } private static final String PSQL_CMD = "psql"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/PGBulkloadManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java index 92174f8..091fd15 100644 --- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java +++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java @@ -40,7 +40,7 @@ public class PGBulkloadManager extends PostgresqlManager { public PGBulkloadManager(final SqoopOptions opts) { - super(opts, true); + super(opts); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/PostgresqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/PostgresqlManager.java b/src/java/org/apache/sqoop/manager/PostgresqlManager.java index d18321c..7e6284e 100644 --- a/src/java/org/apache/sqoop/manager/PostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/PostgresqlManager.java @@ 
-21,11 +21,17 @@ package org.apache.sqoop.manager; import java.io.IOException; import java.sql.SQLException; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.util.ImportException; +import org.apache.sqoop.cli.RelatedOptions; /** * Manages connections to Postgresql databases. @@ -33,6 +39,8 @@ import com.cloudera.sqoop.util.ImportException; public class PostgresqlManager extends com.cloudera.sqoop.manager.CatalogQueryManager { + public static final String SCHEMA = "schema"; + public static final Log LOG = LogFactory.getLog( PostgresqlManager.class.getName()); @@ -42,13 +50,20 @@ public class PostgresqlManager // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; + /* + * PostgreSQL schema that we should use. + */ + private String schema; + public PostgresqlManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); - } - protected PostgresqlManager(final SqoopOptions opts, boolean ignored) { - // constructor used by subclasses to avoid the --direct warning. - super(DRIVER_CLASS, opts); + // Try to parse extra arguments + try { + parseExtraArgs(opts.getExtraArgs()); + } catch (ParseException e) { + throw new RuntimeException("Can't parse extra arguments", e); + } } @Override @@ -58,6 +73,11 @@ public class PostgresqlManager @Override public String escapeTableName(String tableName) { + // Return full table name including schema if needed + if (schema != null && !schema.isEmpty()) { + return escapeIdentifier(schema) + "." 
+ escapeIdentifier(tableName); + } + return escapeIdentifier(tableName); } @@ -117,7 +137,7 @@ public class PostgresqlManager protected String getListTablesQuery() { return "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES " - + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())"; + + "WHERE SCHEMANAME = " + getSchemaSqlFragment(); } @Override @@ -127,7 +147,7 @@ public class PostgresqlManager + " PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col " + "WHERE sch.OID = tab.RELNAMESPACE " + " AND tab.OID = col.ATTRELID " - + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) " + + " AND sch.NSPNAME = " + getSchemaSqlFragment() + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' " + " AND col.ATTNUM >= 1" + " AND col.ATTISDROPPED = 'f'"; @@ -142,12 +162,20 @@ public class PostgresqlManager + "WHERE sch.OID = tab.RELNAMESPACE " + " AND tab.OID = col.ATTRELID " + " AND tab.OID = ind.INDRELID " - + " AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) " + + " AND sch.NSPNAME = " + getSchemaSqlFragment() + " AND tab.RELNAME = '" + escapeLiteral(tableName) + "' " + " AND col.ATTNUM = ANY(ind.INDKEY) " + " AND ind.INDISPRIMARY"; } + private String getSchemaSqlFragment() { + if (schema != null && !schema.isEmpty()) { + return "'" + escapeLiteral(schema) + "'"; + } + + return "(SELECT CURRENT_SCHEMA())"; + } + private String escapeLiteral(String literal) { return literal.replace("'", "''"); } @@ -157,5 +185,48 @@ public class PostgresqlManager return "SELECT CURRENT_TIMESTAMP"; } + /** + * Parse extra arguments. + * + * @param args Extra arguments array + * @throws ParseException + */ + void parseExtraArgs(String[] args) throws ParseException { + // No-op when no extra arguments are present + if (args == null || args.length == 0) { + return; + } + + // We do not need extended abilities of SqoopParser, so we're using + // Gnu parser instead. 
+ CommandLineParser parser = new GnuParser(); + CommandLine cmdLine = parser.parse(getExtraOptions(), args, true); + + // Apply extra options + if (cmdLine.hasOption(SCHEMA)) { + String schemaName = cmdLine.getOptionValue(SCHEMA); + LOG.info("We will use schema " + schemaName); + + this.schema = schemaName; + } + } + + /** + * Create related options for PostgreSQL extra parameters. + * + * @return + */ + @SuppressWarnings("static-access") + private RelatedOptions getExtraOptions() { + // Connection args (common) + RelatedOptions extraOptions = + new RelatedOptions("PostgreSQL extra options:"); + + extraOptions.addOption(OptionBuilder.withArgName("string").hasArg() + .withDescription("Optional schema name") + .withLongOpt(SCHEMA).create()); + + return extraOptions; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/SqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index ea961cd..3a52c6d 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -766,6 +766,10 @@ public abstract class SqlManager @Override public long getTableRowCount(String tableName) throws SQLException { release(); // Release any previous ResultSet + + // Escape used table name + tableName = escapeTableName(tableName); + long result = -1; String countQuery = "SELECT COUNT(*) FROM " + tableName; Statement stmt = null; @@ -801,6 +805,10 @@ public abstract class SqlManager @Override public void deleteAllRecords(String tableName) throws SQLException { release(); // Release any previous ResultSet + + // Escape table name + tableName = escapeTableName(tableName); + String deleteQuery = "DELETE FROM " + tableName; Statement stmt = null; try { @@ -827,6 +835,11 @@ public abstract class SqlManager public void migrateData(String fromTable, String 
toTable) throws SQLException { release(); // Release any previous ResultSet + + // Escape all table names + fromTable = escapeTableName(fromTable); + toTable = escapeTableName(toTable); + String updateQuery = "INSERT INTO " + toTable + " ( SELECT * FROM " + fromTable + " )"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java index b574f82..bd52f00 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java @@ -127,7 +127,7 @@ public class JdbcExportJob extends ExportJobBase { if (null == colNames) { colNames = mgr.getColumnNames(tableName); } - DBOutputFormat.setOutput(job, tableName, colNames); + DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames); job.setOutputFormatClass(getOutputFormatClass()); job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java index 7be5ed9..c8e17c2 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java @@ -132,7 +132,8 @@ public class JdbcUpdateExportJob extends ExportJobBase { outColNames[j++] = colNames[i]; } } - DBOutputFormat.setOutput(job, tableName, outColNames); + DBOutputFormat.setOutput(job, + mgr.escapeTableName(tableName), outColNames); job.setOutputFormatClass(getOutputFormatClass()); job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName); 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java index f299f98..c17b4bb 100644 --- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java @@ -72,7 +72,7 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob { throw new IOException( "Export column names could not be determined for " + tableName); } - DBOutputFormat.setOutput(job, tableName, colNames); + DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames); String updateKeyColumns = options.getUpdateKeyCol(); if (null == updateKeyColumns) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java index 38e9fb9..a56b93d 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java @@ -98,10 +98,10 @@ public class DataDrivenDBRecordReader<T extends DBWritable> query.append(" FROM ").append(tableName); if (!dbProductName.startsWith("ORACLE") - && !dbProductName.startsWith("DB2")) { - // The AS clause is required for hsqldb, but Oracle explicitly does - // not use it, and DB2 does not allow a qualified name in alias. Since - // this is not necessary for Oracle and DB2, we do not append. + && !dbProductName.startsWith("DB2") + && !dbProductName.startsWith("POSTGRESQL")) { + // The AS clause is required for hsqldb. 
Some other databases might have + // issues with it, so we're skipping some of them. query.append(" AS ").append(tableName); } query.append(" WHERE "); http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/ThirdPartyTests.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java index eeab7f3..949b02d 100644 --- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java +++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java @@ -30,7 +30,8 @@ import com.cloudera.sqoop.manager.MySQLCompatTest; import com.cloudera.sqoop.manager.OracleExportTest; import com.cloudera.sqoop.manager.OracleManagerTest; import com.cloudera.sqoop.manager.OracleCompatTest; -import com.cloudera.sqoop.manager.PostgresqlTest; +import com.cloudera.sqoop.manager.PostgresqlExportTest; +import com.cloudera.sqoop.manager.PostgresqlImportTest; /** * Test battery including all tests of vendor-specific ConnManager @@ -53,7 +54,8 @@ public final class ThirdPartyTests extends TestCase { suite.addTestSuite(OracleExportTest.class); suite.addTestSuite(OracleManagerTest.class); suite.addTestSuite(OracleCompatTest.class); - suite.addTestSuite(PostgresqlTest.class); + suite.addTestSuite(PostgresqlImportTest.class); + suite.addTestSuite(PostgresqlExportTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java new file mode 100644 index 0000000..be449e4 --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop.manager; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ExportJobTestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Before; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +/** + * + */ +public class PostgresqlExportTest extends ExportJobTestCase { + public static final Log LOG = LogFactory.getLog( + PostgresqlExportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + + static final String DATABASE_USER = "sqooptest"; + static final String DATABASE_NAME = "sqooptest"; + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String STAGING_TABLE_NAME = "STAGING"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + 
+ protected Connection connection; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up postgresql test: " + CONNECT_STRING); + + try { + connection = DriverManager.getConnection(HOST_URL, DATABASE_USER, null); + connection.setAutoCommit(false); + } catch (SQLException ex) { + LOG.error("Can't create connection", ex); + throw new RuntimeException(ex); + } + + createTable(TABLE_NAME, SCHEMA_PUBLIC); + createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC); + createTable(TABLE_NAME, SCHEMA_SPECIAL); + createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL); + + LOG.debug("setUp complete."); + } + + @Override + public void tearDown() { + super.tearDown(); + + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Ignoring exception in tearDown", e); + } + } + + public void createTable(String tableName, String schema) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + options.setUsername(DATABASE_USER); + + ConnManager manager = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. + try { + st.executeUpdate("CREATE SCHEMA " + escapeTableOrSchemaName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as" + + "the schema already exists.", e); + connection.rollback(); + } + + String fullTableName = escapeTableOrSchemaName(schema) + + "." + escapeTableOrSchemaName(tableName); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + schema + "." 
+ tableName + " (ok)", + e); + // Now we need to reset the transaction. + connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("salary") + " FLOAT, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + private String [] getArgv(String tableName, + String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + protected void createTestFile(String filename, + String[] lines) + throws IOException { + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/" + filename); + Writer output = new BufferedWriter(new FileWriter(file)); + for(String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + } + + public void testExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + 
"2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + runExport(getArgv(TABLE_NAME)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--direct"}; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection); + } + + public void testExportCustomSchema() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] {"--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaStaging() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." 
+ escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaStagingClear() + throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--staging-table", + STAGING_TABLE_NAME, + "--clear-staging-table", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public void testExportCustomSchemaDirect() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }); + + String[] extra = new String[] { + "--direct", + "--", + "--schema", + SCHEMA_SPECIAL, + }; + + runExport(getArgv(TABLE_NAME, extra)); + + assertRowCount(2, + escapeTableOrSchemaName(SCHEMA_SPECIAL) + + "." + escapeTableOrSchemaName(TABLE_NAME), + connection); + } + + public static void assertRowCount(long expected, + String tableName, + Connection connection) { + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); + + rs.next(); + + assertEquals(expected, rs.getLong(1)); + } catch (SQLException e) { + LOG.error("Can't verify number of rows", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public String escapeTableOrSchemaName(String tableName) { + return "\"" + tableName + "\""; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java ---------------------------------------------------------------------- diff --git 
a/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java new file mode 100644 index 0000000..267ccd0 --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.manager; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.FileInputStream; +import java.io.File; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import com.cloudera.sqoop.util.FileListing; + +/** + * Test the PostgresqlManager and DirectPostgresqlManager implementations. 
+ * The former uses the postgres JDBC driver to perform an import; + * the latter uses pg_dump to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, + * this class is named in such a way that Hadoop's default QA process does not + * run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or + * -Dthirdparty=true. + * + * You need to put Postgresql's JDBC driver library into a location where + * Hadoop can access it (e.g., $HADOOP_HOME/lib). + * + * To configure a postgresql database to allow local connections, put the + * following in /etc/postgresql/8.3/main/pg_hba.conf: + * local all all trust + * host all all 127.0.0.1/32 trust + * host all all ::1/128 trust + * + * ... and comment out any other lines referencing 127.0.0.1 or ::1. + * + * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment + * the line that starts with listen_addresses and set its value to '*' as + * follows + * listen_addresses = '*' + * + * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead. You may + * need to restart the postgresql service after modifying this file. 
+ * + * You should also create a sqooptest user and database: + * + * $ sudo -u postgres psql -U postgres template1 + * template1=> CREATE USER sqooptest; + * template1=> CREATE DATABASE sqooptest; + * template1=> GRANT ALL ON DATABASE sqooptest TO sqooptest; + * template1=> \q + * + */ +public class PostgresqlImportTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + PostgresqlImportTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.postgresql.connectstring.host_url", + "jdbc:postgresql://localhost/"); + + static final String DATABASE_USER = "sqooptest"; + static final String DATABASE_NAME = "sqooptest"; + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's"; + static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE"; + static final String SCHEMA_PUBLIC = "public"; + static final String SCHEMA_SPECIAL = "special"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Before + public void setUp() { + super.setUp(); + + LOG.debug("Setting up another postgresql test: " + CONNECT_STRING); + + setUpData(TABLE_NAME, SCHEMA_PUBLIC); + setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC); + setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL); + + LOG.debug("setUp complete."); + } + + public void setUpData(String tableName, String schema) { + SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); + options.setUsername(DATABASE_USER); + + ConnManager manager = null; + Connection connection = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + connection = manager.getConnection(); + connection.setAutoCommit(false); + st = connection.createStatement(); + + // Create schema if not exists in dummy way (always create and ignore + // errors. 
+ try { + st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema)); + connection.commit(); + } catch (SQLException e) { + LOG.info("Couldn't create schema " + schema + " (is o.k. as long as " + + "the schema already exists).", e); + connection.rollback(); + } + + String fullTableName = manager.escapeTableName(schema) + + "." + manager.escapeTableName(tableName); + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + fullTableName); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)", + e); + // Now we need to reset the transaction. + connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + fullTableName + " (" + + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " + + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " + + manager.escapeColName("start_date") + " DATE, " + + manager.escapeColName("salary") + " FLOAT, " + + manager.escapeColName("dept") + " VARCHAR(32))"); + + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(2,'Bob','2009-04-20',400.00,'sales')"); + st.executeUpdate("INSERT INTO " + fullTableName + + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')"); + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + + private String [] getArgv(boolean isDirect, String tableName, + String...
extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--where"); + args.add("id > 1"); + + if (isDirect) { + args.add("--direct"); + } + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + private void doImportAndVerify(boolean isDirect, String [] expectedResults, + String tableName, String... extraArgs) throws IOException { + + Path warehousePath = new Path(this.getWarehouseDir()); + Path tablePath = new Path(warehousePath, tableName); + + Path filePath; + if (isDirect) { + filePath = new Path(tablePath, "data-00000"); + } else { + filePath = new Path(tablePath, "part-m-00000"); + } + + File tableFile = new File(tablePath.toString()); + if (tableFile.exists() && tableFile.isDirectory()) { + // remove the directory before running the import. + FileListing.recursiveDeleteDir(tableFile); + } + + String [] argv = getArgv(isDirect, tableName, extraArgs); + try { + runImport(argv); + } catch (IOException ioe) { + LOG.error("Got IOException during import: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } + + File f = new File(filePath.toString()); + assertTrue("Could not find imported data file, " + f, f.exists()); + BufferedReader r = null; + try { + // Read through the file and make sure it's all there. 
+ r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); + for (String expectedLine : expectedResults) { + assertEquals(expectedLine, r.readLine()); + } + } catch (IOException ioe) { + LOG.error("Got IOException verifying results: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } finally { + IOUtils.closeStream(r); + } + } + + @Test + public void testJdbcBasedImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,sales", + "3,Fred,2009-01-23,15.0,marketing", + }; + + doImportAndVerify(false, expectedResults, TABLE_NAME); + } + + @Test + public void testDirectImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }; + + doImportAndVerify(true, expectedResults, TABLE_NAME); + } + + @Test + public void testListTables() throws IOException { + SqoopOptions options = new SqoopOptions(new Configuration()); + options.setConnectString(CONNECT_STRING); + options.setUsername(DATABASE_USER); + + ConnManager mgr = new PostgresqlManager(options); + String[] tables = mgr.listTables(); + Arrays.sort(tables); + assertTrue(TABLE_NAME + " is not found!", + Arrays.binarySearch(tables, TABLE_NAME) >= 0); + } + + @Test + public void testTableNameWithSpecialCharacter() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,sales", + "3,Fred,2009-01-23,15.0,marketing", + }; + + doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME); + } + + @Test + public void testIncrementalImport() throws IOException { + String [] expectedResults = { }; + + String [] extraArgs = { "--incremental", "lastmodified", + "--check-column", "start_date", + }; + + doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentSchemaImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,sales", + "3,Fred,2009-01-23,15.0,marketing", + }; + + String [] extraArgs 
= { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } + + @Test + public void testDifferentSchemaImportDirect() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing", + }; + + String [] extraArgs = { "--", + "--schema", SCHEMA_SPECIAL, + }; + + doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java deleted file mode 100644 index 0dfd1fc..0000000 --- a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.sqoop.manager; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.FileInputStream; -import java.io.File; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.testutil.CommonArgs; -import com.cloudera.sqoop.testutil.ImportJobTestCase; -import com.cloudera.sqoop.util.FileListing; - -/** - * Test the PostgresqlManager and DirectPostgresqlManager implementations. - * The former uses the postgres JDBC driver to perform an import; - * the latter uses pg_dump to facilitate it. - * - * Since this requires a Postgresql installation on your local machine to use, - * this class is named in such a way that Hadoop's default QA process does not - * run it. You need to run this manually with -Dtestcase=PostgresqlTest or - * -Dthirdparty=true. - * - * You need to put Postgresql's JDBC driver library into a location where - * Hadoop can access it (e.g., $HADOOP_HOME/lib). - * - * To configure a postgresql database to allow local connections, put the - * following in /etc/postgresql/8.3/main/pg_hba.conf: - * local all all trust - * host all all 127.0.0.1/32 trust - * host all all ::1/128 trust - * - * ... and comment out any other lines referencing 127.0.0.1 or ::1. - * - * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment - * the line that starts with listen_addresses and set its value to '*' as - * follows - * listen_addresses = '*' - * - * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead. 
You may - * need to restart the postgresql service after modifying this file. - * - * You should also create a sqooptest user and database: - * - * $ sudo -u postgres psql -U postgres template1 - * template1=> CREATE USER sqooptest; - * template1=> CREATE DATABASE sqooptest; - * template1=> \q - * - */ -public class PostgresqlTest extends ImportJobTestCase { - - public static final Log LOG = LogFactory.getLog( - PostgresqlTest.class.getName()); - - static final String HOST_URL = System.getProperty( - "sqoop.test.postgresql.connectstring.host_url", - "jdbc:postgresql://localhost/"); - - static final String DATABASE_USER = "sqooptest"; - static final String DATABASE_NAME = "sqooptest"; - static final String TABLE_NAME = "EMPLOYEES_PG"; - static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's"; - static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; - - @Override - protected boolean useHsqldbTestServer() { - return false; - } - - @Before - public void setUp() { - super.setUp(); - - LOG.debug("Setting up another postgresql test: " + CONNECT_STRING); - - setUpData(TABLE_NAME); - setUpData(SPECIAL_TABLE_NAME); - - LOG.debug("setUp complete."); - } - - public void setUpData(String tableName) { - SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName); - options.setUsername(DATABASE_USER); - - ConnManager manager = null; - Connection connection = null; - Statement st = null; - - try { - manager = new PostgresqlManager(options); - connection = manager.getConnection(); - connection.setAutoCommit(false); - st = connection.createStatement(); - - // create the database table and populate it with data. - - try { - // Try to remove the table first. DROP TABLE IF EXISTS didn't - // get added until pg 8.3, so we just use "DROP TABLE" and ignore - // any exception here if one occurs. 
- st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName)); - } catch (SQLException e) { - LOG.info("Couldn't drop table " + tableName + " (ok)"); - LOG.info(e.toString()); - // Now we need to reset the transaction. - connection.rollback(); - } - - st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName) - + " (" - + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, " - + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, " - + manager.escapeColName("start_date") + " DATE, " - + manager.escapeColName("salary") + " FLOAT, " - + manager.escapeColName("dept") + " VARCHAR(32))"); - - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) - + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')"); - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) - + " VALUES(2,'Bob','2009-04-20',400.00,'sales')"); - st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName) - + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')"); - connection.commit(); - } catch (SQLException sqlE) { - LOG.error("Encountered SQL Exception: " + sqlE); - sqlE.printStackTrace(); - fail("SQLException when running test setUp(): " + sqlE); - } finally { - try { - if (null != st) { - st.close(); - } - - if (null != manager) { - manager.close(); - } - } catch (SQLException sqlE) { - LOG.warn("Got SQLException when closing connection: " + sqlE); - } - } - - LOG.debug("setUp complete."); - } - - - private String [] getArgv(boolean isDirect, String tableName, - String... 
extraArgs) { - ArrayList<String> args = new ArrayList<String>(); - - CommonArgs.addHadoopFlags(args); - - args.add("--table"); - args.add(tableName); - args.add("--warehouse-dir"); - args.add(getWarehouseDir()); - args.add("--connect"); - args.add(CONNECT_STRING); - args.add("--username"); - args.add(DATABASE_USER); - args.add("--where"); - args.add("id > 1"); - - if (isDirect) { - args.add("--direct"); - } - - for (String arg : extraArgs) { - args.add(arg); - } - - return args.toArray(new String[0]); - } - - private void doImportAndVerify(boolean isDirect, String [] expectedResults, - String tableName, String... extraArgs) throws IOException { - - Path warehousePath = new Path(this.getWarehouseDir()); - Path tablePath = new Path(warehousePath, tableName); - - Path filePath; - if (isDirect) { - filePath = new Path(tablePath, "data-00000"); - } else { - filePath = new Path(tablePath, "part-m-00000"); - } - - File tableFile = new File(tablePath.toString()); - if (tableFile.exists() && tableFile.isDirectory()) { - // remove the directory before running the import. - FileListing.recursiveDeleteDir(tableFile); - } - - String [] argv = getArgv(isDirect, tableName, extraArgs); - try { - runImport(argv); - } catch (IOException ioe) { - LOG.error("Got IOException during import: " + ioe.toString()); - ioe.printStackTrace(); - fail(ioe.toString()); - } - - File f = new File(filePath.toString()); - assertTrue("Could not find imported data file, " + f, f.exists()); - BufferedReader r = null; - try { - // Read through the file and make sure it's all there. 
- r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); - for (String expectedLine : expectedResults) { - assertEquals(expectedLine, r.readLine()); - } - } catch (IOException ioe) { - LOG.error("Got IOException verifying results: " + ioe.toString()); - ioe.printStackTrace(); - fail(ioe.toString()); - } finally { - IOUtils.closeStream(r); - } - } - - @Test - public void testJdbcBasedImport() throws IOException { - String [] expectedResults = { - "2,Bob,2009-04-20,400.0,sales", - "3,Fred,2009-01-23,15.0,marketing", - }; - - doImportAndVerify(false, expectedResults, TABLE_NAME); - } - - @Test - public void testDirectImport() throws IOException { - String [] expectedResults = { - "2,Bob,2009-04-20,400,sales", - "3,Fred,2009-01-23,15,marketing", - }; - - doImportAndVerify(true, expectedResults, TABLE_NAME); - } - - @Test - public void testListTables() throws IOException { - SqoopOptions options = new SqoopOptions(new Configuration()); - options.setConnectString(CONNECT_STRING); - options.setUsername(DATABASE_USER); - - ConnManager mgr = new PostgresqlManager(options); - String[] tables = mgr.listTables(); - Arrays.sort(tables); - assertTrue(TABLE_NAME + " is not found!", - Arrays.binarySearch(tables, TABLE_NAME) >= 0); - } - - @Test - public void testTableNameWithSpecialCharacter() throws IOException { - String [] expectedResults = { - "2,Bob,2009-04-20,400.0,sales", - "3,Fred,2009-01-23,15.0,marketing", - }; - - doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME); - } - - @Test - public void testIncrementalImport() throws IOException { - String [] expectedResults = { }; - - String [] extraArgs = { "--incremental", "lastmodified", - "--check-column", "start_date", - }; - - doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs); - } -}
