SQOOP-1905: Add --schema option for import-all-tables and list-tables for DB2 connector
(Ying Cao via Attila Szabo) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/13ec21cd Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/13ec21cd Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/13ec21cd Branch: refs/heads/branch-1.4.7 Commit: 13ec21cdaad1804df491155cfcf6382693aa9ac5 Parents: 33be844 Author: Attila Szabo <[email protected]> Authored: Fri Jul 21 12:09:04 2017 +0200 Committer: Attila Szabo <[email protected]> Committed: Fri Jul 21 12:19:46 2017 +0200 ---------------------------------------------------------------------- .../org/apache/sqoop/manager/Db2Manager.java | 154 ++++++++++- .../sqoop/manager/GenericJdbcManager.java | 33 +++ .../DB2ImportAllTableWithSchemaManualTest.java | 274 +++++++++++++++++++ 3 files changed, 453 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/13ec21cd/src/java/org/apache/sqoop/manager/Db2Manager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java index 52ab05e..61b6868 100644 --- a/src/java/org/apache/sqoop/manager/Db2Manager.java +++ b/src/java/org/apache/sqoop/manager/Db2Manager.java @@ -19,12 +19,17 @@ package org.apache.sqoop.manager; import java.io.IOException; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.ParseException; +import org.apache.sqoop.cli.RelatedOptions; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,9 +59,37 @@ public class Db2Manager private 
Map<String, String> columnTypeNames; + public static final String SCHEMA = "schema"; + + /** + * Query to list the names of base tables (TYPE='T') whose CREATOR equals the + * bound schema parameter. Names come back without a schema prefix; listTables() + * adds one only when an explicit --schema was given. + */ + + public static final String QUERY_LIST_SCHEMA_TABLES = "SELECT DISTINCT NAME FROM SYSIBM.SYSTABLES WHERE CREATOR =? AND TYPE='T' "; + + /** + * Query to get the CURRENT SCHEMA of the DB session. The result is used by + * listTables() when no --schema extra argument was supplied. + */ + public static final String QUERY_GET_USERSCHEMA = + "select current schema from sysibm.sysdummy1"; + + /** + * DB2 schema from the --schema extra argument, or null if it was not given. + */ + private String schema = null; public Db2Manager(final SqoopOptions opts) { super(DRIVER_CLASS, opts); + + // Parse DB2 extra arguments such as --schema. NOTE(review): getExtraOptions() is overridable but is called from this constructor. + try { + this.schema = parseExtraScheArgs(opts.getExtraArgs(),getExtraOptions()); + } catch (ParseException e) { + throw new RuntimeException("Can't parse extra arguments", e); + } } /** @@ -107,15 +140,7 @@ public class Db2Manager // represents schema name.
databases.add(rset.getString(1)); } - conn.commit(); } catch (SQLException sqle) { - try { - if (conn != null) { - conn.rollback(); - } - } catch (SQLException ce) { - LoggingUtils.logAll(LOG, "Failed to rollback transaction", ce); - } LoggingUtils.logAll(LOG, "Failed to list databases", sqle); throw new RuntimeException(sqle); } finally { @@ -131,6 +156,105 @@ public class Db2Manager return databases.toArray(new String[databases.size()]); } + public static String getUserSchema(Connection conn) { /* Returns the session CURRENT SCHEMA read via QUERY_GET_USERSCHEMA; throws RuntimeException if it cannot be read. */ + Statement stmt = null; + ResultSet rset = null; + String currSchema = null; + try { + stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + rset = stmt.executeQuery(QUERY_GET_USERSCHEMA); + + if (rset.next()) { + currSchema = rset.getString(1); + } + } catch (SQLException e) { + LoggingUtils.logAll(LOG, "Failed to get user schema", e); /* NOTE(review): only logged; the RuntimeException thrown below does not chain this SQLException as its cause. */ + } finally { + if (rset != null) { + try { + rset.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Failed to close resultset", ex); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Failed to close statement", ex); + } + } + } + if (currSchema == null) { + throw new RuntimeException("Unable to get current user schema"); + } + return currSchema; + } + + @Override + public String[] listTables() { + Connection conn = null; + PreparedStatement pStmt = null; + ResultSet rset = null; + List<String> tables = new ArrayList<String>(); + String currUserSchema = null; + + try { + conn = getConnection(); + currUserSchema = getUserSchema(conn); /* NOTE(review): queried even when --schema was given, although only used when schema is null. */ + + pStmt = conn.prepareStatement(QUERY_LIST_SCHEMA_TABLES, + ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + + // No --schema given: list the tables of the session current schema. + if (schema == null) { + pStmt.setString(1, currUserSchema); + } else { // --schema given: list the tables of that schema. + pStmt.setString(1, schema); + } + + rset = pStmt.executeQuery(); + + if (schema != null && rset == null) { /* NOTE(review): executeQuery() never returns null, so this branch is dead; an unknown schema simply yields no rows. */ + LOG.debug("schema=" +
schema + + ",maybe not exists in current database"); + } + while (rset.next()) { + if(schema == null){ + tables.add(rset.getString(1)); + }else{ + tables.add(schema + "." + rset.getString(1)); + } + } + } catch (SQLException e) { + LoggingUtils.logAll(LOG, "Failed to list tables", e); /* NOTE(review): only logged, so callers may get an empty or partial table list; listDatabases() rethrows as RuntimeException instead. */ + } finally { + if (rset != null) { + try { + rset.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Failed to close resultset", ex); + } + } + if (pStmt != null) { + try { + pStmt.close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Failed to close statement", ex); + } + } + + try { + close(); + } catch (SQLException ex) { + LoggingUtils.logAll(LOG, "Unable to discard connection", ex); + } + } + return tables.toArray(new String[tables.size()]); + } + /** * Return hive type for SQL type. * @@ -224,4 +348,18 @@ public class Db2Manager return null; } + /** + * Create related options for DB2 extra parameters (currently only --schema). + * @return the option group used to parse the DB2 extra arguments + */ + @SuppressWarnings("static-access") + protected RelatedOptions getExtraOptions() { + // DB2-specific extra arguments + RelatedOptions extraOptions = new RelatedOptions("DB2 extra options:"); + extraOptions.addOption(OptionBuilder.withArgName("string").hasArg() + .withDescription("Optional schema name").withLongOpt(SCHEMA) + .create("schema")); + return extraOptions; + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/13ec21cd/src/java/org/apache/sqoop/manager/GenericJdbcManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/GenericJdbcManager.java b/src/java/org/apache/sqoop/manager/GenericJdbcManager.java index 2113a5f..f38bcc5 100644 --- a/src/java/org/apache/sqoop/manager/GenericJdbcManager.java +++ b/src/java/org/apache/sqoop/manager/GenericJdbcManager.java @@ -21,10 +21,16 @@ package org.apache.sqoop.manager; import java.sql.Connection; import java.sql.SQLException; + + import org.apache.commons.cli.CommandLine; +import
org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; +import org.apache.sqoop.cli.RelatedOptions; /** * Database manager that is connects to a generic JDBC-compliant @@ -39,6 +45,7 @@ public class GenericJdbcManager private String jdbcDriverClass; private Connection connection; + private static final String SCHEMA = "schema"; /* long option name for --schema; duplicates Db2Manager.SCHEMA */ public GenericJdbcManager(final String driverClass, final SqoopOptions opts) { super(opts); @@ -84,5 +91,31 @@ public class GenericJdbcManager public String getDriverClass() { return jdbcDriverClass; } + + public String parseExtraScheArgs(String[] args,RelatedOptions opts) throws ParseException { /* Parses args against opts and returns the --schema value, or null when absent. */ + // No extra arguments means no schema was requested + if (args == null || args.length == 0) { + return null; + } + + // We do not need extended abilities of SqoopParser, so we're using + // Gnu parser instead.
+ CommandLineParser parser = new GnuParser(); + CommandLine cmdLine = parser.parse(opts, args, true); /* stopAtNonOption=true: parsing stops at the first unrecognized token instead of failing */ + + // Extract the --schema value, if present + return applyExtraScheArguments(cmdLine); + } + + public String applyExtraScheArguments(CommandLine cmdLine) { /* Returns the --schema value, or null when the option is absent. */ + if (cmdLine.hasOption(SCHEMA)) { + String schemaName = cmdLine.getOptionValue(SCHEMA); + LOG.info("We will use schema " + schemaName); + + return schemaName; + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/13ec21cd/src/test/org/apache/sqoop/manager/db2/DB2ImportAllTableWithSchemaManualTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/db2/DB2ImportAllTableWithSchemaManualTest.java b/src/test/org/apache/sqoop/manager/db2/DB2ImportAllTableWithSchemaManualTest.java new file mode 100644 index 0000000..db6e6bf --- /dev/null +++ b/src/test/org/apache/sqoop/manager/db2/DB2ImportAllTableWithSchemaManualTest.java @@ -0,0 +1,274 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.sqoop.manager.db2; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.manager.Db2Manager; +import org.apache.sqoop.tool.ImportAllTablesTool; +import org.apache.sqoop.Sqoop; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import com.cloudera.sqoop.tool.SqoopTool; +import com.cloudera.sqoop.util.FileListing; +import org.apache.sqoop.util.LoggingUtils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test import-all-tables against DB2 with the --schema extra argument. + * + * This uses JDBC to import data from a DB2 database into HDFS. + * + * Since this requires a DB2 Server installation, + * this class is named in such a way that Sqoop's default QA process does + * not run it. You need to run this manually with + * -Dtestcase=DB2ImportAllTableWithSchemaManualTest + + * You need to put the DB2 JDBC driver library (db2jcc.jar) in a location + * where Sqoop will be able to access it (since this library cannot be checked + * into Apache's tree for licensing reasons). + * + * To set up your test environment: + * Install DB2 Express 9.7 C server. + * Create a database (default TESTDB; see the sqoop.test.db2.connectstring.* + * system properties) and a login (default DB2FENC1/DB2FENC1) with rights to + * create, populate and drop tables in schema TEST within that database.
+ */ +public class DB2ImportAllTableWithSchemaManualTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + DB2ImportAllTableWithSchemaManualTest.class.getName()); + + static final String HOST_URL = System.getProperty( /* NOTE(review): the default below is a hard-coded host address; override it with -Dsqoop.test.db2.connectstring.host_url */ + "sqoop.test.db2.connectstring.host_url", + "jdbc:db2://9.30.245.234:60000"); + + static final String DATABASE_NAME = System.getProperty( + "sqoop.test.db2.connectstring.database", + "TESTDB"); + static final String DATABASE_USER = System.getProperty( + "sqoop.test.db2.connectstring.username", + "DB2FENC1"); + static final String DATABASE_PASSWORD = System.getProperty( + "sqoop.test.db2.connectstring.password", + "DB2FENC1"); + + static final String TABLE_NAME = "TEST.COMPANY"; + static final String TABLE_SCHEMA = "TEST"; + static final String CONNECT_STRING = HOST_URL + + "/" + DATABASE_NAME; + static String ExpectedResults = + "1,doc1"; + + + String [] extraArgs = { "--", /* tokens after the -- separator are presumably delivered to Db2Manager via SqoopOptions.getExtraArgs() */ + "--schema", TABLE_SCHEMA, + }; + + static { + LOG.info("Using DB2 CONNECT_STRING HOST_URL is : "+HOST_URL); + LOG.info("Using DB2 CONNECT_STRING: " + CONNECT_STRING); + } + + // instance variables populated during setUp, used during tests + private Db2Manager manager; + + protected String getTableName() { + return TABLE_NAME; + } + + + @Before + public void setUp() { + super.setUp(); + + SqoopOptions options = new SqoopOptions(CONNECT_STRING, getTableName()); + options.setUsername(DATABASE_USER); + options.setPassword(DATABASE_PASSWORD); + + manager = new Db2Manager(options); + + // Drop the existing table, if there is one.
+ Connection conn = null; + Statement stmt = null; + try { + conn = manager.getConnection(); + stmt = conn.createStatement(); + stmt.execute("DROP TABLE " + getTableName()); + } catch (SQLException sqlE) { + LoggingUtils.logAll(LOG, "Table was not dropped: ", sqlE); + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing stmt", ex); + } + } + + // Create and populate table + try { + conn = manager.getConnection(); + conn.setAutoCommit(false); + stmt = conn.createStatement(); + + // create the database table and populate it with data. + stmt.executeUpdate("CREATE TABLE " + getTableName() + " (" + + "ID int, " + + "DOCNAME VARCHAR(20))"); + + stmt.executeUpdate("INSERT INTO " + getTableName() + " VALUES(" + + "1,'doc1' " + + " )"); + conn.commit(); + } catch (SQLException sqlE) { + LoggingUtils.logAll(LOG, "Encountered SQL Exception: ", sqlE); /* NOTE(review): setup failures are only logged; they surface later as a missing imported data file */ + } finally { + try { + if (null != stmt) { + stmt.close(); + } + } catch (Exception ex) { + LOG.warn("Exception while closing connection/stmt", ex); + } + } + } + + @After + public void tearDown() { + super.tearDown(); + try { + manager.close(); + } catch (SQLException sqlE) { + LoggingUtils.logAll(LOG, "Got SQLException: ", sqlE); + } + } + + @Test + public void testDb2Import() throws IOException { + + runDb2Test(ExpectedResults); + + } + + private String [] getArgv() { + ArrayList<String> args = new ArrayList<String>(); + + CommonArgs.addHadoopFlags(args); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--password"); + args.add(DATABASE_PASSWORD); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + + args.add("--m"); + args.add("1"); + + for (String arg : extraArgs) { /* the -- separator and the DB2 --schema extra argument go last */ + args.add(arg); + } + + return args.toArray(new String[0]); + } + + private void runDb2Test(String expectedResults) throws IOException { + + Path warehousePath = new Path(this.getWarehouseDir()); + Path
tablePath = new Path(warehousePath, getTableName()); + Path filePath = new Path(tablePath, "part-m-00000"); + + File tableFile = new File(getTableName().toString()); /* NOTE(review): resolved against the working directory rather than warehousePath; toString() on a String is redundant */ + if (tableFile.exists() && tableFile.isDirectory()) { + // remove the directory before running the import. + FileListing.recursiveDeleteDir(tableFile); + } + + String [] argv = getArgv(); + try { + runImportAll(argv); + } catch (IOException ioe) { + LOG.error("Got IOException during import: " + ioe.getMessage()); /* NOTE(review): only logged; an import failure surfaces only through the file-exists assertion below */ + } + + File f = new File(filePath.toString()); + assertTrue("Could not find imported data file", f.exists()); + BufferedReader r = null; + try { + // Read through the file and make sure it's all there. + r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); + assertEquals(expectedResults, r.readLine()); + } catch (IOException ioe) { + LOG.error("Got IOException verifying results: " + ioe.getMessage()); /* NOTE(review): only logged, so a read failure lets the test pass without the assertEquals check running */ + } finally { + IOUtils.closeStream(r); + } + } + + private void runImportAll(SqoopTool tool,String [] argv) throws IOException { + // run the tool through the normal entry-point. + int ret; + try { + Configuration conf = getConf(); + SqoopOptions opts = getSqoopOptions(conf); + Sqoop sqoop = new Sqoop(tool, conf, opts); + ret = Sqoop.runSqoop(sqoop, argv); + //ret = tool.run(opts); + } catch (Exception e) { + LOG.error("Got exception running Sqoop: " + e.toString()); + ret = 1; + } + + // expect a successful return. + if (0 != ret) { + throw new IOException("Failure during job; return status " + ret); + } + } + + /** Run an import-all-tables job using ImportAllTablesTool. */ + protected void runImportAll(String [] argv) throws IOException { + runImportAll(new ImportAllTablesTool(), argv); + } +}
