Updated Branches: refs/heads/sqoop2 156facc49 -> d62567ddf
SQOOP-971: Sqoop2: Component reconfigurability (Mengwei Ding via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d62567dd Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d62567dd Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d62567dd Branch: refs/heads/sqoop2 Commit: d62567ddf3b553956a3cb2a999ef47abdbc8eeb3 Parents: 156facc Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Jun 24 09:31:22 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Jun 24 09:31:22 2013 -0700 ---------------------------------------------------------------------- .../sqoop/connector/ConnectorManager.java | 16 +++- .../java/org/apache/sqoop/core/CoreError.java | 3 + .../org/apache/sqoop/core/Reconfigurable.java | 29 ++++++ .../apache/sqoop/core/SqoopConfiguration.java | 44 +++++++-- .../sqoop/framework/FrameworkManager.java | 14 ++- .../org/apache/sqoop/framework/JobManager.java | 57 ++++++++++- .../repository/JdbcRepositoryProvider.java | 99 ++++++++++++++++++++ .../sqoop/repository/RepositoryManager.java | 31 +++++- .../sqoop/repository/RepositoryProvider.java | 3 +- 9 files changed, 281 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java index 500189a..0540f6b 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java @@ -33,12 +33,15 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.core.Reconfigurable; +import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.repository.Repository; import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.repository.RepositoryTransaction; import org.apache.sqoop.model.MConnector; -public class ConnectorManager { +public class ConnectorManager implements Reconfigurable { /** * Logger object. @@ -184,6 +187,8 @@ public class ConnectorManager { registerConnectors(); + SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); + if (LOG.isInfoEnabled()) { LOG.info("Connectors loaded: " + handlerMap); } @@ -231,4 +236,13 @@ public class ConnectorManager { handlerMap = null; nameMap = null; } + + @Override + public synchronized void configurationChanged() { + LOG.info("Begin connector manager reconfiguring"); + // If there are configuration options for ConnectorManager, + // implement the reconfiguration procedure right here. + LOG.info("Connector manager reconfigured"); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/CoreError.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java index f59d132..eb7c1dc 100644 --- a/core/src/main/java/org/apache/sqoop/core/CoreError.java +++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java @@ -51,6 +51,9 @@ public enum CoreError implements ErrorCode { /** The configuration system has not been initialized correctly. */ CORE_0007("System not initialized"), + /** The system has not been reconfigured */ + CORE_0008("System not reconfigured"); + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java new file mode 100644 index 0000000..d25ce41 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/core/Reconfigurable.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.core; + +/** + * Interface that make Sqoop Server components sensitive to + * configuration file changes at the runtime + */ +public interface Reconfigurable { + /** + * Method to notify each reconfigurable components + */ + public void configurationChanged(); +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java index deb24c9..13bbfc2 100644 --- a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -33,7 +32,7 @@ import org.apache.sqoop.common.SqoopException; /** * Configuration manager that loads Sqoop configuration. */ -public class SqoopConfiguration { +public class SqoopConfiguration implements Reconfigurable { /** * Logger object. @@ -79,6 +78,7 @@ public class SqoopConfiguration { private boolean initialized = false; private ConfigurationProvider provider = null; private Map<String, String> config = null; + private Map<String, String> oldConfig = null; public synchronized void initialize() { if (initialized) { @@ -165,8 +165,9 @@ public class SqoopConfiguration { // Initialize the configuration provider provider.initialize(configDir, bootstrapProperties); - refreshConfiguration(); - provider.registerListener(new CoreConfigurationListener()); + configurationChanged(); + + provider.registerListener(new CoreConfigurationListener(SqoopConfiguration.getInstance())); initialized = true; } @@ -176,10 +177,19 @@ public class SqoopConfiguration { throw new SqoopException(CoreError.CORE_0007); } - Map<String,String> parameters = new HashMap<String, String>(); - parameters.putAll(config); + return new MapContext(config); + } + + public synchronized MapContext getOldContext() { + if (!initialized) { + throw new SqoopException(CoreError.CORE_0007); + } + + if (oldConfig == null) { + throw new SqoopException(CoreError.CORE_0008); + } - return new MapContext(parameters); + return new MapContext(oldConfig); } public synchronized void destroy() { @@ -193,6 +203,7 @@ public class SqoopConfiguration { provider = null; configDir = null; config = null; + oldConfig = null; initialized = false; } @@ -209,15 +220,28 @@ public class SqoopConfiguration { PropertyConfigurator.configure(props); } - private synchronized void refreshConfiguration() { + public ConfigurationProvider getProvider() { + return provider; + } + + @Override + public synchronized void configurationChanged() { + oldConfig = config; config = provider.getConfiguration(); configureLogging(); } - public class CoreConfigurationListener implements ConfigurationListener { + public static class CoreConfigurationListener implements ConfigurationListener { + + private Reconfigurable listener; + + public CoreConfigurationListener(Reconfigurable target) { + listener = target; + } + @Override public void configurationChanged() { - refreshConfiguration(); + listener.configurationChanged(); } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 704b809..a81306b 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -19,6 +19,9 @@ package org.apache.sqoop.framework; import org.apache.log4j.Logger; import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.core.Reconfigurable; +import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; @@ -45,7 +48,7 @@ import java.util.ResourceBundle; * be the fastest way and we might want to introduce internal structures for * running jobs in case that this approach will be too slow. */ -public class FrameworkManager { +public class FrameworkManager implements Reconfigurable { /** * Logger object. @@ -141,6 +144,8 @@ public class FrameworkManager { // Register framework metadata in repository mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework); + SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); + LOG.info("Submission manager initialized: OK"); } @@ -165,4 +170,11 @@ public class FrameworkManager { FrameworkConstants.RESOURCE_BUNDLE_NAME, locale); } + @Override + public void configurationChanged() { + LOG.info("Begin framework manager reconfiguring"); + // If there are configuration options for FrameworkManager, + // implement the reconfiguration procedure right here. + LOG.info("Framework manager reconfigured"); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index 6d22c62..5a2f490 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -22,7 +22,9 @@ import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.framework.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.*; @@ -40,7 +42,7 @@ import org.json.simple.JSONValue; import java.util.Date; import java.util.List; -public class JobManager { +public class JobManager implements Reconfigurable { /** * Logger object. */ @@ -248,6 +250,8 @@ public class JobManager { updateThread = new UpdateThread(); updateThread.start(); + SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); + LOG.info("Submission manager initialized: OK"); } public MSubmission submit(long jobId) { @@ -495,6 +499,57 @@ public class JobManager { RepositoryManager.getInstance().getRepository().updateSubmission(submission); } + @Override + public synchronized void configurationChanged() { + LOG.info("Begin submission engine manager reconfiguring"); + MapContext newContext = SqoopConfiguration.getInstance().getContext(); + MapContext oldContext = SqoopConfiguration.getInstance().getOldContext(); + + String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); + if (newSubmissionEngineClassName == null + || newSubmissionEngineClassName.trim().length() == 0) { + throw new SqoopException(FrameworkError.FRAMEWORK_0001, + newSubmissionEngineClassName); + } + + String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); + if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) { + LOG.warn("Submission engine cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + if (newExecutionEngineClassName == null + || newExecutionEngineClassName.trim().length() == 0) { + throw new SqoopException(FrameworkError.FRAMEWORK_0007, + newExecutionEngineClassName); + } + + String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) { + LOG.warn("Execution engine cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + // Set up worker threads + purgeThreshold = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, + DEFAULT_PURGE_THRESHOLD + ); + purgeSleep = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, + DEFAULT_PURGE_SLEEP + ); + purgeThread.interrupt(); + + updateSleep = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, + DEFAULT_UPDATE_SLEEP + ); + updateThread.interrupt(); + + LOG.info("Submission engine manager reconfigured."); + } private class PurgeThread extends Thread { public PurgeThread() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java index 1fd092a..011527f 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryProvider.java @@ -164,4 +164,103 @@ public class JdbcRepositoryProvider implements RepositoryProvider { public synchronized Repository getRepository() { return repository; } + + @Override + public void configurationChanged() { + LOG.info("Begin JdbcRepository reconfiguring."); + JdbcRepositoryContext oldRepoContext = repoContext; + repoContext = new JdbcRepositoryContext(SqoopConfiguration.getInstance().getContext()); + + // reconfigure jdbc handler + String newJdbcHandlerClassName = repoContext.getHandlerClassName(); + if (newJdbcHandlerClassName == null + || newJdbcHandlerClassName.trim().length() == 0) { + throw new SqoopException(RepositoryError.JDBCREPO_0001, + newJdbcHandlerClassName); + } + + String oldJdbcHandlerClassName = oldRepoContext.getHandlerClassName(); + if (!newJdbcHandlerClassName.equals(oldJdbcHandlerClassName)) { + LOG.warn("Repository JDBC handler cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + // reconfigure jdbc driver + String newJdbcDriverClassName = repoContext.getDriverClass(); + if (newJdbcDriverClassName == null + || newJdbcDriverClassName.trim().length() == 0) { + throw new SqoopException(RepositoryError.JDBCREPO_0003, + newJdbcDriverClassName); + } + + String oldJdbcDriverClassName = oldRepoContext.getDriverClass(); + if (!newJdbcDriverClassName.equals(oldJdbcDriverClassName)) { + LOG.warn("Repository JDBC driver cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + // reconfigure max connection + connectionPool.setMaxActive(repoContext.getMaximumConnections()); + + // reconfigure the url of repository + String connectUrl = repoContext.getConnectionUrl(); + String oldurl = oldRepoContext.getConnectionUrl(); + if (connectUrl != null && !connectUrl.equals(oldurl)) { + LOG.warn("Repository URL cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + // if connection properties or transaction isolation option changes + boolean connFactoryChanged = false; + + // compare connection properties + if (!connFactoryChanged) { + Properties oldProp = oldRepoContext.getConnectionProperties(); + Properties newProp = repoContext.getConnectionProperties(); + + if (newProp.size() != oldProp.size()) { + connFactoryChanged = true; + } else { + for (Object key : newProp.keySet()) { + if (!newProp.getProperty((String) key).equals(oldProp.getProperty((String) key))) { + connFactoryChanged = true; + break; + } + } + } + } + + // compare the transaction isolation option + if (!connFactoryChanged) { + String oldTxOption = oldRepoContext.getTransactionIsolation().toString(); + String newTxOption = repoContext.getTransactionIsolation().toString(); + + if (!newTxOption.equals(oldTxOption)) { + connFactoryChanged = true; + } + } + + if (connFactoryChanged) { + // try to reconfigure connection factory + try { + LOG.info("Reconfiguring Connection Factory."); + Properties jdbcProps = repoContext.getConnectionProperties(); + + ConnectionFactory connFactory = + new DriverManagerConnectionFactory(connectUrl, jdbcProps); + + new PoolableConnectionFactory(connFactory, connectionPool, statementPool, + handler.validationQuery(), false, false, + repoContext.getTransactionIsolation().getCode()); + } catch (IllegalStateException ex) { + // failed to reconfigure connection factory + LOG.warn("Repository connection cannot be reconfigured currently. " + + "You might need to restart the server."); + } + } + + // ignore the create schema option, because the repo url is not allowed to change + + LOG.info("JdbcRepository reconfigured."); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java index a178238..d77a39b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java @@ -22,10 +22,12 @@ import java.util.Map; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.core.Reconfigurable; import org.apache.sqoop.core.SqoopConfiguration; +import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; import org.apache.sqoop.utils.ClassUtils; -public class RepositoryManager { +public class RepositoryManager implements Reconfigurable { /** * Logger object. @@ -120,6 +122,8 @@ public class RepositoryManager { throw new SqoopException(RepositoryError.REPO_0002); } + SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); + LOG.info("Repository initialized: OK"); } @@ -134,4 +138,29 @@ public class RepositoryManager { public synchronized Repository getRepository() { return provider.getRepository(); } + + @Override + public synchronized void configurationChanged() { + LOG.info("Begin repository manager reconfiguring"); + MapContext newContext = SqoopConfiguration.getInstance().getContext(); + MapContext oldContext = SqoopConfiguration.getInstance().getOldContext(); + + String newProviderClassName = newContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER); + if (newProviderClassName == null + || newProviderClassName.trim().length() == 0) { + throw new SqoopException(RepositoryError.REPO_0001, + RepoConfigurationConstants.SYSCFG_REPO_PROVIDER); + } + + String oldProviderClassName = oldContext.getString(RepoConfigurationConstants.SYSCFG_REPO_PROVIDER); + if (!newProviderClassName.equals(oldProviderClassName)) { + LOG.warn("Repository provider cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + provider.configurationChanged(); + + LOG.info("Repository manager reconfigured."); + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d62567dd/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java index 4ea52e9..1ec6bdf 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryProvider.java @@ -18,8 +18,9 @@ package org.apache.sqoop.repository; import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.core.Reconfigurable; -public interface RepositoryProvider { +public interface RepositoryProvider extends Reconfigurable { void initialize(MapContext context);
