Updated Branches:
  refs/heads/trunk 0b2a688d3 -> f11c3091c

SQOOP-601 Support custom schemas in PostgreSQL Connector

(Jarek Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f11c3091
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f11c3091
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f11c3091

Branch: refs/heads/trunk
Commit: f11c3091c2ef3e52717acbc40c88def238b1cc3a
Parents: 0b2a688
Author: Cheolsoo Park <[email protected]>
Authored: Mon Sep 17 11:04:18 2012 -0700
Committer: Cheolsoo Park <[email protected]>
Committed: Mon Sep 17 11:04:18 2012 -0700

----------------------------------------------------------------------
 src/docs/user/connectors.txt                       |   32 ++
 .../cloudera/sqoop/manager/PostgresqlManager.java  |    5 -
 .../apache/sqoop/manager/CatalogQueryManager.java  |    5 +-
 .../sqoop/manager/DirectPostgresqlManager.java     |    4 +-
 .../apache/sqoop/manager/PGBulkloadManager.java    |    2 +-
 .../apache/sqoop/manager/PostgresqlManager.java    |   85 +++-
 src/java/org/apache/sqoop/manager/SqlManager.java  |   13 +
 .../org/apache/sqoop/mapreduce/JdbcExportJob.java  |    2 +-
 .../sqoop/mapreduce/JdbcUpdateExportJob.java       |    3 +-
 .../sqoop/mapreduce/JdbcUpsertExportJob.java       |    2 +-
 .../mapreduce/db/DataDrivenDBRecordReader.java     |    8 +-
 src/test/com/cloudera/sqoop/ThirdPartyTests.java   |    6 +-
 .../sqoop/manager/PostgresqlExportTest.java        |  362 +++++++++++++++
 .../sqoop/manager/PostgresqlImportTest.java        |  350 ++++++++++++++
 .../com/cloudera/sqoop/manager/PostgresqlTest.java |  306 ------------
 15 files changed, 852 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index a93f14e..930a499 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -21,6 +21,38 @@
 Notes for specific connectors
 -----------------------------
 
+PostgreSQL Connector
+~~~~~~~~~~~~~~~~~~~~~
+
+Extra arguments
+^^^^^^^^^^^^^^^
+
+A list of all extra arguments supported by the PostgreSQL Connector is shown
+in the table below:
+
+.Supported PostgreSQL extra arguments:
+[grid="all"]
+`----------------------------------------`---------------------------------------
+Argument                                 Description
+---------------------------------------------------------------------------------
++\--schema <name>+                       Schema name that sqoop should use. \
+                                         Default is "public".
+---------------------------------------------------------------------------------
+
+Schema support
+^^^^^^^^^^^^^^
+
+If you need to work with a table that is located in a schema other than the
+default one, you need to specify the extra argument +\--schema+. Custom
+schemas are supported for both import and export jobs (the optional staging
+table, however, must be present in the same schema as the target table).
+Example invocation:
+
+----
+$ sqoop import ... --table custom_table -- --schema custom_schema
+----
+
+
+
 pg_bulkload connector
 ~~~~~~~~~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java 
b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
index 16adeb2..354d260 100644
--- a/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
+++ b/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
@@ -29,10 +29,5 @@ public class PostgresqlManager
   public PostgresqlManager(final SqoopOptions opts) {
     super(opts);
   }
-
-  protected PostgresqlManager(final SqoopOptions opts, boolean ignored) {
-    super(opts, ignored);
-  }
-
 }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java 
b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
index 5f2f89f..fa7661e 100644
--- a/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
+++ b/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
@@ -142,10 +142,11 @@ public abstract class CatalogQueryManager
     Statement s = null;
     ResultSet rs = null;
     List<String> columns = new ArrayList<String>();
+    String listColumnsQuery = getListColumnsQuery(tableName);
     try {
       c = getConnection();
       s = c.createStatement();
-      rs = s.executeQuery(getListColumnsQuery(tableName));
+      rs = s.executeQuery(listColumnsQuery);
       while (rs.next()) {
         columns.add(rs.getString(1));
       }
@@ -158,7 +159,7 @@ public abstract class CatalogQueryManager
       } catch (SQLException ce) {
         LOG.error("Failed to rollback transaction", ce);
       }
-      LOG.error("Failed to list columns", sqle);
+      LOG.error("Failed to list columns from query: " + listColumnsQuery, 
sqle);
       throw new RuntimeException(sqle);
     } finally {
       if (rs != null) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java 
b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index a557aa1..ea91fc6 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -56,9 +56,7 @@ public class DirectPostgresqlManager
       DirectPostgresqlManager.class.getName());
 
   public DirectPostgresqlManager(final SqoopOptions opts) {
-    // Inform superclass that we're overriding import method via alt.
-    // constructor.
-    super(opts, true);
+    super(opts);
   }
 
   private static final String PSQL_CMD = "psql";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java 
b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
index 92174f8..091fd15 100644
--- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
+++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
@@ -40,7 +40,7 @@ public class PGBulkloadManager extends PostgresqlManager {
 
 
   public PGBulkloadManager(final SqoopOptions opts) {
-    super(opts, true);
+    super(opts);
   }
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/PostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/PostgresqlManager.java 
b/src/java/org/apache/sqoop/manager/PostgresqlManager.java
index d18321c..7e6284e 100644
--- a/src/java/org/apache/sqoop/manager/PostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/PostgresqlManager.java
@@ -21,11 +21,17 @@ package org.apache.sqoop.manager;
 import java.io.IOException;
 import java.sql.SQLException;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.util.ImportException;
+import org.apache.sqoop.cli.RelatedOptions;
 
 /**
  * Manages connections to Postgresql databases.
@@ -33,6 +39,8 @@ import com.cloudera.sqoop.util.ImportException;
 public class PostgresqlManager
     extends com.cloudera.sqoop.manager.CatalogQueryManager {
 
+  public static final String SCHEMA = "schema";
+
   public static final Log LOG = LogFactory.getLog(
       PostgresqlManager.class.getName());
 
@@ -42,13 +50,20 @@ public class PostgresqlManager
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
+  /*
+   * PostgreSQL schema that we should use.
+   */
+  private String schema;
+
   public PostgresqlManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
-  }
 
-  protected PostgresqlManager(final SqoopOptions opts, boolean ignored) {
-    // constructor used by subclasses to avoid the --direct warning.
-    super(DRIVER_CLASS, opts);
+    // Try to parse extra arguments
+    try {
+      parseExtraArgs(opts.getExtraArgs());
+    } catch (ParseException e) {
+      throw new RuntimeException("Can't parse extra arguments", e);
+    }
   }
 
   @Override
@@ -58,6 +73,11 @@ public class PostgresqlManager
 
   @Override
   public String escapeTableName(String tableName) {
+    // Return full table name including schema if needed
+    if (schema != null && !schema.isEmpty()) {
+      return escapeIdentifier(schema) + "." + escapeIdentifier(tableName);
+    }
+
     return escapeIdentifier(tableName);
   }
 
@@ -117,7 +137,7 @@ public class PostgresqlManager
   protected String getListTablesQuery() {
     return
       "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES "
-    + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())";
+    + "WHERE SCHEMANAME = " + getSchemaSqlFragment();
   }
 
   @Override
@@ -127,7 +147,7 @@ public class PostgresqlManager
     + "  PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col "
     + "WHERE sch.OID = tab.RELNAMESPACE "
     + "  AND tab.OID = col.ATTRELID "
-    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND sch.NSPNAME = " + getSchemaSqlFragment()
     + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
     + "  AND col.ATTNUM >= 1"
     + "  AND col.ATTISDROPPED = 'f'";
@@ -142,12 +162,20 @@ public class PostgresqlManager
     + "WHERE sch.OID = tab.RELNAMESPACE "
     + "  AND tab.OID = col.ATTRELID "
     + "  AND tab.OID = ind.INDRELID "
-    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND sch.NSPNAME = " + getSchemaSqlFragment()
     + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
     + "  AND col.ATTNUM = ANY(ind.INDKEY) "
     + "  AND ind.INDISPRIMARY";
   }
 
+  private String getSchemaSqlFragment() {
+    if (schema != null && !schema.isEmpty()) {
+      return "'" + escapeLiteral(schema) + "'";
+    }
+
+    return "(SELECT CURRENT_SCHEMA())";
+  }
+
   private String escapeLiteral(String literal) {
     return literal.replace("'", "''");
   }
@@ -157,5 +185,48 @@ public class PostgresqlManager
     return "SELECT CURRENT_TIMESTAMP";
   }
 
+  /**
+   * Parse extra arguments.
+   *
+   * @param args Extra arguments array
+   * @throws ParseException
+   */
+  void parseExtraArgs(String[] args) throws ParseException {
+    // No-op when no extra arguments are present
+    if (args == null || args.length == 0) {
+      return;
+    }
+
+    // We do not need extended abilities of SqoopParser, so we're using
+    // Gnu parser instead.
+    CommandLineParser parser = new GnuParser();
+    CommandLine cmdLine = parser.parse(getExtraOptions(), args, true);
+
+    // Apply extra options
+    if (cmdLine.hasOption(SCHEMA)) {
+      String schemaName = cmdLine.getOptionValue(SCHEMA);
+      LOG.info("We will use schema " + schemaName);
+
+      this.schema = schemaName;
+    }
+  }
+
+  /**
+   * Create related options for PostgreSQL extra parameters.
+   *
+   * @return
+   */
+  @SuppressWarnings("static-access")
+  private RelatedOptions getExtraOptions() {
+    // Connection args (common)
+    RelatedOptions extraOptions =
+      new RelatedOptions("PostgreSQL extra options:");
+
+    extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
+      .withDescription("Optional schema name")
+      .withLongOpt(SCHEMA).create());
+
+    return extraOptions;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java 
b/src/java/org/apache/sqoop/manager/SqlManager.java
index ea961cd..3a52c6d 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -766,6 +766,10 @@ public abstract class SqlManager
   @Override
   public long getTableRowCount(String tableName) throws SQLException {
     release(); // Release any previous ResultSet
+
+    // Escape used table name
+    tableName = escapeTableName(tableName);
+
     long result = -1;
     String countQuery = "SELECT COUNT(*) FROM " + tableName;
     Statement stmt = null;
@@ -801,6 +805,10 @@ public abstract class SqlManager
   @Override
   public void deleteAllRecords(String tableName) throws SQLException {
     release(); // Release any previous ResultSet
+
+    // Escape table name
+    tableName = escapeTableName(tableName);
+
     String deleteQuery = "DELETE FROM " + tableName;
     Statement stmt = null;
     try {
@@ -827,6 +835,11 @@ public abstract class SqlManager
   public void migrateData(String fromTable, String toTable)
     throws SQLException {
     release(); // Release any previous ResultSet
+
+    // Escape all table names
+    fromTable = escapeTableName(fromTable);
+    toTable = escapeTableName(toTable);
+
     String updateQuery = "INSERT INTO " + toTable
           + " ( SELECT * FROM " + fromTable + " )";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java 
b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index b574f82..bd52f00 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -127,7 +127,7 @@ public class JdbcExportJob extends ExportJobBase {
       if (null == colNames) {
         colNames = mgr.getColumnNames(tableName);
       }
-      DBOutputFormat.setOutput(job, tableName, colNames);
+      DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames);
 
       job.setOutputFormatClass(getOutputFormatClass());
       job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java 
b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 7be5ed9..c8e17c2 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -132,7 +132,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
           outColNames[j++] = colNames[i];
         }
       }
-      DBOutputFormat.setOutput(job, tableName, outColNames);
+      DBOutputFormat.setOutput(job,
+        mgr.escapeTableName(tableName), outColNames);
 
       job.setOutputFormatClass(getOutputFormatClass());
       job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java 
b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index f299f98..c17b4bb 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -72,7 +72,7 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
         throw new IOException(
             "Export column names could not be determined for " + tableName);
       }
-      DBOutputFormat.setOutput(job, tableName, colNames);
+      DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames);
 
       String updateKeyColumns = options.getUpdateKeyCol();
       if (null == updateKeyColumns) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java 
b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
index 38e9fb9..a56b93d 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
@@ -98,10 +98,10 @@ public class DataDrivenDBRecordReader<T extends DBWritable>
 
       query.append(" FROM ").append(tableName);
       if (!dbProductName.startsWith("ORACLE")
-          && !dbProductName.startsWith("DB2")) {
-        // The AS clause is required for hsqldb, but Oracle explicitly does
-        // not use it, and DB2 does not allow a qualified name in alias. Since
-        // this is not necessary for Oracle and DB2, we do not append.
+          && !dbProductName.startsWith("DB2")
+          && !dbProductName.startsWith("POSTGRESQL")) {
+        // The AS clause is required for hsqldb. Some other databases might 
have
+        // issues with it, so we're skipping some of them.
         query.append(" AS ").append(tableName);
       }
       query.append(" WHERE ");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/ThirdPartyTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java 
b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
index eeab7f3..949b02d 100644
--- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java
+++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java
@@ -30,7 +30,8 @@ import com.cloudera.sqoop.manager.MySQLCompatTest;
 import com.cloudera.sqoop.manager.OracleExportTest;
 import com.cloudera.sqoop.manager.OracleManagerTest;
 import com.cloudera.sqoop.manager.OracleCompatTest;
-import com.cloudera.sqoop.manager.PostgresqlTest;
+import com.cloudera.sqoop.manager.PostgresqlExportTest;
+import com.cloudera.sqoop.manager.PostgresqlImportTest;
 
 /**
  * Test battery including all tests of vendor-specific ConnManager
@@ -53,7 +54,8 @@ public final class ThirdPartyTests extends TestCase {
     suite.addTestSuite(OracleExportTest.class);
     suite.addTestSuite(OracleManagerTest.class);
     suite.addTestSuite(OracleCompatTest.class);
-    suite.addTestSuite(PostgresqlTest.class);
+    suite.addTestSuite(PostgresqlImportTest.class);
+    suite.addTestSuite(PostgresqlExportTest.class);
 
     return suite;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java 
b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
new file mode 100644
index 0000000..be449e4
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop.manager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+/**
+ *
+ */
+public class PostgresqlExportTest extends ExportJobTestCase {
+  public static final Log LOG = LogFactory.getLog(
+      PostgresqlExportTest.class.getName());
+
+  static final String HOST_URL = System.getProperty(
+      "sqoop.test.postgresql.connectstring.host_url",
+      "jdbc:postgresql://localhost/");
+
+  static final String DATABASE_USER = "sqooptest";
+  static final String DATABASE_NAME = "sqooptest";
+  static final String TABLE_NAME = "EMPLOYEES_PG";
+  static final String STAGING_TABLE_NAME = "STAGING";
+  static final String SCHEMA_PUBLIC = "public";
+  static final String SCHEMA_SPECIAL = "special";
+  static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+  protected Connection connection;
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    LOG.debug("Setting up postgresql test: " + CONNECT_STRING);
+
+    try {
+      connection = DriverManager.getConnection(HOST_URL, DATABASE_USER, null);
+      connection.setAutoCommit(false);
+    } catch (SQLException ex) {
+      LOG.error("Can't create connection", ex);
+      throw new RuntimeException(ex);
+    }
+
+    createTable(TABLE_NAME, SCHEMA_PUBLIC);
+    createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
+    createTable(TABLE_NAME, SCHEMA_SPECIAL);
+    createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
+
+    LOG.debug("setUp complete.");
+  }
+
+  @Override
+  public void tearDown() {
+    super.tearDown();
+
+    try {
+      connection.close();
+    } catch (SQLException e) {
+      LOG.error("Ignoring exception in tearDown", e);
+    }
+  }
+
+  public void createTable(String tableName, String schema) {
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
+    options.setUsername(DATABASE_USER);
+
+    ConnManager manager = null;
+    Statement st = null;
+
+    try {
+      manager = new PostgresqlManager(options);
+      st = connection.createStatement();
+
+      // Create schema if not exists in dummy way (always create and ignore
+      // errors.
+      try {
+        st.executeUpdate("CREATE SCHEMA " + escapeTableOrSchemaName(schema));
+        connection.commit();
+      } catch (SQLException e) {
+        LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
+          + "the schema already exists.", e);
+        connection.rollback();
+      }
+
+      String fullTableName = escapeTableOrSchemaName(schema)
+        + "." + escapeTableOrSchemaName(tableName);
+
+      try {
+        // Try to remove the table first. DROP TABLE IF EXISTS didn't
+        // get added until pg 8.3, so we just use "DROP TABLE" and ignore
+        // any exception here if one occurs.
+        st.executeUpdate("DROP TABLE " + fullTableName);
+      } catch (SQLException e) {
+        LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)",
+          e);
+        // Now we need to reset the transaction.
+        connection.rollback();
+      }
+
+      st.executeUpdate("CREATE TABLE " + fullTableName + " ("
+        + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+        + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+        + manager.escapeColName("start_date") + " DATE, "
+        + manager.escapeColName("salary") + " FLOAT, "
+        + manager.escapeColName("dept") + " VARCHAR(32))");
+
+      connection.commit();
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when running test setUp(): " + sqlE);
+    } finally {
+      try {
+        if (null != st) {
+          st.close();
+        }
+
+        if (null != manager) {
+          manager.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("Got SQLException when closing connection: " + sqlE);
+      }
+    }
+
+    LOG.debug("setUp complete.");
+  }
+
+  private String [] getArgv(String tableName,
+                            String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--table");
+    args.add(tableName);
+    args.add("--export-dir");
+    args.add(getWarehouseDir());
+    args.add("--fields-terminated-by");
+    args.add(",");
+    args.add("--lines-terminated-by");
+    args.add("\\n");
+    args.add("--connect");
+    args.add(CONNECT_STRING);
+    args.add("--username");
+    args.add(DATABASE_USER);
+    args.add("-m");
+    args.add("1");
+
+    for (String arg : extraArgs) {
+      args.add(arg);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  protected void createTestFile(String filename,
+                                String[] lines)
+                                throws IOException {
+    new File(getWarehouseDir()).mkdirs();
+    File file = new File(getWarehouseDir() + "/" + filename);
+    Writer output = new BufferedWriter(new FileWriter(file));
+    for(String line : lines) {
+      output.write(line);
+      output.write("\n");
+    }
+    output.close();
+  }
+
+  public void testExport() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    runExport(getArgv(TABLE_NAME));
+
+    assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
+  }
+
+  public void testExportStaging() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
+  }
+
+  public void testExportDirect() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {"--direct"};
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
+  }
+
+  public void testExportCustomSchema() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {"--",
+      "--schema",
+      SCHEMA_SPECIAL,
+    };
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2,
+      escapeTableOrSchemaName(SCHEMA_SPECIAL)
+        + "." + escapeTableOrSchemaName(TABLE_NAME),
+      connection);
+  }
+
+  public void testExportCustomSchemaStaging() throws IOException, SQLException 
{
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {
+      "--staging-table",
+      STAGING_TABLE_NAME,
+      "--",
+      "--schema",
+      SCHEMA_SPECIAL,
+    };
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2,
+      escapeTableOrSchemaName(SCHEMA_SPECIAL)
+        + "." + escapeTableOrSchemaName(TABLE_NAME),
+      connection);
+  }
+
+  public void testExportCustomSchemaStagingClear()
+    throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {
+      "--staging-table",
+      STAGING_TABLE_NAME,
+      "--clear-staging-table",
+      "--",
+      "--schema",
+      SCHEMA_SPECIAL,
+    };
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2,
+      escapeTableOrSchemaName(SCHEMA_SPECIAL)
+        + "." + escapeTableOrSchemaName(TABLE_NAME),
+      connection);
+  }
+
+  public void testExportCustomSchemaDirect() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    });
+
+    String[] extra = new String[] {
+      "--direct",
+      "--",
+      "--schema",
+      SCHEMA_SPECIAL,
+    };
+
+    runExport(getArgv(TABLE_NAME, extra));
+
+    assertRowCount(2,
+      escapeTableOrSchemaName(SCHEMA_SPECIAL)
+        + "." + escapeTableOrSchemaName(TABLE_NAME),
+      connection);
+  }
+
+  public static void assertRowCount(long expected,
+                                    String tableName,
+                                    Connection connection) {
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+
+      rs.next();
+
+      assertEquals(expected, rs.getLong(1));
+    } catch (SQLException e) {
+      LOG.error("Can't verify number of rows", e);
+      fail();
+    } finally {
+      try {
+        connection.commit();
+
+        if (stmt != null) {
+          stmt.close();
+        }
+        if (rs != null) {
+          rs.close();
+        }
+      } catch (SQLException ex) {
+        LOG.info("Ignored exception in finally block.");
+      }
+    }
+  }
+
+  public String escapeTableOrSchemaName(String tableName) {
+    return "\"" + tableName + "\"";
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java 
b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java
new file mode 100644
index 0000000..267ccd0
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/PostgresqlImportTest.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.util.FileListing;
+
+/**
+ * Test the PostgresqlManager and DirectPostgresqlManager implementations.
+ * The former uses the postgres JDBC driver to perform an import;
+ * the latter uses pg_dump to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it. You need to run this manually with -Dtestcase=PostgresqlImportTest or
+ * -Dthirdparty=true.
+ *
+ * You need to put Postgresql's JDBC driver library into a location where
+ * Hadoop can access it (e.g., $HADOOP_HOME/lib).
+ *
+ * To configure a postgresql database to allow local connections, put the
+ * following in /etc/postgresql/8.3/main/pg_hba.conf:
+ *     local  all all trust
+ *     host all all 127.0.0.1/32 trust
+ *     host all all ::1/128      trust
+ *
+ * ... and comment out any other lines referencing 127.0.0.1 or ::1.
+ *
+ * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment
+ * the line that starts with listen_addresses and set its value to '*' as
+ * follows
+ *     listen_addresses = '*'
+ *
+ * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead.  You may
+ * need to restart the postgresql service after modifying this file.
+ *
+ * You should also create a sqooptest user and database:
+ *
+ * $ sudo -u postgres psql -U postgres template1
+ * template1=&gt; CREATE USER sqooptest;
+ * template1=&gt; CREATE DATABASE sqooptest;
+ * template1=&gt; GRANT ALL ON DATABASE sqooptest TO sqooptest;
+ * template1=&gt; \q
+ *
+ */
+public class PostgresqlImportTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      PostgresqlImportTest.class.getName());
+
+  static final String HOST_URL = System.getProperty(
+      "sqoop.test.postgresql.connectstring.host_url",
+      "jdbc:postgresql://localhost/");
+
+  static final String DATABASE_USER = "sqooptest";
+  static final String DATABASE_NAME = "sqooptest";
+  static final String TABLE_NAME = "EMPLOYEES_PG";
+  static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
+  static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
+  static final String SCHEMA_PUBLIC = "public";
+  static final String SCHEMA_SPECIAL = "special";
+  static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
+
+    setUpData(TABLE_NAME, SCHEMA_PUBLIC);
+    setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC);
+    setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL);
+
+    LOG.debug("setUp complete.");
+  }
+
+  public void setUpData(String tableName, String schema) {
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
+    options.setUsername(DATABASE_USER);
+
+    ConnManager manager = null;
+    Connection connection = null;
+    Statement st = null;
+
+    try {
+      manager = new PostgresqlManager(options);
+      connection = manager.getConnection();
+      connection.setAutoCommit(false);
+      st = connection.createStatement();
+
+      // Create schema if not exists in dummy way (always create and ignore
+      // errors).
+      try {
+        st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema));
+        connection.commit();
+      } catch (SQLException e) {
+        LOG.info("Couldn't create schema " + schema + " (is o.k. as long as "
+          + "the schema already exists).", e);
+        connection.rollback();
+      }
+
+      String fullTableName = manager.escapeTableName(schema)
+        + "." + manager.escapeTableName(tableName);
+
+      try {
+        // Try to remove the table first. DROP TABLE IF EXISTS didn't
+        // get added until pg 8.3, so we just use "DROP TABLE" and ignore
+        // any exception here if one occurs.
+        st.executeUpdate("DROP TABLE " + fullTableName);
+      } catch (SQLException e) {
+        LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)",
+          e);
+        // Now we need to reset the transaction.
+        connection.rollback();
+      }
+
+      st.executeUpdate("CREATE TABLE " + fullTableName + " ("
+          + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+          + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+          + manager.escapeColName("start_date") + " DATE, "
+          + manager.escapeColName("salary") + " FLOAT, "
+          + manager.escapeColName("dept") + " VARCHAR(32))");
+
+      st.executeUpdate("INSERT INTO " + fullTableName
+          + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')");
+      st.executeUpdate("INSERT INTO " + fullTableName
+          + " VALUES(2,'Bob','2009-04-20',400.00,'sales')");
+      st.executeUpdate("INSERT INTO " + fullTableName
+          + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')");
+      connection.commit();
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when running test setUp(): " + sqlE);
+    } finally {
+      try {
+        if (null != st) {
+          st.close();
+        }
+
+        if (null != manager) {
+          manager.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("Got SQLException when closing connection: " + sqlE);
+      }
+    }
+
+    LOG.debug("setUp complete.");
+  }
+
+
+  private String [] getArgv(boolean isDirect, String tableName,
+      String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--table");
+    args.add(tableName);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(CONNECT_STRING);
+    args.add("--username");
+    args.add(DATABASE_USER);
+    args.add("--where");
+    args.add("id > 1");
+
+    if (isDirect) {
+      args.add("--direct");
+    }
+
+    for (String arg : extraArgs) {
+      args.add(arg);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  private void doImportAndVerify(boolean isDirect, String [] expectedResults,
+      String tableName, String... extraArgs) throws IOException {
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path tablePath = new Path(warehousePath, tableName);
+
+    Path filePath;
+    if (isDirect) {
+      filePath = new Path(tablePath, "data-00000");
+    } else {
+      filePath = new Path(tablePath, "part-m-00000");
+    }
+
+    File tableFile = new File(tablePath.toString());
+    if (tableFile.exists() && tableFile.isDirectory()) {
+      // remove the directory before running the import.
+      FileListing.recursiveDeleteDir(tableFile);
+    }
+
+    String [] argv = getArgv(isDirect, tableName, extraArgs);
+    try {
+      runImport(argv);
+    } catch (IOException ioe) {
+      LOG.error("Got IOException during import: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    }
+
+    File f = new File(filePath.toString());
+    assertTrue("Could not find imported data file, " + f, f.exists());
+    BufferedReader r = null;
+    try {
+      // Read through the file and make sure it's all there.
+      r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
+      for (String expectedLine : expectedResults) {
+        assertEquals(expectedLine, r.readLine());
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got IOException verifying results: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(r);
+    }
+  }
+
+  @Test
+  public void testJdbcBasedImport() throws IOException {
+    String [] expectedResults = {
+      "2,Bob,2009-04-20,400.0,sales",
+      "3,Fred,2009-01-23,15.0,marketing",
+    };
+
+    doImportAndVerify(false, expectedResults, TABLE_NAME);
+  }
+
+  @Test
+  public void testDirectImport() throws IOException {
+    String [] expectedResults = {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    };
+
+    doImportAndVerify(true, expectedResults, TABLE_NAME);
+  }
+
+  @Test
+  public void testListTables() throws IOException {
+    SqoopOptions options = new SqoopOptions(new Configuration());
+    options.setConnectString(CONNECT_STRING);
+    options.setUsername(DATABASE_USER);
+
+    ConnManager mgr = new PostgresqlManager(options);
+    String[] tables = mgr.listTables();
+    Arrays.sort(tables);
+    assertTrue(TABLE_NAME + " is not found!",
+        Arrays.binarySearch(tables, TABLE_NAME) >= 0);
+  }
+
+  @Test
+  public void testTableNameWithSpecialCharacter() throws IOException {
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400.0,sales",
+        "3,Fred,2009-01-23,15.0,marketing",
+    };
+
+    doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME);
+  }
+
+  @Test
+  public void testIncrementalImport() throws IOException {
+    String [] expectedResults = { };
+
+    String [] extraArgs = { "--incremental", "lastmodified",
+       "--check-column", "start_date",
+    };
+
+    doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs);
+  }
+
+  @Test
+  public void testDifferentSchemaImport() throws IOException {
+    String [] expectedResults = {
+      "2,Bob,2009-04-20,400.0,sales",
+      "3,Fred,2009-01-23,15.0,marketing",
+    };
+
+    String [] extraArgs = { "--",
+      "--schema", SCHEMA_SPECIAL,
+    };
+
+    doImportAndVerify(false, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
+  }
+
+  @Test
+  public void testDifferentSchemaImportDirect() throws IOException {
+    String [] expectedResults = {
+      "2,Bob,2009-04-20,400,sales",
+      "3,Fred,2009-01-23,15,marketing",
+    };
+
+    String [] extraArgs = { "--",
+      "--schema", SCHEMA_SPECIAL,
+    };
+
+    doImportAndVerify(true, expectedResults, DIFFERENT_TABLE_NAME, extraArgs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f11c3091/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
deleted file mode 100644
index 0dfd1fc..0000000
--- a/src/test/com/cloudera/sqoop/manager/PostgresqlTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.sqoop.manager;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.FileInputStream;
-import java.io.File;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.testutil.CommonArgs;
-import com.cloudera.sqoop.testutil.ImportJobTestCase;
-import com.cloudera.sqoop.util.FileListing;
-
-/**
- * Test the PostgresqlManager and DirectPostgresqlManager implementations.
- * The former uses the postgres JDBC driver to perform an import;
- * the latter uses pg_dump to facilitate it.
- *
- * Since this requires a Postgresql installation on your local machine to use,
- * this class is named in such a way that Hadoop's default QA process does not
- * run it. You need to run this manually with -Dtestcase=PostgresqlTest or
- * -Dthirdparty=true.
- *
- * You need to put Postgresql's JDBC driver library into a location where
- * Hadoop can access it (e.g., $HADOOP_HOME/lib).
- *
- * To configure a postgresql database to allow local connections, put the
- * following in /etc/postgresql/8.3/main/pg_hba.conf:
- *     local  all all trust
- *     host all all 127.0.0.1/32 trust
- *     host all all ::1/128      trust
- *
- * ... and comment out any other lines referencing 127.0.0.1 or ::1.
- *
- * Also in the file /etc/postgresql/8.3/main/postgresql.conf, uncomment
- * the line that starts with listen_addresses and set its value to '*' as
- * follows
- *     listen_addresses = '*'
- *
- * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead.  You may
- * need to restart the postgresql service after modifying this file.
- *
- * You should also create a sqooptest user and database:
- *
- * $ sudo -u postgres psql -U postgres template1
- * template1=&gt; CREATE USER sqooptest;
- * template1=&gt; CREATE DATABASE sqooptest;
- * template1=&gt; \q
- *
- */
-public class PostgresqlTest extends ImportJobTestCase {
-
-  public static final Log LOG = LogFactory.getLog(
-      PostgresqlTest.class.getName());
-
-  static final String HOST_URL = System.getProperty(
-      "sqoop.test.postgresql.connectstring.host_url",
-      "jdbc:postgresql://localhost/");
-
-  static final String DATABASE_USER = "sqooptest";
-  static final String DATABASE_NAME = "sqooptest";
-  static final String TABLE_NAME = "EMPLOYEES_PG";
-  static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
-  static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
-
-  @Override
-  protected boolean useHsqldbTestServer() {
-    return false;
-  }
-
-  @Before
-  public void setUp() {
-    super.setUp();
-
-    LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);
-
-    setUpData(TABLE_NAME);
-    setUpData(SPECIAL_TABLE_NAME);
-
-    LOG.debug("setUp complete.");
-  }
-
-  public void setUpData(String tableName) {
-    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
-    options.setUsername(DATABASE_USER);
-
-    ConnManager manager = null;
-    Connection connection = null;
-    Statement st = null;
-
-    try {
-      manager = new PostgresqlManager(options);
-      connection = manager.getConnection();
-      connection.setAutoCommit(false);
-      st = connection.createStatement();
-
-      // create the database table and populate it with data.
-
-      try {
-        // Try to remove the table first. DROP TABLE IF EXISTS didn't
-        // get added until pg 8.3, so we just use "DROP TABLE" and ignore
-        // any exception here if one occurs.
-        st.executeUpdate("DROP TABLE " + manager.escapeTableName(tableName));
-      } catch (SQLException e) {
-        LOG.info("Couldn't drop table " + tableName + " (ok)");
-        LOG.info(e.toString());
-        // Now we need to reset the transaction.
-        connection.rollback();
-      }
-
-      st.executeUpdate("CREATE TABLE " + manager.escapeTableName(tableName)
-          + " ("
-          + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
-          + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
-          + manager.escapeColName("start_date") + " DATE, "
-          + manager.escapeColName("salary") + " FLOAT, "
-          + manager.escapeColName("dept") + " VARCHAR(32))");
-
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
-          + " VALUES(1,'Aaron','2009-05-14',1000000.00,'engineering')");
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
-          + " VALUES(2,'Bob','2009-04-20',400.00,'sales')");
-      st.executeUpdate("INSERT INTO " + manager.escapeTableName(tableName)
-          + " VALUES(3,'Fred','2009-01-23',15.00,'marketing')");
-      connection.commit();
-    } catch (SQLException sqlE) {
-      LOG.error("Encountered SQL Exception: " + sqlE);
-      sqlE.printStackTrace();
-      fail("SQLException when running test setUp(): " + sqlE);
-    } finally {
-      try {
-        if (null != st) {
-          st.close();
-        }
-
-        if (null != manager) {
-          manager.close();
-        }
-      } catch (SQLException sqlE) {
-        LOG.warn("Got SQLException when closing connection: " + sqlE);
-      }
-    }
-
-    LOG.debug("setUp complete.");
-  }
-
-
-  private String [] getArgv(boolean isDirect, String tableName,
-      String... extraArgs) {
-    ArrayList<String> args = new ArrayList<String>();
-
-    CommonArgs.addHadoopFlags(args);
-
-    args.add("--table");
-    args.add(tableName);
-    args.add("--warehouse-dir");
-    args.add(getWarehouseDir());
-    args.add("--connect");
-    args.add(CONNECT_STRING);
-    args.add("--username");
-    args.add(DATABASE_USER);
-    args.add("--where");
-    args.add("id > 1");
-
-    if (isDirect) {
-      args.add("--direct");
-    }
-
-    for (String arg : extraArgs) {
-      args.add(arg);
-    }
-
-    return args.toArray(new String[0]);
-  }
-
-  private void doImportAndVerify(boolean isDirect, String [] expectedResults,
-      String tableName, String... extraArgs) throws IOException {
-
-    Path warehousePath = new Path(this.getWarehouseDir());
-    Path tablePath = new Path(warehousePath, tableName);
-
-    Path filePath;
-    if (isDirect) {
-      filePath = new Path(tablePath, "data-00000");
-    } else {
-      filePath = new Path(tablePath, "part-m-00000");
-    }
-
-    File tableFile = new File(tablePath.toString());
-    if (tableFile.exists() && tableFile.isDirectory()) {
-      // remove the directory before running the import.
-      FileListing.recursiveDeleteDir(tableFile);
-    }
-
-    String [] argv = getArgv(isDirect, tableName, extraArgs);
-    try {
-      runImport(argv);
-    } catch (IOException ioe) {
-      LOG.error("Got IOException during import: " + ioe.toString());
-      ioe.printStackTrace();
-      fail(ioe.toString());
-    }
-
-    File f = new File(filePath.toString());
-    assertTrue("Could not find imported data file, " + f, f.exists());
-    BufferedReader r = null;
-    try {
-      // Read through the file and make sure it's all there.
-      r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
-      for (String expectedLine : expectedResults) {
-        assertEquals(expectedLine, r.readLine());
-      }
-    } catch (IOException ioe) {
-      LOG.error("Got IOException verifying results: " + ioe.toString());
-      ioe.printStackTrace();
-      fail(ioe.toString());
-    } finally {
-      IOUtils.closeStream(r);
-    }
-  }
-
-  @Test
-  public void testJdbcBasedImport() throws IOException {
-    String [] expectedResults = {
-      "2,Bob,2009-04-20,400.0,sales",
-      "3,Fred,2009-01-23,15.0,marketing",
-    };
-
-    doImportAndVerify(false, expectedResults, TABLE_NAME);
-  }
-
-  @Test
-  public void testDirectImport() throws IOException {
-    String [] expectedResults = {
-      "2,Bob,2009-04-20,400,sales",
-      "3,Fred,2009-01-23,15,marketing",
-    };
-
-    doImportAndVerify(true, expectedResults, TABLE_NAME);
-  }
-
-  @Test
-  public void testListTables() throws IOException {
-    SqoopOptions options = new SqoopOptions(new Configuration());
-    options.setConnectString(CONNECT_STRING);
-    options.setUsername(DATABASE_USER);
-
-    ConnManager mgr = new PostgresqlManager(options);
-    String[] tables = mgr.listTables();
-    Arrays.sort(tables);
-    assertTrue(TABLE_NAME + " is not found!",
-        Arrays.binarySearch(tables, TABLE_NAME) >= 0);
-  }
-
-  @Test
-  public void testTableNameWithSpecialCharacter() throws IOException {
-    String [] expectedResults = {
-        "2,Bob,2009-04-20,400.0,sales",
-        "3,Fred,2009-01-23,15.0,marketing",
-    };
-
-    doImportAndVerify(false, expectedResults, SPECIAL_TABLE_NAME);
-  }
-
-  @Test
-  public void testIncrementalImport() throws IOException {
-    String [] expectedResults = { };
-
-    String [] extraArgs = { "--incremental", "lastmodified",
-       "--check-column", "start_date",
-    };
-
-    doImportAndVerify(false, expectedResults, TABLE_NAME, extraArgs);
-  }
-}

Reply via email to