Updated Branches: refs/heads/trunk d4767e9f6 -> 4a26d0905
SQOOP-540 Microsoft SQL Connector doesn't support custom schemas (Jarek Jarcec Cecho via Cheolsoo Park) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4a26d090 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4a26d090 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4a26d090 Branch: refs/heads/trunk Commit: 4a26d0905ce58e97a35494dbd425f768b34abe28 Parents: d4767e9 Author: Cheolsoo Park <[email protected]> Authored: Mon Oct 1 18:17:38 2012 -0700 Committer: Cheolsoo Park <[email protected]> Committed: Mon Oct 1 18:17:38 2012 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/manager/SQLServerManager.java | 94 +++++- .../org/apache/sqoop/mapreduce/HBaseImportJob.java | 2 +- .../mapreduce/db/DataDrivenDBRecordReader.java | 1 + .../manager/SQLServerManagerExportManualTest.java | 294 +++++++++++++++ .../manager/SQLServerManagerImportManualTest.java | 109 +++++-- 5 files changed, 470 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/4a26d090/src/java/org/apache/sqoop/manager/SQLServerManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java index 7ce1edd..51f8679 100644 --- a/src/java/org/apache/sqoop/manager/SQLServerManager.java +++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java @@ -20,6 +20,11 @@ package org.apache.sqoop.manager; import java.io.IOException; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +32,7 @@ import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat; import com.cloudera.sqoop.mapreduce.JdbcExportJob; import com.cloudera.sqoop.util.ExportException; +import org.apache.sqoop.cli.RelatedOptions; /** * Manages connections to SQLServer databases. Requires the SQLServer JDBC @@ -35,6 +41,8 @@ import com.cloudera.sqoop.util.ExportException; public class SQLServerManager extends com.cloudera.sqoop.manager.InformationSchemaManager { + public static final String SCHEMA = "schema"; + public static final Log LOG = LogFactory.getLog( SQLServerManager.class.getName()); @@ -42,8 +50,20 @@ public class SQLServerManager private static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + /** + * Schema name that we will use. + */ + private String schema; + public SQLServerManager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); + + // Try to parse extra arguments + try { + parseExtraArgs(opts.getExtraArgs()); + } catch (ParseException e) { + throw new RuntimeException("Can't parse extra arguments", e); + } } /** @@ -75,23 +95,83 @@ public class SQLServerManager @Override protected String getSchemaQuery() { - return "SELECT SCHEMA_NAME()"; + if (schema == null) { + return "SELECT SCHEMA_NAME()"; + } + + return "'" + schema + "'"; } @Override public String escapeColName(String colName) { - if (null == colName) { - return null; - } - return "[" + colName + "]"; + return escapeObjectName(colName); } @Override public String escapeTableName(String tableName) { - if (null == tableName) { + // Return table name including schema if requested + if (schema != null && !schema.isEmpty()) { + return escapeObjectName(schema) + "." + escapeObjectName(tableName); + } + + return escapeObjectName(tableName); + } + + /** + * Escape database object name (database, table, column, schema). + * + * @param objectName Object name in database + * @return Escaped variant of the name + */ + public String escapeObjectName(String objectName) { + if (null == objectName) { return null; } - return "[" + tableName + "]"; + return "[" + objectName + "]"; + } + + /** + * Parse extra arguments. + * + * @param args Extra arguments array + * @throws ParseException + */ + void parseExtraArgs(String[] args) throws ParseException { + // No-op when no extra arguments are present + if (args == null || args.length == 0) { + return; + } + + // We do not need extended abilities of SqoopParser, so we're using + // Gnu parser instead. + CommandLineParser parser = new GnuParser(); + CommandLine cmdLine = parser.parse(getExtraOptions(), args, true); + + // Apply extra options + if (cmdLine.hasOption(SCHEMA)) { + String schemaName = cmdLine.getOptionValue(SCHEMA); + LOG.info("We will use schema " + schemaName); + + this.schema = schemaName; + } + } + + /** + * Create related options for SQL Server extra parameters. + * + * @return + */ + @SuppressWarnings("static-access") + private RelatedOptions getExtraOptions() { + // Connection args (common) + RelatedOptions extraOptions = + new RelatedOptions("SQL Server extra options:"); + + extraOptions.addOption(OptionBuilder.withArgName("string").hasArg() + .withDescription("Optional schema name") + .withLongOpt(SCHEMA).create()); + + return extraOptions; } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/4a26d090/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java index a6e5546..afc4299 100644 --- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java @@ -175,7 +175,7 @@ public class HBaseImportJob extends DataDrivenImportJob { User user = User.getCurrent(); // Obtain security token if needed - if((Boolean)isSecurityEnabled.invoke(null)) { + if ((Boolean)isSecurityEnabled.invoke(null)) { obtainAuthTokenForJob.invoke(user, conf, job); } } catch (NoSuchMethodException e) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/4a26d090/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java index a56b93d..4b44244 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java @@ -99,6 +99,7 @@ public class DataDrivenDBRecordReader<T extends DBWritable> query.append(" FROM ").append(tableName); if (!dbProductName.startsWith("ORACLE") && !dbProductName.startsWith("DB2") + && !dbProductName.startsWith("MICROSOFT SQL SERVER") && !dbProductName.startsWith("POSTGRESQL")) { // The AS clause is required for hsqldb. Some other databases might have // issues with it, so we're skipping some of them. http://git-wip-us.apache.org/repos/asf/sqoop/blob/4a26d090/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java new file mode 100644 index 0000000..ac7a934 --- /dev/null +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop.manager; + +import com.cloudera.sqoop.ConnFactory; +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.testutil.CommonArgs; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import com.cloudera.sqoop.testutil.ExportJobTestCase; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +/** + * Please see instructions in SQLServerManagerImportManualTest. + */ +public class SQLServerManagerExportManualTest extends ExportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + SQLServerManagerExportManualTest.class.getName()); + + static final String HOST_URL = System.getProperty( + "sqoop.test.sqlserver.connectstring.host_url", + "jdbc:sqlserver://sqlserverhost:1433"); + + static final String DATABASE_NAME = "SQOOPTEST"; + static final String DATABASE_USER = "SQOOPUSER"; + static final String DATABASE_PASSWORD = "PASSWORD"; + static final String SCHEMA_DBO = "dbo"; + static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL"; + static final String SCHEMA_SCH = "sch"; + static final String SCH_TABLE_NAME = "PRIVATE_TABLE"; + static final String CONNECT_STRING = HOST_URL + + ";databaseName=" + DATABASE_NAME; + + static final String CONNECTOR_FACTORY = System.getProperty( + "sqoop.test.msserver.connector.factory", + ConnFactory.DEFAULT_FACTORY_CLASS_NAMES); + + // instance variables populated during setUp, used during tests + private SQLServerManager manager; + private Configuration conf = new Configuration(); + private Connection conn = null; + + @Override + protected Configuration getConf() { + return conf; + } + + @Override + protected boolean useHsqldbTestServer() { + return false; + } + + @Before + public void setUp() { + super.setUp(); + + SqoopOptions options = new SqoopOptions(CONNECT_STRING, + DBO_TABLE_NAME); + options.setUsername(DATABASE_USER); + options.setPassword(DATABASE_PASSWORD); + + manager = new SQLServerManager(options); + + createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME); + createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME); + + // To test with Microsoft SQL server connector, copy the connector jar to + // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory + // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default, + // the built-in SQL server connector is used. + conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY); + } + + public void createTableAndPopulateData(String schema, String table) { + String fulltableName = manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(table); + + Statement stmt = null; + + // Create schema if needed + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("CREATE SCHEMA " + schema); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Can't create schema: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Drop the existing table, if there is one. + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + fulltableName); + conn.commit(); + } catch (SQLException sqlE) { + LOG.info("Table was not dropped: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); + + // create the database table and populate it with data. + stmt.executeUpdate("CREATE TABLE " + fulltableName + " (" + + "id INT NOT NULL, " + + "name VARCHAR(24) NOT NULL, " + + "salary FLOAT, " + + "dept VARCHAR(32), " + + "PRIMARY KEY (id))"); + conn.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: ", sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing connection/stmt", ex); + } + } + } + + @After + public void tearDown() { + super.tearDown(); + try { + conn.close(); + manager.close(); + } catch (SQLException sqlE) { + LOG.error("Got SQLException: " + sqlE.toString()); + fail("Got SQLException: " + sqlE.toString()); + } + } + + private String [] getArgv(String tableName, + String... extraArgs) { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + + args.add("--table"); + args.add(tableName); + args.add("--export-dir"); + args.add(getWarehouseDir()); + args.add("--fields-terminated-by"); + args.add(","); + args.add("--lines-terminated-by"); + args.add("\\n"); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(DATABASE_PASSWORD); + args.add("-m"); + args.add("1"); + + for (String arg : extraArgs) { + args.add(arg); + } + + return args.toArray(new String[0]); + } + + protected void createTestFile(String filename, + String[] lines) + throws IOException { + new File(getWarehouseDir()).mkdirs(); + File file = new File(getWarehouseDir() + "/" + filename); + Writer output = new BufferedWriter(new FileWriter(file)); + for(String line : lines) { + output.write(line); + output.write("\n"); + } + output.close(); + } + + public void testExport() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + runExport(getArgv(DBO_TABLE_NAME)); + + assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn); + } + + public void testExportCustomSchema() throws IOException, SQLException { + createTestFile("inputFile", new String[] { + "2,Bob,400,sales", + "3,Fred,15,marketing", + }); + + String[] extra = new String[] {"--", + "--schema", + SCHEMA_SCH, + }; + + runExport(getArgv(SCH_TABLE_NAME, extra)); + + assertRowCount( + 2, + escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME), + conn + ); + } + + public static void assertRowCount(long expected, + String tableName, + Connection connection) { + Statement stmt = null; + ResultSet rs = null; + try { + stmt = connection.createStatement(); + rs = stmt.executeQuery("SELECT count(*) FROM " + tableName); + + rs.next(); + + assertEquals(expected, rs.getLong(1)); + } catch (SQLException e) { + LOG.error("Can't verify number of rows", e); + fail(); + } finally { + try { + connection.commit(); + + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + } catch (SQLException ex) { + LOG.info("Ignored exception in finally block."); + } + } + } + + public String escapeObjectName(String objectName) { + return "[" + objectName + "]"; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/4a26d090/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java index 0f949f4..bf889d0 100644 --- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java +++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java @@ -75,7 +75,10 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { static final String DATABASE_NAME = "SQOOPTEST"; static final String DATABASE_USER = "SQOOPUSER"; static final String DATABASE_PASSWORD = "PASSWORD"; - static final String TABLE_NAME = "EMPLOYEES_MSSQL"; + static final String SCHEMA_DBO = "dbo"; + static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL"; + static final String SCHEMA_SCH = "sch"; + static final String SCH_TABLE_NAME = "PRIVATE_TABLE"; static final String CONNECT_STRING = HOST_URL + ";databaseName=" + DATABASE_NAME; @@ -103,21 +106,36 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { super.setUp(); SqoopOptions options = new SqoopOptions(CONNECT_STRING, - TABLE_NAME); + DBO_TABLE_NAME); options.setUsername(DATABASE_USER); options.setPassword(DATABASE_PASSWORD); manager = new SQLServerManager(options); - // Drop the existing table, if there is one. + createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME); + createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME); + + // To test with Microsoft SQL server connector, copy the connector jar to + // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory + // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default, + // the built-in SQL server connector is used. + conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY); + } + + public void createTableAndPopulateData(String schema, String table) { + String fulltableName = manager.escapeObjectName(schema) + + "." + manager.escapeObjectName(table); + Connection conn = null; Statement stmt = null; + + // Create schema if needed try { conn = manager.getConnection(); stmt = conn.createStatement(); - stmt.execute("DROP TABLE " + TABLE_NAME); + stmt.execute("CREATE SCHEMA " + schema); } catch (SQLException sqlE) { - LOG.info("Table was not dropped: " + sqlE.getMessage()); + LOG.info("Can't create schema: " + sqlE.getMessage()); } finally { try { if (null != stmt) { @@ -128,27 +146,44 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { } } - // Create and populate table + // Drop the existing table, if there is one. try { conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + fulltableName); + } catch (SQLException sqlE) { + LOG.info("Table was not dropped: " + sqlE.getMessage()); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); conn.setAutoCommit(false); stmt = conn.createStatement(); // create the database table and populate it with data. - stmt.executeUpdate("CREATE TABLE " + TABLE_NAME + " (" + stmt.executeUpdate("CREATE TABLE " + fulltableName + " (" + "id INT NOT NULL, " + "name VARCHAR(24) NOT NULL, " + "salary FLOAT, " + "dept VARCHAR(32), " + "PRIMARY KEY (id))"); - stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES(" + "1,'Aaron', " + "1000000.00,'engineering')"); - stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES(" + "2,'Bob', " + "400.00,'sales')"); - stmt.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES(" + "3,'Fred', 15.00," + "'marketing')"); conn.commit(); @@ -179,28 +214,49 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { } @Test - public void testSQLServerImport() throws IOException { + public void testImportSimple() throws IOException { String [] expectedResults = { "1,Aaron,1000000.0,engineering", "2,Bob,400.0,sales", "3,Fred,15.0,marketing", }; - // To test with Microsoft SQL server connector, copy the connector jar to - // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory - // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default, - // the built-in SQL server connector is used. - conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY); - runSQLServerTest(expectedResults); + doImportAndVerify(DBO_TABLE_NAME, expectedResults); + } + + @Test + public void testImportExplicitDefaultSchema() throws IOException { + String [] expectedResults = { + "1,Aaron,1000000.0,engineering", + "2,Bob,400.0,sales", + "3,Fred,15.0,marketing", + }; + + String[] extraArgs = new String[] {"--schema", SCHEMA_DBO}; + + doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs); } - private String [] getArgv() { + @Test + public void testImportDifferentSchema() throws IOException { + String [] expectedResults = { + "1,Aaron,1000000.0,engineering", + "2,Bob,400.0,sales", + "3,Fred,15.0,marketing", + }; + + String[] extraArgs = new String[] {"--schema", SCHEMA_SCH}; + + doImportAndVerify(SCH_TABLE_NAME, expectedResults, extraArgs); + } + + private String [] getArgv(String tableName, String ... extraArgs) { ArrayList<String> args = new ArrayList<String>(); CommonArgs.addHadoopFlags(args); args.add("--table"); - args.add(TABLE_NAME); + args.add(tableName); args.add("--warehouse-dir"); args.add(getWarehouseDir()); args.add("--connect"); @@ -212,13 +268,22 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { args.add("--num-mappers"); args.add("1"); + if (extraArgs.length > 0) { + args.add("--"); + for (String arg : extraArgs) { + args.add(arg); + } + } + return args.toArray(new String[0]); } - private void runSQLServerTest(String [] expectedResults) throws IOException { + private void doImportAndVerify(String tableName, + String [] expectedResults, + String ... extraArgs) throws IOException { Path warehousePath = new Path(this.getWarehouseDir()); - Path tablePath = new Path(warehousePath, TABLE_NAME); + Path tablePath = new Path(warehousePath, tableName); Path filePath = new Path(tablePath, "part-m-00000"); File tableFile = new File(tablePath.toString()); @@ -227,7 +292,7 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase { FileListing.recursiveDeleteDir(tableFile); } - String [] argv = getArgv(); + String [] argv = getArgv(tableName, extraArgs); try { runImport(argv); } catch (IOException ioe) {
