Repository: sqoop Updated Branches: refs/heads/sqoop2 dfd1fd348 -> 272fc2f8a
SQOOP-1741: Port SQOOP-1736 to sqoop2 branch (Abraham Elmahrek via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/272fc2f8 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/272fc2f8 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/272fc2f8 Branch: refs/heads/sqoop2 Commit: 272fc2f8af735dac21e095328722878bc1ddc4f5 Parents: dfd1fd3 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Nov 19 08:52:11 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Nov 19 08:52:11 2014 -0800 ---------------------------------------------------------------------- .../apache/sqoop/repository/JdbcRepository.java | 8 +- .../common/CommonRepositoryHandler.java | 2 +- .../sqoop/repository/derby/DerbyRepoError.java | 3 + .../derby/DerbyRepositoryHandler.java | 99 +++++++++++++++++++- .../derby/DerbySchemaUpgradeQuery.java | 26 +++++ .../apache/sqoop/tools/tool/UpgradeTool.java | 6 +- 6 files changed, 132 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index f41e60e..e5415e8 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -220,14 +220,14 @@ public class JdbcRepository extends Repository { return (MDriver) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn); - if (existingDriverConfig == null) { + MDriver existingDriver = handler.findDriver(mDriver.getUniqueName(), conn); + if (existingDriver == null) { handler.registerDriver(mDriver, conn); return mDriver; } else { // We're currently not serializing version into repository // so let's just compare the structure to see if we need upgrade. - if(!mDriver.equals(existingDriverConfig)) { + if(!mDriver.equals(existingDriver)) { if (autoUpgrade) { upgradeDriver(mDriver); return mDriver; @@ -236,7 +236,7 @@ public class JdbcRepository extends Repository { "Driver: " + mDriver.getPersistenceId()); } } - return existingDriverConfig; + return existingDriver; } } }); http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java index 3ae2bfc..c278406 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java @@ -263,7 +263,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { baseInputStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); - // Register the job config type, since driver config is per job + // Register a driver config as a job type with no direction registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 769544b..6bc5674 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -53,6 +53,9 @@ public enum DerbyRepoError implements ErrorCode { /** Can't get ID of direction **/ DERBYREPO_0008("Could not get ID of recently added direction"), + /** The system was unable to register driver due to a server error **/ + DERBYREPO_0009("Registration of driver failed"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index d792554..907978f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -44,7 +44,10 @@ import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorHandler; import org.apache.sqoop.connector.ConnectorManagerUtils; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.model.MConfigType; +import org.apache.sqoop.model.MConfigurableType; +import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MInputType; import org.apache.sqoop.repository.JdbcRepositoryContext; import org.apache.sqoop.repository.common.CommonRepoConstants; @@ -242,7 +245,10 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn); - // Data modifications only for non-fresh install. + // force register HDFS-connector as a first class citizen in the connector list + // and re-associate old frameworks configs and connections/links with the new hdfs connector + // Data modifications only for non-fresh install hence the > 0 check + if (repositoryVersion > 0) { LOG.info("Force registering the HDFS connector as a new configurable"); long hdfsConnectorId = registerHdfsConnector(conn); @@ -267,6 +273,21 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn); + + // force register the driver as a first class citizen and re-associate the old framework configs with the new driver Id + // Data modifications only for non-fresh install hence the > 0 check + if (repositoryVersion > 0) { + LOG.info("Force registering the Driver as a new configurable"); + long driverId = registerDriver(conn); + LOG.info("Finished Force registering of the driver as a new configurable"); + + LOG.info("Updating config and inputs for the driver."); + updateDriverConfigInput(conn, driverId); + LOG.info("Finished Updating config and inputs for the driver."); + } + } + + if (repositoryVersion <= 4) { runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIGURABLE_ID, conn); runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIG_ID, conn); @@ -498,6 +519,7 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { * 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'. * Also update the relevant inputs as well. * @param conn + * @param hdfsConnectorId */ // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection // tables instead of the latest config/link tables @@ -558,6 +580,74 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { "toJobConfig", "toJobConfig", Direction.TO.toString()); } + // NOTE: This upgrade code happened after the SQOOP-1498 renaming, hence it + // uses the configurable and config + @Deprecated + private void updateDriverConfigInput(Connection conn, long driverId) { + + // update configs and inputs for driver + // update the name from throttling ==> throttlingConfig config and associate + // it with the driverId + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER, conn, + "throttlingConfig", "throttling"); + + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER, conn, + driverId, "throttlingConfig"); + + // nuke security.maxConnections + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER, conn, + "security.maxConnections"); + + // nuke the security config since 1.99.3 we do not use it + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER, conn, + "security"); + + // update throttling.extractors ==> throttlingConfig.numExtractors + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn, + "throttlingConfig.numExtractors", "throttling.extractors"); + + // update throttling.loaders ==> throttlingConfig.numLoaders + runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn, + "throttlingConfig.numLoaders", "throttling.loaders"); + + } + + /** + * Pre-register Driver since the 1.99.3 release NOTE: This should be used only + * in the upgrade path + */ + @Deprecated + protected long registerDriver(Connection conn) { + if (LOG.isTraceEnabled()) { + LOG.trace("Begin Driver loading."); + } + + PreparedStatement baseDriverStmt = null; + try { + baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); + baseDriverStmt.setString(1, MDriver.DRIVER_NAME); + baseDriverStmt.setString(2, Driver.getClassName()); + baseDriverStmt.setString(3, "1"); + baseDriverStmt.setString(4, MConfigurableType.DRIVER.name()); + + int baseDriverCount = baseDriverStmt.executeUpdate(); + if (baseDriverCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(baseDriverCount)); + } + + ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys(); + + if (!rsetDriverId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0004); + } + return rsetDriverId.getLong(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0009, ex); + } finally { + closeStatements(baseDriverStmt); + } + } /** * Pre-register HDFS Connector so that config upgrade will work. @@ -582,10 +672,10 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { if (handler.getConnectorConfigurable().getPersistenceId() != -1) { return handler.getConnectorConfigurable().getPersistenceId(); } - + PreparedStatement baseConnectorStmt = null; if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { try { - PreparedStatement baseConnectorStmt = conn.prepareStatement( + baseConnectorStmt = conn.prepareStatement( STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); @@ -602,6 +692,8 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { } } catch (SQLException e) { throw new SqoopException(DerbyRepoError.DERBYREPO_0004); + } finally { + closeStatements(baseConnectorStmt); } break; @@ -660,7 +752,6 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler { } } - /** * We are creating the LINK FORM for HDFS and later it the schema will * be renamed to LINK CONFIG http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java index 51024c8..fb48daf 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java @@ -200,6 +200,32 @@ public final class DerbySchemaUpgradeQuery { "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= NULL" + " WHERE " + COLUMN_SQF_NAME + "= ?"; + + /** Intended for force driver creation and its related upgrades*/ + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER = + "UPDATE " + TABLE_SQ_CONFIG + " SET " + + COLUMN_SQ_CFG_NAME + "= ?" + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER = + "UPDATE " + TABLE_SQ_CONFIG + " SET " + + COLUMN_SQ_CFG_CONFIGURABLE + "= ?" + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER = + "DELETE FROM " + TABLE_SQ_CONFIG + + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER = + "DELETE FROM " + TABLE_SQ_INPUT + + " WHERE " + COLUMN_SQI_NAME + "= ?"; + + public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER = + "UPDATE " + TABLE_SQ_INPUT + " SET " + + COLUMN_SQI_NAME + "= ?" + + " WHERE " + COLUMN_SQI_NAME + "= ?"; + /** * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT * throttling form, to IMPORT throttling form. http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java index f117411..ba88ddd 100644 --- a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java +++ b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java @@ -38,12 +38,12 @@ public class UpgradeTool extends ConfiguredTool { LOG.info("Initializing the RepositoryManager with immutable option turned off."); RepositoryManager.getInstance().initialize(false); - LOG.info("Initializing the Driver with upgrade option turned on."); - Driver.getInstance().initialize(true); - LOG.info("Initializing the Connection Manager with upgrade option turned on."); ConnectorManager.getInstance().initialize(true); + LOG.info("Initializing the Driver with upgrade option turned on."); + Driver.getInstance().initialize(true); + LOG.info("Upgrade completed successfully."); LOG.info("Tearing all managers down.");
