SQOOP-3216: Expanded Metastore support for MySql, Oracle, Postgresql, MSSql, and DB2
(Zach Berkowitz via Boglarka Egyed) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d0770ac6 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d0770ac6 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d0770ac6 Branch: refs/heads/trunk Commit: d0770ac6aea735b2f007d6f1987a41f3f50cd345 Parents: f378328 Author: Boglarka Egyed <[email protected]> Authored: Mon Sep 25 12:35:08 2017 +0200 Committer: Boglarka Egyed <[email protected]> Committed: Mon Sep 25 12:35:08 2017 +0200 ---------------------------------------------------------------------- src/docs/user/metastore-purpose.txt | 21 +- src/docs/user/saved-jobs.txt | 13 +- .../sqoop/metastore/GenericJobStorage.java | 36 + .../metastore/hsqldb/AutoHsqldbStorage.java | 41 - .../metastore/hsqldb/HsqldbJobStorage.java | 36 - src/java/org/apache/sqoop/SqoopOptions.java | 34 + .../org/apache/sqoop/manager/CubridManager.java | 8 +- .../org/apache/sqoop/manager/Db2Manager.java | 8 +- .../org/apache/sqoop/manager/HsqldbManager.java | 10 +- .../org/apache/sqoop/manager/JdbcDrivers.java | 44 + .../org/apache/sqoop/manager/MySQLManager.java | 9 +- .../apache/sqoop/manager/NetezzaManager.java | 7 +- .../org/apache/sqoop/manager/OracleManager.java | 7 +- .../apache/sqoop/manager/PostgresqlManager.java | 7 +- .../apache/sqoop/manager/SQLServerManager.java | 8 +- .../apache/sqoop/manager/SupportedManagers.java | 7 +- .../sqoop/metastore/GenericJobStorage.java | 858 +++++++++++++++++++ .../sqoop/metastore/JobStorageFactory.java | 3 +- .../metastore/hsqldb/AutoHsqldbStorage.java | 115 --- .../metastore/hsqldb/HsqldbJobStorage.java | 805 ----------------- .../org/apache/sqoop/tool/BaseSqoopTool.java | 12 + src/java/org/apache/sqoop/tool/JobTool.java | 58 +- .../cloudera/sqoop/TestIncrementalImport.java | 23 +- .../sqoop/metastore/JobToolTestBase.java | 215 +++++ .../MetaConnectIncrementalImportTestBase.java | 215 +++++ .../sqoop/metastore/SavedJobsTestBase.java | 314 +++++++ .../cloudera/sqoop/metastore/TestSavedJobs.java | 302 ------- .../sqoop/metastore/db2/DB2JobToolTest.java | 65 ++ .../DB2MetaConnectIncrementalImportTest.java | 65 ++ .../sqoop/metastore/db2/DB2SavedJobsTest.java | 66 ++ .../metastore/hsqldb/HsqldbJobToolTest.java | 38 + .../HsqldbMetaConnectIncrementalImportTest.java | 38 + .../metastore/hsqldb/HsqldbSavedJobsTest.java | 40 + .../sqoop/metastore/mysql/MySqlJobToolTest.java | 52 ++ .../MySqlMetaConnectIncrementalImportTest.java | 53 ++ .../metastore/mysql/MySqlSavedJobsTest.java | 53 ++ .../metastore/oracle/OracleJobToolTest.java | 51 ++ .../OracleMetaConnectIncrementalImportTest.java | 51 ++ .../metastore/oracle/OracleSavedJobsTest.java | 53 ++ .../metastore/postgres/PostgresJobToolTest.java | 53 ++ ...ostgresMetaConnectIncrementalImportTest.java | 53 ++ .../postgres/PostgresSavedJobsTest.java | 54 ++ .../sqlserver/SqlServerJobToolTest.java | 53 ++ ...lServerMetaConnectIncrementalImportTest.java | 53 ++ .../sqlserver/SqlServerSavedJobsTest.java | 55 ++ src/test/findbugsExcludeFile.xml | 2 +- 46 files changed, 2806 insertions(+), 1358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/docs/user/metastore-purpose.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/metastore-purpose.txt b/src/docs/user/metastore-purpose.txt index e7eb23d..95c2d77 100644 --- a/src/docs/user/metastore-purpose.txt +++ b/src/docs/user/metastore-purpose.txt @@ -17,11 +17,28 @@ limitations under the License. //// -The +metastore+ tool configures Sqoop to host a shared metadata repository. +The +metastore+ tool configures Sqoop to host a shared Hsqldb metadata repository. Multiple users and/or remote users can define and execute saved jobs (created with +sqoop job+) defined in this metastore. Clients must be configured to connect to the metastore in +sqoop-site.xml+ or -with the +--meta-connect+ argument. +with the +--meta-connect+ argument. These commands MySql, Hsqldb, PostgreSql, Oracle, DB2, +and SqlServer databases as well. All services other than Hsqldb and Postgres require the +download of the corresponding JDBC driver and connect string structured in the correct format. + +Migration of metastore data from one database service to another is not directly supported, but is possible. + +.JDBC Connect String Formats: +[grid="all"] +`---------------------------`------------------------------------------ +Service Connect String Format +----------------------------------------------------------------------- + +MySQL+ jdbc:mysql://<server>:<port>/<dbname> + +HSQLDB+ jdbc:hsqldb:hsql://<server>:<port>/<dbname> + +PostgreSQL+ jdbc:postgresql://<server>:<port>/<dbname> + +Oracle+ jdbc:oracle:thin:@//<server>:<port>/<SID> + +DB2+ jdbc:db2://<server>:<port>/<dbname> + +MSSQL+ jdbc:sqlserver://<server>:<port>;database=<dbname> +----------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/docs/user/saved-jobs.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/saved-jobs.txt b/src/docs/user/saved-jobs.txt index e875780..d21df02 100644 --- a/src/docs/user/saved-jobs.txt +++ b/src/docs/user/saved-jobs.txt @@ -131,6 +131,8 @@ Argument Description ----------------------------------------------------------------------- +\--meta-connect <jdbc-uri>+ Specifies the JDBC connect string used \ to connect to the metastore ++\--meta-username <username>+ Specifies the username for the metastore database ++\--meta-password <password>+ Specifies the password for the metastore database ----------------------------------------------------------------------- By default, a private metastore is instantiated in +$HOME/.sqoop+. If @@ -148,6 +150,15 @@ filesystem other than your home directory. If you configure +sqoop.metastore.client.enable.autoconnect+ with the value +false+, then you must explicitly supply +\--meta-connect+. +Job data can be stored in MySql, PostgreSql, DB2, SqlServer, and Oracle with +the +\--meta-connect+ argument. The +\--meta-username+ and +\--meta-password+ arguments are necessary +if the database containing the saved jobs requires a username and password. + +---- +$ sqoop job --exec myjob --meta-connect jdbc:hsqldb:hsql://localhost:3000/ --meta-username *username* --meta-password *password* + +---- + .Common options: [grid="all"] `---------------------------`------------------------------------------ @@ -229,7 +240,7 @@ The metastore is available over TCP/IP. The port is controlled by the Clients should connect to the metastore by specifying +sqoop.metastore.client.autoconnect.url+ or +\--meta-connect+ with the -value +jdbc:hsqldb:hsql://<server-name>:<port>/sqoop+. For example, +JDBC-URI string. For example, +jdbc:hsqldb:hsql://metaserver.example.com:16000/sqoop+. This metastore may be hosted on a machine within the Hadoop cluster, or http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java ---------------------------------------------------------------------- diff --git a/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java b/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java new file mode 100644 index 0000000..d42e5a3 --- /dev/null +++ b/src/java/com/cloudera/sqoop/metastore/GenericJobStorage.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.sqoop.metastore; + +/** + * @deprecated Moving to use org.apache.sqoop namespace. + */ +public class GenericJobStorage + extends org.apache.sqoop.metastore.GenericJobStorage { + + public static final String META_CONNECT_KEY = + org.apache.sqoop.metastore.GenericJobStorage.META_CONNECT_KEY; + public static final String META_USERNAME_KEY = + org.apache.sqoop.metastore.GenericJobStorage.META_USERNAME_KEY; + public static final String META_PASSWORD_KEY = + org.apache.sqoop.metastore.GenericJobStorage.META_PASSWORD_KEY; + public static final String ROOT_TABLE_NAME_KEY = + org.apache.sqoop.metastore.GenericJobStorage.ROOT_TABLE_NAME_KEY; + +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java ---------------------------------------------------------------------- diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java deleted file mode 100644 index 259d9f6..0000000 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/AutoHsqldbStorage.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.sqoop.metastore.hsqldb; - -/** - * @deprecated Moving to use org.apache.sqoop namespace. - */ -public class AutoHsqldbStorage - extends org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage { - - public static final String AUTO_STORAGE_IS_ACTIVE_KEY = - org.apache.sqoop.metastore.hsqldb. - AutoHsqldbStorage.AUTO_STORAGE_IS_ACTIVE_KEY; - public static final String AUTO_STORAGE_CONNECT_STRING_KEY = - org.apache.sqoop.metastore.hsqldb. - AutoHsqldbStorage.AUTO_STORAGE_CONNECT_STRING_KEY; - public static final String AUTO_STORAGE_USER_KEY = - org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_USER_KEY; - public static final String AUTO_STORAGE_PASS_KEY = - org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.AUTO_STORAGE_PASS_KEY; - public static final String DEFAULT_AUTO_PASSWORD = - org.apache.sqoop.metastore.hsqldb.AutoHsqldbStorage.DEFAULT_AUTO_PASSWORD; - -} - http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java ---------------------------------------------------------------------- diff --git a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java b/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java deleted file mode 100644 index 083e2a3..0000000 --- a/src/java/com/cloudera/sqoop/metastore/hsqldb/HsqldbJobStorage.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.sqoop.metastore.hsqldb; - -/** - * @deprecated Moving to use org.apache.sqoop namespace. - */ -public class HsqldbJobStorage - extends org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage { - - public static final String META_CONNECT_KEY = - org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_CONNECT_KEY; - public static final String META_USERNAME_KEY = - org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_USERNAME_KEY; - public static final String META_PASSWORD_KEY = - org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.META_PASSWORD_KEY; - public static final String ROOT_TABLE_NAME_KEY = - org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.ROOT_TABLE_NAME_KEY; - -} - http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 2eb3d8a..587d4e1 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.accumulo.AccumuloConstants; import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration; +import org.apache.sqoop.metastore.GenericJobStorage; import org.apache.sqoop.tool.BaseSqoopTool; import org.apache.sqoop.util.CredentialsUtil; import org.apache.sqoop.util.LoggingUtils; @@ -391,6 +392,10 @@ public class SqoopOptions implements Cloneable { @StoredAsProperty(ORACLE_ESCAPING_DISABLED) private boolean oracleEscapingDisabled; + private String metaConnectStr; + private String metaUsername; + private String metaPassword; + public SqoopOptions() { initDefaults(null); } @@ -1076,6 +1081,25 @@ public class SqoopOptions implements Cloneable { // set escape column mapping to true this.escapeColumnMappingEnabled = true; + + this.metaConnectStr = + System.getProperty(GenericJobStorage.AUTO_STORAGE_CONNECT_STRING_KEY, getLocalAutoConnectString()); + this.metaUsername = + System.getProperty(GenericJobStorage.AUTO_STORAGE_USER_KEY, GenericJobStorage.DEFAULT_AUTO_USER); + this.metaPassword = + System.getProperty(GenericJobStorage.AUTO_STORAGE_PASS_KEY, GenericJobStorage.DEFAULT_AUTO_PASSWORD); + } + + private String getLocalAutoConnectString() { + String homeDir = System.getProperty("user.home"); + + File homeDirObj = new File(homeDir); + File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); + File databaseFileObj = new File(sqoopDataDirObj, "metastore.db"); + + String dbFileStr = databaseFileObj.toString(); + return "jdbc:hsqldb:file:" + dbFileStr + + ";hsqldb.write_delay=false;shutdown=true"; } /** @@ -2787,5 +2811,15 @@ public class SqoopOptions implements Cloneable { } + public String getMetaConnectStr() { + return metaConnectStr; + } + public String getMetaUsername() { + return metaUsername; + } + + public String getMetaPassword() { + return metaPassword; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/CubridManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java index 5a1a0e8..73b91d0 100644 --- a/src/java/org/apache/sqoop/manager/CubridManager.java +++ b/src/java/org/apache/sqoop/manager/CubridManager.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.CUBRID; + import java.io.IOException; import java.sql.Types; import java.util.Map; @@ -42,12 +44,8 @@ public class CubridManager extends public static final Log LOG = LogFactory .getLog(CubridManager.class.getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = - "cubrid.jdbc.driver.CUBRIDDriver"; - public CubridManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(CUBRID.getDriverClass(), opts); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/Db2Manager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java index 61b6868..7525521 100644 --- a/src/java/org/apache/sqoop/manager/Db2Manager.java +++ b/src/java/org/apache/sqoop/manager/Db2Manager.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.DB2; + import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -51,10 +53,6 @@ public class Db2Manager public static final Log LOG = LogFactory.getLog( Db2Manager.class.getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = - "com.ibm.db2.jcc.DB2Driver"; - private static final String XML_TO_JAVA_DATA_TYPE = "String"; private Map<String, String> columnTypeNames; @@ -82,7 +80,7 @@ public class Db2Manager private String schema = null; public Db2Manager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(DB2.getDriverClass(), opts); // Try to parse extra arguments try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/HsqldbManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/HsqldbManager.java b/src/java/org/apache/sqoop/manager/HsqldbManager.java index 9b9c582..92b7d53 100644 --- a/src/java/org/apache/sqoop/manager/HsqldbManager.java +++ b/src/java/org/apache/sqoop/manager/HsqldbManager.java @@ -18,16 +18,15 @@ package org.apache.sqoop.manager; -import java.io.IOException; +import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.cloudera.sqoop.SqoopOptions; - import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat; - import com.cloudera.sqoop.util.ExportException; +import java.io.IOException; /** * Manages connections to hsqldb databases. @@ -39,15 +38,12 @@ public class HsqldbManager public static final Log LOG = LogFactory.getLog( HsqldbManager.class.getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver"; - // HsqlDb doesn't have a notion of multiple "databases"; the user's database // is always called "PUBLIC". private static final String HSQL_SCHEMA_NAME = "PUBLIC"; public HsqldbManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(HSQLDB.getDriverClass(), opts); } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/JdbcDrivers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/JdbcDrivers.java b/src/java/org/apache/sqoop/manager/JdbcDrivers.java new file mode 100644 index 0000000..20bdc98 --- /dev/null +++ b/src/java/org/apache/sqoop/manager/JdbcDrivers.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.manager; + +public enum JdbcDrivers { + MYSQL("com.mysql.jdbc.Driver", "jdbc:mysql:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"), + HSQLDB("org.hsqldb.jdbcDriver","jdbc:hsqldb:"), ORACLE("oracle.jdbc.OracleDriver","jdbc:oracle:"), + SQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver:"), + JTDS_SQLSERVER("net.sourceforge.jtds.jdbc.Driver", "jdbc:jtds:sqlserver:"), + DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2:"), NETEZZA("org.netezza.Driver", "jdbc:netezza:"), + CUBRID("cubrid.jdbc.driver.CUBRIDDriver", "jdbc:cubrid:"); + + private final String driverClass; + private final String schemePrefix; + + JdbcDrivers(String driverClass, String schemePrefix) { + this.driverClass = driverClass; + this.schemePrefix = schemePrefix; + } + + public String getDriverClass() { + return driverClass; + } + + public String getSchemePrefix() { + return schemePrefix; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/MySQLManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java index 3c2276f..ba612e2 100644 --- a/src/java/org/apache/sqoop/manager/MySQLManager.java +++ b/src/java/org/apache/sqoop/manager/MySQLManager.java @@ -18,6 +18,8 @@ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.MYSQL; + import java.io.IOException; import java.io.PrintWriter; import java.net.URI; @@ -27,8 +29,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -53,16 +53,13 @@ public class MySQLManager public static final Log LOG = LogFactory.getLog(MySQLManager.class.getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; - // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; private static final String EXPORT_OPERATION = "export"; public MySQLManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(MYSQL.getDriverClass(), opts); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/NetezzaManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/NetezzaManager.java b/src/java/org/apache/sqoop/manager/NetezzaManager.java index 0ac7717..8c21073 100644 --- a/src/java/org/apache/sqoop/manager/NetezzaManager.java +++ b/src/java/org/apache/sqoop/manager/NetezzaManager.java @@ -18,6 +18,8 @@ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.NETEZZA; + import java.io.IOException; import java.sql.SQLException; @@ -45,9 +47,6 @@ public class NetezzaManager extends GenericJdbcManager { public static final Log LOG = LogFactory.getLog(NetezzaManager.class .getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = "org.netezza.Driver"; - // set to true after we warn the user that we can use direct fastpath. protected static boolean directModeWarningPrinted = false; @@ -62,7 +61,7 @@ public class NetezzaManager extends GenericJdbcManager { "partitioned-access"; public NetezzaManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(NETEZZA.getDriverClass(), opts); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/OracleManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java index 2f4585c..c0f5114 100644 --- a/src/java/org/apache/sqoop/manager/OracleManager.java +++ b/src/java/org/apache/sqoop/manager/OracleManager.java @@ -18,6 +18,8 @@ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.ORACLE; + import java.io.IOException; import java.lang.reflect.Method; import java.sql.Connection; @@ -118,9 +120,6 @@ public class OracleManager public static final String QUERY_GET_SESSIONUSER = "SELECT USER FROM DUAL"; - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver"; - // Configuration key to use to set the session timezone. public static final String ORACLE_TIMEZONE_KEY = "oracle.sessionTimeZone"; @@ -247,7 +246,7 @@ public class OracleManager } public OracleManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(ORACLE.getDriverClass(), opts); } public void close() throws SQLException { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/PostgresqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/PostgresqlManager.java b/src/java/org/apache/sqoop/manager/PostgresqlManager.java index 44e041a..29f7c7c 100644 --- a/src/java/org/apache/sqoop/manager/PostgresqlManager.java +++ b/src/java/org/apache/sqoop/manager/PostgresqlManager.java @@ -18,6 +18,8 @@ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES; + import java.io.IOException; import java.sql.SQLException; @@ -44,9 +46,6 @@ public class PostgresqlManager public static final Log LOG = LogFactory.getLog( PostgresqlManager.class.getName()); - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = "org.postgresql.Driver"; - // set to true after we warn the user that we can use direct fastpath. private static boolean warningPrinted = false; @@ -56,7 +55,7 @@ public class PostgresqlManager private String schema; public PostgresqlManager(final SqoopOptions opts) { - super(DRIVER_CLASS, opts); + super(POSTGRES.getDriverClass(), opts); // Try to parse extra arguments try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/SQLServerManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java index 9a3d918..cc5a1b4 100644 --- a/src/java/org/apache/sqoop/manager/SQLServerManager.java +++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java @@ -18,6 +18,8 @@ package org.apache.sqoop.manager; +import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER; + import java.io.IOException; import org.apache.commons.cli.CommandLine; @@ -68,10 +70,6 @@ public class SQLServerManager public static final String IDENTITY_INSERT_PROP = "org.apache.sqoop.manager.sqlserver.table.identity"; - // driver class to ensure is loaded when making db connection. - private static final String DRIVER_CLASS = - "com.microsoft.sqlserver.jdbc.SQLServerDriver"; - // Define SQL Server specific types that are not covered by parent classes private static final int DATETIMEOFFSET = -155; @@ -91,7 +89,7 @@ public class SQLServerManager private boolean identityInserts; public SQLServerManager(final SqoopOptions opts) { - this(DRIVER_CLASS, opts); + this(SQLSERVER.getDriverClass(), opts); } public SQLServerManager(final String driver, final SqoopOptions opts) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/manager/SupportedManagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SupportedManagers.java b/src/java/org/apache/sqoop/manager/SupportedManagers.java index 8a6037a..1b65a9a 100644 --- a/src/java/org/apache/sqoop/manager/SupportedManagers.java +++ b/src/java/org/apache/sqoop/manager/SupportedManagers.java @@ -24,8 +24,11 @@ import org.apache.commons.logging.LogFactory; public enum SupportedManagers { - MYSQL("jdbc:mysql:", true), POSTGRES("jdbc:postgresql:", true), HSQLDB("jdbc:hsqldb:", false), ORACLE("jdbc:oracle:", true), SQLSERVER("jdbc:sqlserver:", false), - JTDS_SQLSERVER("jdbc:jtds:sqlserver:", false), DB2("jdbc:db2:", false), NETEZZA("jdbc:netezza:", true), CUBRID("jdbc:cubrid:", false); + MYSQL(JdbcDrivers.MYSQL.getSchemePrefix(), true), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix(), true), + HSQLDB(JdbcDrivers.HSQLDB.getSchemePrefix(), false), ORACLE(JdbcDrivers.ORACLE.getSchemePrefix(), true), + SQLSERVER(JdbcDrivers.SQLSERVER.getSchemePrefix(), false), CUBRID(JdbcDrivers.CUBRID.getSchemePrefix(), false), + JTDS_SQLSERVER(JdbcDrivers.JTDS_SQLSERVER.getSchemePrefix(), false), DB2(JdbcDrivers.DB2.getSchemePrefix(), false), + NETEZZA(JdbcDrivers.NETEZZA.getSchemePrefix(), true); private final String schemePrefix; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/metastore/GenericJobStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/metastore/GenericJobStorage.java b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java new file mode 100644 index 0000000..9e1b18b --- /dev/null +++ b/src/java/org/apache/sqoop/metastore/GenericJobStorage.java @@ -0,0 +1,858 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.metastore; + +import java.io.IOException; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.cloudera.sqoop.manager.ConnManager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.metastore.JobData; +import com.cloudera.sqoop.metastore.JobStorage; +import com.cloudera.sqoop.tool.SqoopTool; +import org.apache.sqoop.manager.DefaultManagerFactory; + +/** + * JobStorage implementation that uses a database to + * hold job information. + */ +public class GenericJobStorage extends JobStorage { + + public static final Log LOG = LogFactory.getLog( + GenericJobStorage.class.getName()); + + /** descriptor key identifying the connect string for the metastore. */ + public static final String META_CONNECT_KEY = "metastore.connect.string"; + + /** descriptor key identifying the username to use when connecting + * to the metastore. + */ + public static final String META_USERNAME_KEY = "metastore.username"; + + /** descriptor key identifying the password to use when connecting + * to the metastore. + */ + public static final String META_PASSWORD_KEY = "metastore.password"; + + /** descriptor key identifying the class name of the jdbc driver */ + public static final String META_DRIVER_KEY = "metastore.driver.class"; + + /** Default name for the root metadata table. */ + private static final String DEFAULT_ROOT_TABLE_NAME = "SQOOP_ROOT"; + + /** Configuration key used to override root table name. */ + public static final String ROOT_TABLE_NAME_KEY = + "sqoop.root.table.name"; + + /** root metadata table key used to define the current schema version. */ + private static final String STORAGE_VERSION_KEY = + "sqoop.job.storage.version"; + + /** The current version number for the schema edition. */ + private static final int CUR_STORAGE_VERSION = 0; + + /** This value represents an invalid version */ + private static final int NO_VERSION = -1; + + /** root metadata table key used to define the job table name. */ + private static final String SESSION_TABLE_KEY = + "sqoop.job.info.table"; + + /** Outdated key for job table data, kept for backward compatibility */ + public static final String HSQLDB_TABLE_KEY = "sqoop.hsqldb.job.info.table"; + + /** Outdated key for schema version, kept for backward compatibility */ + private static final String HSQLDB_VERSION_KEY = + "sqoop.hsqldb.job.storage.version"; + + /** Default value for SESSION_TABLE_KEY. */ + private static final String DEFAULT_SESSION_TABLE_NAME = + "SQOOP_SESSIONS"; + + /** Per-job key with propClass 'schema' that defines the set of + * properties valid to be defined for propClass 'SqoopOptions'. */ + private static final String PROPERTY_SET_KEY = + "sqoop.property.set.id"; + + /** Current value for PROPERTY_SET_KEY. */ + private static final String CUR_PROPERTY_SET_ID = "0"; + + // The following are values for propClass in the v0 schema which + // describe different aspects of the stored metadata. + + /** Property class for properties about the stored data itself. */ + private static final String PROPERTY_CLASS_SCHEMA = "schema"; + + /** Property class for properties that are loaded into SqoopOptions. */ + private static final String PROPERTY_CLASS_SQOOP_OPTIONS = "SqoopOptions"; + + /** Property class for properties that are loaded into a Configuration. */ + private static final String PROPERTY_CLASS_CONFIG = "config"; + + /** + * Configuration key specifying whether this storage agent is active. + * Defaults to "on" to allow zero-conf local users. + */ + public static final String AUTO_STORAGE_IS_ACTIVE_KEY = + "sqoop.metastore.client.enable.autoconnect"; + + /** + * Configuration key specifying the connect string used by this + * storage agent. + */ + public static final String AUTO_STORAGE_CONNECT_STRING_KEY = + "sqoop.metastore.client.autoconnect.url"; + + /** + * Configuration key specifying the username to bind with. + */ + public static final String AUTO_STORAGE_USER_KEY = + "sqoop.metastore.client.autoconnect.username"; + + + /** HSQLDB default user is named 'SA'. */ + public static final String DEFAULT_AUTO_USER = "SA"; + + /** + * Configuration key specifying the password to bind with. + */ + public static final String AUTO_STORAGE_PASS_KEY = + "sqoop.metastore.client.autoconnect.password"; + + /** HSQLDB default user has an empty password. */ + public static final String DEFAULT_AUTO_PASSWORD = ""; + + /** + * Per-job key with propClass 'schema' that specifies the SqoopTool + * to load. + */ + private static final String SQOOP_TOOL_KEY = "sqoop.tool"; + private Map<String, String> connectedDescriptor; + private String metastoreConnectStr; + private String metastoreUser; + private String metastorePassword; + private Connection connection; + private String driverClass; + private ConnManager connManager; + + protected Connection getConnection() { + return this.connection; + } + + // After connection to the database and initialization of the + // schema, this holds the name of the job table. + private String jobTableName; + + protected void setMetastoreConnectStr(String connectStr) { + this.metastoreConnectStr = connectStr; + } + + protected void setMetastoreUser(String user) { + this.metastoreUser = user; + } + + protected void setMetastorePassword(String pass) { + this.metastorePassword = pass; + } + + protected void setDriverClass(String driverClass) { + this.driverClass = driverClass; + } + /** + * Set the descriptor used to open() this storage. + */ + protected void setConnectedDescriptor(Map<String, String> descriptor) { + this.connectedDescriptor = descriptor; + } + + @Override + /** + * Initialize the connection to the database. + */ + public void open(Map<String, String> descriptor) throws IOException { + setMetastoreConnectStr(descriptor.get(META_CONNECT_KEY)); + setMetastoreUser(descriptor.get(META_USERNAME_KEY)); + setMetastorePassword(descriptor.get(META_PASSWORD_KEY)); + setDriverClass(descriptor.get(META_DRIVER_KEY)); + setConnectedDescriptor(descriptor); + + init(); + } + + protected void init() throws IOException { + try { + connManager = createConnManager(); + connection = connManager.getConnection(); + + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(false); + + // Initialize the root schema. + if (!rootTableExists()) { + createRootTable(); + } + + // Check the schema version. + String curStorageVerStr = getRootProperty(STORAGE_VERSION_KEY, NO_VERSION); + + // If schema version is not present under the current key, + // sets it correctly. Present for backward compatibility + if (curStorageVerStr == null) { + setRootProperty(STORAGE_VERSION_KEY, NO_VERSION, Integer.toString(CUR_STORAGE_VERSION)); + curStorageVerStr = Integer.toString(CUR_STORAGE_VERSION); + } + int actualStorageVer = NO_VERSION; + try { + actualStorageVer = Integer.valueOf(curStorageVerStr); + } catch (NumberFormatException nfe) { + LOG.warn("Could not interpret as a number: " + curStorageVerStr); + } + if (actualStorageVer != CUR_STORAGE_VERSION) { + LOG.error("Can not interpret metadata schema"); + LOG.error("The metadata schema version is " + curStorageVerStr); + LOG.error("The highest version supported is " + CUR_STORAGE_VERSION); + LOG.error("To use this version of Sqoop, " + + "you must downgrade your metadata schema."); + throw new IOException("Invalid metadata version."); + } + + // Initialize the versioned schema. + initV0Schema(); + } catch (SQLException sqle) { + if (null != connection) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + } + + throw new IOException("Exception creating SQL connection", sqle); + } + } + + @Override + public void close() throws IOException { + try { + LOG.debug("Closing connection manager"); + connManager.close(); + } catch (SQLException sqlE) { + throw new IOException("Exception closing connection manager", sqlE); + } finally { + this.connection = null; + } + } + + @Override + /** {@inheritDoc} */ + public boolean canAccept(Map<String, String> descriptor) { + // We return true if the desciptor contains a connect string to find + // the database or auto-connect is enabled + Configuration conf = this.getConf(); + boolean metaConnectTrue = descriptor.get(META_CONNECT_KEY) != null; + boolean autoConnectEnabled = conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true); + return metaConnectTrue || autoConnectEnabled; + } + + @Override + /** {@inheritDoc} */ + public JobData read(String jobName) throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("Cannot restore job: " + jobName); + LOG.error("(No such job)"); + throw new IOException("Cannot restore missing job " + jobName); + } + + LOG.debug("Restoring job: " + jobName); + Properties schemaProps = getV0Properties(jobName, + PROPERTY_CLASS_SCHEMA); + Properties sqoopOptProps = getV0Properties(jobName, + PROPERTY_CLASS_SQOOP_OPTIONS); + Properties configProps = getV0Properties(jobName, + PROPERTY_CLASS_CONFIG); + + // Check that we're not using a saved job from a previous + // version whose functionality has been deprecated. + String thisPropSetId = schemaProps.getProperty(PROPERTY_SET_KEY); + LOG.debug("System property set: " + CUR_PROPERTY_SET_ID); + LOG.debug("Stored property set: " + thisPropSetId); + if (!CUR_PROPERTY_SET_ID.equals(thisPropSetId)) { + LOG.warn("The property set present in this database was written by"); + LOG.warn("an incompatible version of Sqoop. This may result in an"); + LOG.warn("incomplete operation."); + // TODO(aaron): Should this fail out-right? + } + + String toolName = schemaProps.getProperty(SQOOP_TOOL_KEY); + if (null == toolName) { + // Don't know what tool to create. + throw new IOException("Incomplete metadata; missing " + + SQOOP_TOOL_KEY); + } + + SqoopTool tool = SqoopTool.getTool(toolName); + if (null == tool) { + throw new IOException("Error in job metadata: invalid tool " + + toolName); + } + + Configuration conf = new Configuration(); + for (Map.Entry<Object, Object> entry : configProps.entrySet()) { + conf.set(entry.getKey().toString(), entry.getValue().toString()); + } + + SqoopOptions opts = new SqoopOptions(); + opts.setConf(conf); + opts.loadProperties(sqoopOptProps); + + // Set the job connection information for this job. + opts.setJobName(jobName); + opts.setStorageDescriptor(connectedDescriptor); + + return new JobData(opts, tool); + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + private boolean jobExists(String jobName) throws SQLException { + PreparedStatement s = connection.prepareStatement( + "SELECT COUNT(job_name) FROM " + connManager.escapeTableName(this.jobTableName) + + " WHERE job_name = ? GROUP BY job_name"); + ResultSet rs = null; + try { + s.setString(1, jobName); + rs = s.executeQuery(); + if (rs.next()) { + return true; // We got a result, meaning the job exists. + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + + return false; // No result. + } + + @Override + /** {@inheritDoc} */ + public void delete(String jobName) throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("No such job: " + jobName); + } else { + LOG.debug("Deleting job: " + jobName); + PreparedStatement s = connection.prepareStatement("DELETE FROM " + + connManager.escapeTableName(this.jobTableName) + " WHERE job_name = ?"); + try { + s.setString(1, jobName); + s.executeUpdate(); + } finally { + s.close(); + } + connection.commit(); + } + } catch (SQLException sqlEx) { + try { + connection.rollback(); + } catch (SQLException e2) { + LOG.warn("Error rolling back transaction in error handler: " + e2); + } + throw new IOException("Error communicating with database", sqlEx); + } + } + + @Override + /** {@inheritDoc} */ + public void create(String jobName, JobData data) + throws IOException { + try { + if (jobExists(jobName)) { + LOG.error("Cannot create job " + jobName + + ": it already exists"); + throw new IOException("Job " + jobName + " already exists"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + createInternal(jobName, data); + } + + /** + * Actually insert/update the resources for this job. + */ + private void createInternal(String jobName, JobData data) + throws IOException { + try { + LOG.debug("Creating job: " + jobName); + + // Save the name of the Sqoop tool. + setV0Property(jobName, PROPERTY_CLASS_SCHEMA, SQOOP_TOOL_KEY, + data.getSqoopTool().getToolName()); + + // Save the property set id. + setV0Property(jobName, PROPERTY_CLASS_SCHEMA, PROPERTY_SET_KEY, + CUR_PROPERTY_SET_ID); + + // Save all properties of the SqoopOptions. + Properties props = data.getSqoopOptions().writeProperties(); + setV0Properties(jobName, PROPERTY_CLASS_SQOOP_OPTIONS, props); + + // And save all unique properties of the configuration. + Configuration saveConf = data.getSqoopOptions().getConf(); + Configuration baseConf = new Configuration(); + + for (Map.Entry<String, String> entry : saveConf) { + String key = entry.getKey(); + String rawVal = saveConf.getRaw(key); + String baseVal = baseConf.getRaw(key); + if (baseVal != null && rawVal.equals(baseVal)) { + continue; // Don't save this; it's set in the base configuration. + } + + LOG.debug("Saving " + key + " => " + rawVal + " / " + baseVal); + setV0Property(jobName, PROPERTY_CLASS_CONFIG, key, rawVal); + } + + connection.commit(); + } catch (SQLException sqlE) { + try { + connection.rollback(); + } catch (SQLException sqlE2) { + LOG.warn("Exception rolling back transaction during error handling: " + + sqlE2); + } + throw new IOException("Error communicating with database", sqlE); + } + } + + @Override + /** {@inheritDoc} */ + public void update(String jobName, JobData data) + throws IOException { + try { + if (!jobExists(jobName)) { + LOG.error("Cannot update job " + jobName + ": not found"); + throw new IOException("Job " + jobName + " does not exist"); + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + + // Since we set properties with update-or-insert, this is the same + // as create on this system. + createInternal(jobName, data); + } + + @Override + /** {@inheritDoc} */ + public List<String> list() throws IOException { + ResultSet rs = null; + try { + PreparedStatement s = connection.prepareStatement( + "SELECT DISTINCT job_name FROM " + connManager.escapeTableName(this.jobTableName)); + try { + rs = s.executeQuery(); + ArrayList<String> jobs = new ArrayList<String>(); + while (rs.next()) { + jobs.add(rs.getString(1)); + } + + return jobs; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } catch (SQLException sqlE) { + throw new IOException("Error communicating with database", sqlE); + } + } + + // Determine the name to use for the root metadata table. + private String getRootTableName() { + Configuration conf = getConf(); + return conf.get(ROOT_TABLE_NAME_KEY, DEFAULT_ROOT_TABLE_NAME).toUpperCase(); + } + + private String getEscapedRootTableName() { + return connManager.escapeTableName(getRootTableName()); + } + + private boolean tableExists(String tableToCheck) throws SQLException { + String[] tables = connManager.listTables(); + for (String table : tables) { + if (table.equals(tableToCheck)) { + return true; + } + } + return false; + } + + private boolean rootTableExists() throws SQLException { + String rootTableName = getRootTableName(); + return tableExists(rootTableName); + } + + private void createRootTable() throws SQLException { + String rootTableName = getRootTableName(); + LOG.debug("Creating root table: " + rootTableName); + + // TODO: Sanity-check the value of rootTableName to ensure it is + // not a SQL-injection attack vector. + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + getEscapedRootTableName() + " (" + + "version INT NOT NULL, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(256), " + + "CONSTRAINT " + rootTableName + "_unq UNIQUE (version, propname))"); + } finally { + s.close(); + } + + setRootProperty(STORAGE_VERSION_KEY, NO_VERSION, + Integer.toString(CUR_STORAGE_VERSION)); + + LOG.debug("Saving root table."); + connection.commit(); + } + + /** + * Look up a value for the specified version (may be null) in the + * root metadata table. + */ + private String getRootProperty(String propertyName, Integer version) + throws SQLException { + LOG.debug("Looking up property " + propertyName + " for version " + + version); + PreparedStatement s = null; + ResultSet rs = null; + + try { + if (null == version) { + s = connection.prepareStatement( + "SELECT propval FROM " + getEscapedRootTableName() + + " WHERE version IS NULL AND propname = ?"); + s.setString(1, propertyName); + } else { + s = connection.prepareStatement( + "SELECT propval FROM " + getEscapedRootTableName() + " WHERE version = ? " + + " AND propname = ?"); + s.setInt(1, version); + s.setString(2, propertyName); + } + + rs = s.executeQuery(); + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; // No such result. + } else { + String result = rs.getString(1); // Return the only result col. + LOG.debug(" => " + result); + return result; + } + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + if (null != s) { + s.close(); + } + } + } + + /** + * Set a value for the specified version (may be null) in the root + * metadata table. + */ + private void setRootProperty(String propertyName, Integer version, + String val) throws SQLException { + LOG.debug("Setting property " + propertyName + " for version " + + version + " => " + val); + + PreparedStatement s; + String curVal = getRootProperty(propertyName, version); + if (null == curVal) { + // INSERT the row. + s = connection.prepareStatement("INSERT INTO " + getEscapedRootTableName() + + " (propval, propname, version) VALUES ( ? , ? , ? )"); + } else { + // UPDATE an existing row with non-null version. + s = connection.prepareStatement("UPDATE " + getEscapedRootTableName() + + " SET propval = ? WHERE propname = ? AND version = ?"); + } + + try { + s.setString(1, val); + s.setString(2, propertyName); + //Replaces null value with -1 constant, for backward compatibility + if (null == version) { + s.setInt(3, NO_VERSION); + } else { + s.setInt(3, version); + } + s.executeUpdate(); + } finally { + s.close(); + } + } + + /** + * Create the jobs table in the V0 schema. + */ + private void createJobTable() throws SQLException { + String curTableName = DEFAULT_SESSION_TABLE_NAME; + int tableNum = -1; + while (true) { + if (tableExists(curTableName)) { + tableNum++; + curTableName = DEFAULT_SESSION_TABLE_NAME + "_" + tableNum; + } else { + break; + } + } + + // curTableName contains a table name that does not exist. + // Create this table. + LOG.debug("Creating job storage table: " + curTableName); + Statement s = connection.createStatement(); + try { + s.executeUpdate("CREATE TABLE " + connManager.escapeTableName(curTableName) + " (" + + "job_name VARCHAR(64) NOT NULL, " + + "propname VARCHAR(128) NOT NULL, " + + "propval VARCHAR(1024), " + + "propclass VARCHAR(32) NOT NULL, " + + "CONSTRAINT " + curTableName + "_unq UNIQUE " + + "(job_name, propname, propclass))"); + + // Then set a property in the root table pointing to it. + setRootProperty(SESSION_TABLE_KEY, 0, curTableName); + connection.commit(); + } finally { + s.close(); + } + + this.jobTableName = curTableName; + } + + /** + * Given a root schema that exists, + * initialize a version-0 key/value storage schema on top of it, + * if it does not already exist. + */ + private void initV0Schema() throws SQLException { + this.jobTableName = getRootProperty(SESSION_TABLE_KEY, 0); + + checkForOldRootProperties(); + + if (null == this.jobTableName) { + createJobTable(); + } + if (!tableExists(this.jobTableName)) { + LOG.debug("Could not find job table: " + jobTableName); + createJobTable(); + } + } + + /** Checks to see if there is an existing job table under the old root table schema + * and reconfigures under the present schema, present for backward compatibility. **/ + private void checkForOldRootProperties() throws SQLException { + String hsqldbStorageJobTableName = getRootProperty(HSQLDB_TABLE_KEY, 0); + if(hsqldbStorageJobTableName != null && this.jobTableName == null) { + this.jobTableName = hsqldbStorageJobTableName; + setRootProperty(SESSION_TABLE_KEY, 0, jobTableName); + } + } + + /** + * INSERT or UPDATE a single (job, propname, class) to point + * to the specified property value. + */ + private void setV0Property(String jobName, String propClass, + String propName, String propVal) throws SQLException { + LOG.debug("Job: " + jobName + "; Setting property " + + propName + " with class " + propClass + " => " + propVal); + + PreparedStatement s = null; + try { + String curValue = getV0Property(jobName, propClass, propName); + if (null == curValue) { + // Property is not yet set. + s = connection.prepareStatement("INSERT INTO " + connManager.escapeTableName(this.jobTableName) + + " (propval, job_name, propclass, propname) " + + "VALUES (?, ?, ?, ?)"); + } else { + // Overwrite existing property. + s = connection.prepareStatement("UPDATE " + connManager.escapeTableName(this.jobTableName) + + " SET propval = ? WHERE job_name = ? AND propclass = ? " + + "AND propname = ?"); + } + + s.setString(1, propVal); + s.setString(2, jobName); + s.setString(3, propClass); + s.setString(4, propName); + + s.executeUpdate(); + } finally { + if (null != s) { + s.close(); + } + } + } + + /** + * Return a string containing the value of a specified property, + * or null if it is not set. + */ + private String getV0Property(String jobName, String propClass, + String propertyName) throws SQLException { + LOG.debug("Job: " + jobName + "; Getting property " + + propertyName + " with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propval FROM " + connManager.escapeTableName(this.jobTableName) + + " WHERE job_name = ? AND propclass = ? AND propname = ?"); + + try { + s.setString(1, jobName); + s.setString(2, propClass); + s.setString(3, propertyName); + rs = s.executeQuery(); + + if (!rs.next()) { + LOG.debug(" => (no result)"); + return null; + } + + String result = rs.getString(1); + LOG.debug(" => " + result); + return result; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing resultset: " + sqlE); + } + } + + s.close(); + } + } + + /** + * Get a java.util.Properties containing all propName -> propVal + * bindings for a given (jobName, propClass). + */ + private Properties getV0Properties(String jobName, String propClass) + throws SQLException { + LOG.debug("Job: " + jobName + + "; Getting properties with class " + propClass); + + ResultSet rs = null; + PreparedStatement s = connection.prepareStatement( + "SELECT propname, propval FROM " + connManager.escapeTableName(this.jobTableName) + + " WHERE job_name = ? AND propclass = ?"); + try { + s.setString(1, jobName); + s.setString(2, propClass); + rs = s.executeQuery(); + + Properties p = new Properties(); + while (rs.next()) { + p.setProperty(rs.getString(1), rs.getString(2)); + } + + return p; + } finally { + if (null != rs) { + try { + rs.close(); + } catch (SQLException sqlE) { + LOG.warn("Error closing result set: " + sqlE); + } + } + + s.close(); + } + } + + private void setV0Properties(String jobName, String propClass, + Properties properties) throws SQLException { + LOG.debug("Job: " + jobName + + "; Setting bulk properties for class " + propClass); + + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + String key = entry.getKey().toString(); + String val = entry.getValue().toString(); + setV0Property(jobName, propClass, key, val); + } + } + + private ConnManager createConnManager() { + SqoopOptions sqoopOptions = new SqoopOptions(); + sqoopOptions.setConnectString(metastoreConnectStr); + sqoopOptions.setUsername(metastoreUser); + sqoopOptions.setPassword(metastorePassword); + JobData jd = new JobData(); + jd.setSqoopOptions(sqoopOptions); + DefaultManagerFactory dmf = new DefaultManagerFactory(); + return dmf.accept(jd); + } + +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/metastore/JobStorageFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java index 2edc33b..9a348d5 100644 --- a/src/java/org/apache/sqoop/metastore/JobStorageFactory.java +++ b/src/java/org/apache/sqoop/metastore/JobStorageFactory.java @@ -42,8 +42,7 @@ public class JobStorageFactory { /** The default list of available JobStorage implementations. */ private static final String DEFAULT_AVAILABLE_STORAGES = - "com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage," - + "com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage"; + "com.cloudera.sqoop.metastore.GenericJobStorage"; public JobStorageFactory(Configuration config) { this.conf = config; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d0770ac6/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java b/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java deleted file mode 100644 index 49e3031..0000000 --- a/src/java/org/apache/sqoop/metastore/hsqldb/AutoHsqldbStorage.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.metastore.hsqldb; - -import java.io.File; -import java.io.IOException; - -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.conf.Configuration; - -/** - * JobStorage implementation that auto-configures an HSQLDB - * local-file-based instance to hold jobs. - */ -public class AutoHsqldbStorage - extends com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage { - - public static final Log LOG = LogFactory.getLog( - AutoHsqldbStorage.class.getName()); - - /** - * Configuration key specifying whether this storage agent is active. - * Defaults to "on" to allow zero-conf local users. - */ - public static final String AUTO_STORAGE_IS_ACTIVE_KEY = - "sqoop.metastore.client.enable.autoconnect"; - - /** - * Configuration key specifying the connect string used by this - * storage agent. - */ - public static final String AUTO_STORAGE_CONNECT_STRING_KEY = - "sqoop.metastore.client.autoconnect.url"; - - /** - * Configuration key specifying the username to bind with. - */ - public static final String AUTO_STORAGE_USER_KEY = - "sqoop.metastore.client.autoconnect.username"; - - - /** HSQLDB default user is named 'SA'. */ - private static final String DEFAULT_AUTO_USER = "SA"; - - /** - * Configuration key specifying the password to bind with. - */ - public static final String AUTO_STORAGE_PASS_KEY = - "sqoop.metastore.client.autoconnect.password"; - - /** HSQLDB default user has an empty password. */ - public static final String DEFAULT_AUTO_PASSWORD = ""; - - @Override - /** {@inheritDoc} */ - public boolean canAccept(Map<String, String> descriptor) { - Configuration conf = this.getConf(); - return conf.getBoolean(AUTO_STORAGE_IS_ACTIVE_KEY, true); - } - - /** - * Determine the user's home directory and return a connect - * string to HSQLDB that uses ~/.sqoop/ as the storage location - * for the metastore database. - */ - private String getHomeDirFileConnectStr() { - String homeDir = System.getProperty("user.home"); - - File homeDirObj = new File(homeDir); - File sqoopDataDirObj = new File(homeDirObj, ".sqoop"); - File databaseFileObj = new File(sqoopDataDirObj, "metastore.db"); - - String dbFileStr = databaseFileObj.toString(); - return "jdbc:hsqldb:file:" + dbFileStr - + ";hsqldb.write_delay=false;shutdown=true"; - } - - @Override - /** - * Set the connection information to use the auto-inferred connection - * string. - */ - public void open(Map<String, String> descriptor) throws IOException { - Configuration conf = getConf(); - setMetastoreConnectStr(conf.get(AUTO_STORAGE_CONNECT_STRING_KEY, - getHomeDirFileConnectStr())); - setMetastoreUser(conf.get(AUTO_STORAGE_USER_KEY, DEFAULT_AUTO_USER)); - setMetastorePassword(conf.get(AUTO_STORAGE_PASS_KEY, - DEFAULT_AUTO_PASSWORD)); - setConnectedDescriptor(descriptor); - - init(); - } -} -
