Updated Branches: refs/heads/sqoop2 20bbef04c -> e5f664508
SQOOP-1139: Sqoop2: JobManager.java indentation is inconsistent with rest of the codebase (Venkat Ranganathan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e5f66450 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e5f66450 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e5f66450 Branch: refs/heads/sqoop2 Commit: e5f664508e751afeca4b2462a850ac5436bf439d Parents: 20bbef0 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jul 17 14:31:55 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jul 17 14:31:55 2013 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/framework/JobManager.java | 1189 +++++++++--------- 1 file changed, 606 insertions(+), 583 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5f66450/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java index 2d37020..9f09982 100644 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.sqoop.framework; import org.apache.log4j.Logger; @@ -43,571 +43,594 @@ import java.util.Date; import java.util.List; public class JobManager implements Reconfigurable { - /** - * Logger object. - */ - private static final Logger LOG = Logger.getLogger(JobManager.class); - - /** - * Private instance to singleton of this class. - */ - private static JobManager instance; - /** - * Create default object by default. - * - * Every Sqoop server application needs one so this should not be performance issue. - */ - static { - instance = new JobManager(); - } - - /** - * Return current instance. - * - * @return Current instance - */ - public static JobManager getInstance() { - return instance; - } - - /** - * Allows to set instance in case that it's need. - * - * This method should not be normally used as the default instance should be sufficient. One target - * user use case for this method are unit tests. - * - * @param newInstance New instance - */ - public static void setInstance(JobManager newInstance) { - instance = newInstance; - } - - /** - * Default interval for purging old submissions from repository. - */ - private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000; - - /** - * Default sleep interval for purge thread. - */ - private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000; - - /** - * Default interval for update thread. - */ - private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000; - - /** - * Configured submission engine instance - */ - private SubmissionEngine submissionEngine; - - /** - * Configured execution engine instance - */ - private ExecutionEngine executionEngine; - - /** - * Purge thread that will periodically remove old submissions from repository. - */ - private PurgeThread purgeThread = null; - - /** - * Update thread that will periodically check status of running submissions. - */ - private UpdateThread updateThread = null; - - /** - * Synchronization variable between threads. - */ - private boolean running = true; - - /** - * Specifies how old submissions should be removed from repository. - */ - private long purgeThreshold; - - /** - * Number of milliseconds for purge thread to sleep. - */ - private long purgeSleep; - - /** - * Number of milliseconds for update thread to slepp. - */ - private long updateSleep; - - /** - * Base notification URL. - * - * Framework manager will always add job id. - */ - private String notificationBaseUrl; - - /** - * Set notification base URL. - * - * @param url Base URL - */ - public void setNotificationBaseUrl(String url) { - LOG.debug("Setting notification base URL to " + url); - notificationBaseUrl = url; - } - - /** - * Get base notification url. - * - * @return String representation of the URL - */ - public String getNotificationBaseUrl() { - return notificationBaseUrl; - } - - public synchronized void destroy() { - LOG.trace("Begin submission engine manager destroy"); - - running = false; - - try { - purgeThread.interrupt(); - purgeThread.join(); - } catch (InterruptedException e) { - //TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining purgeThread"); - } - - try { - updateThread.interrupt(); - updateThread.join(); - } catch (InterruptedException e) { - //TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining updateThread"); - } - - if(submissionEngine != null) { - submissionEngine.destroy(); - } - - if(executionEngine != null) { - executionEngine.destroy(); - } - } - - - public synchronized void initialize() { - LOG.trace("Begin submission engine manager initialization"); - MapContext context = SqoopConfiguration.getInstance().getContext(); - - - // Let's load configured submission engine - String submissionEngineClassName = - context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - - submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName); - if(submissionEngine == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0001, - submissionEngineClassName); - } - - submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG); - - // Execution engine - String executionEngineClassName = - context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - - executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName); - if(executionEngine == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0007, - executionEngineClassName); - } - - // We need to make sure that user has configured compatible combination of - // submission engine and execution engine - if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) { - throw new SqoopException(FrameworkError.FRAMEWORK_0008); - } - - executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG); - - // Set up worker threads - purgeThreshold = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, - DEFAULT_PURGE_THRESHOLD - ); - purgeSleep = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, - DEFAULT_PURGE_SLEEP - ); - - purgeThread = new PurgeThread(); - purgeThread.start(); - - updateSleep = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, - DEFAULT_UPDATE_SLEEP - ); - - updateThread = new UpdateThread(); - updateThread.start(); - - SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); - - LOG.info("Submission manager initialized: OK"); - } - public MSubmission submit(long jobId) { - Repository repository = RepositoryManager.getInstance().getRepository(); - - MJob job = repository.findJob(jobId); - if(job == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0004, - "Unknown job id " + jobId); - } - MConnection connection = repository.findConnection(job.getConnectionId()); - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(job.getConnectorId()); - - // Transform forms to connector specific classes - Object connectorConnection = ClassUtils.instantiate( - connector.getConnectionConfigurationClass()); - FormUtils.fromForms(connection.getConnectorPart().getForms(), - connectorConnection); - - Object connectorJob = ClassUtils.instantiate( - connector.getJobConfigurationClass(job.getType())); - FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob); - - // Transform framework specific forms - Object frameworkConnection = ClassUtils.instantiate( - FrameworkManager.getInstance().getConnectionConfigurationClass()); - FormUtils.fromForms(connection.getFrameworkPart().getForms(), - frameworkConnection); - - Object frameworkJob = ClassUtils.instantiate( - FrameworkManager.getInstance().getJobConfigurationClass(job.getType())); - FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob); - - // Create request object - MSubmission summary = new MSubmission(jobId); - SubmissionRequest request = executionEngine.createSubmissionRequest(); - - // Save important variables to the submission request - request.setSummary(summary); - request.setConnector(connector); - request.setConfigConnectorConnection(connectorConnection); - request.setConfigConnectorJob(connectorJob); - request.setConfigFrameworkConnection(frameworkConnection); - request.setConfigFrameworkJob(frameworkJob); - request.setJobType(job.getType()); - request.setJobName(job.getName()); - request.setJobId(job.getPersistenceId()); - request.setNotificationUrl(notificationBaseUrl + jobId); - - // Let's register all important jars - // sqoop-common - request.addJarForClass(MapContext.class); - // sqoop-core - request.addJarForClass(FrameworkManager.class); - // sqoop-spi - request.addJarForClass(SqoopConnector.class); - // Execution engine jar - request.addJarForClass(executionEngine.getClass()); - // Connector in use - request.addJarForClass(connector.getClass()); - - // Extra libraries that Sqoop code requires - request.addJarForClass(JSONValue.class); - - // Get connector callbacks - switch (job.getType()) { - case IMPORT: - request.setConnectorCallbacks(connector.getImporter()); - break; - case EXPORT: - request.setConnectorCallbacks(connector.getExporter()); - break; - default: - throw new SqoopException(FrameworkError.FRAMEWORK_0005, - "Unsupported job type " + job.getType().name()); - } - LOG.debug("Using callbacks: " + request.getConnectorCallbacks()); - - // Initialize submission from connector perspective - CallbackBase baseCallbacks = request.getConnectorCallbacks(); - - Class<? extends Initializer> initializerClass = baseCallbacks.getInitializer(); - Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass); - - if(initializer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create initializer instance: " + initializerClass.getName()); - } - - // Initializer context - InitializerContext initializerContext = new InitializerContext(request.getConnectorContext()); - - // Initialize submission from connector perspective - initializer.initialize(initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob()); - - // Add job specific jars to - request.addJars(initializer.getJars(initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob())); - - // Retrieve and persist the schema - request.getSummary().setConnectorSchema(initializer.getSchema( - initializerContext, - request.getConfigConnectorConnection(), - request.getConfigConnectorJob() - )); - - // Bootstrap job from framework perspective - switch (job.getType()) { - case IMPORT: - prepareImportSubmission(request); - break; - case EXPORT: - prepareExportSubmission(request); - break; - default: - throw new SqoopException(FrameworkError.FRAMEWORK_0005, - "Unsupported job type " + job.getType().name()); - } - - // Make sure that this job id is not currently running and submit the job - // only if it's not. - synchronized (getClass()) { - MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId); - if(lastSubmission != null && lastSubmission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0002, - "Job with id " + jobId); - } - - // TODO(jarcec): We might need to catch all exceptions here to ensure - // that Destroyer will be executed in all cases. - boolean submitted = submissionEngine.submit(request); - if(!submitted) { - destroySubmission(request); - summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); - } - - repository.createSubmission(summary); - } - - // Return job status most recent - return summary; - } - - private void prepareImportSubmission(SubmissionRequest request) { - ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob(); - - // Initialize the map-reduce part (all sort of required classes, ...) - request.setOutputDirectory(jobConfiguration.output.outputDirectory); - - // We're directly moving configured number of extractors and loaders to - // underlying request object. In the future we might need to throttle this - // count based on other running jobs to meet our SLAs. - request.setExtractors(jobConfiguration.throttling.extractors); - request.setLoaders(jobConfiguration.throttling.loaders); - - // Delegate rest of the job to execution engine - executionEngine.prepareImportSubmission(request); - } - - private void prepareExportSubmission(SubmissionRequest request) { - ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob(); - - // We're directly moving configured number of extractors and loaders to - // underlying request object. In the future we might need to throttle this - // count based on other running jobs to meet our SLAs. - request.setExtractors(jobConfiguration.throttling.extractors); - request.setLoaders(jobConfiguration.throttling.loaders); - - // Delegate rest of the job to execution engine - executionEngine.prepareExportSubmission(request); - } - - /** - * Callback that will be called only if we failed to submit the job to the - * remote cluster. - */ - private void destroySubmission(SubmissionRequest request) { - CallbackBase baseCallbacks = request.getConnectorCallbacks(); - - Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer(); - Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass); - - if(destroyer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create destroyer instance: " + destroyerClass.getName()); - } - - DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false, request.getSummary().getConnectorSchema()); - - // Initialize submission from connector perspective - destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob()); - } - - public MSubmission stop(long jobId) { - Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission submission = repository.findSubmissionLastForJob(jobId); - - if(submission == null || !submission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0003, - "Job with id " + jobId + " is not running"); - } - - String externalId = submission.getExternalId(); - submissionEngine.stop(externalId); - - // Fetch new information to verify that the stop command has actually worked - update(submission); - - // Return updated structure - return submission; - } - - public MSubmission status(long jobId) { - Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission submission = repository.findSubmissionLastForJob(jobId); - - if(submission == null) { - return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); - } - - // If the submission is in running state, let's update it - if(submission.getStatus().isRunning()) { - update(submission); - } - - return submission; - } - - private void update(MSubmission submission) { - double progress = -1; - Counters counters = null; - String externalId = submission.getExternalId(); - SubmissionStatus newStatus = submissionEngine.status(externalId); - String externalLink = submissionEngine.externalLink(externalId); - - if(newStatus.isRunning()) { - progress = submissionEngine.progress(externalId); - } else { - counters = submissionEngine.counters(externalId); - } - - submission.setStatus(newStatus); - submission.setProgress(progress); - submission.setCounters(counters); - submission.setExternalLink(externalLink); - submission.setLastUpdateDate(new Date()); - - RepositoryManager.getInstance().getRepository().updateSubmission(submission); - } - - @Override - public synchronized void configurationChanged() { - LOG.info("Begin submission engine manager reconfiguring"); - MapContext newContext = SqoopConfiguration.getInstance().getContext(); - MapContext oldContext = SqoopConfiguration.getInstance().getOldContext(); - - String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - if (newSubmissionEngineClassName == null - || newSubmissionEngineClassName.trim().length() == 0) { - throw new SqoopException(FrameworkError.FRAMEWORK_0001, - newSubmissionEngineClassName); - } - - String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) { - LOG.warn("Submission engine cannot be replaced at the runtime. " + - "You might need to restart the server."); - } - - String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - if (newExecutionEngineClassName == null - || newExecutionEngineClassName.trim().length() == 0) { - throw new SqoopException(FrameworkError.FRAMEWORK_0007, - newExecutionEngineClassName); - } - - String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) { - LOG.warn("Execution engine cannot be replaced at the runtime. " + - "You might need to restart the server."); - } - - // Set up worker threads - purgeThreshold = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, - DEFAULT_PURGE_THRESHOLD - ); - purgeSleep = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, - DEFAULT_PURGE_SLEEP - ); - purgeThread.interrupt(); - - updateSleep = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, - DEFAULT_UPDATE_SLEEP - ); - updateThread.interrupt(); - - LOG.info("Submission engine manager reconfigured."); - } - - private class PurgeThread extends Thread { - public PurgeThread() { - super("PurgeThread"); - } - - public void run() { - LOG.info("Starting submission manager purge thread"); - - while(running) { - try { - LOG.info("Purging old submissions"); - Date threshold = new Date((new Date()).getTime() - purgeThreshold); - RepositoryManager.getInstance().getRepository().purgeSubmissions(threshold); - Thread.sleep(purgeSleep); - } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); - } - } - - LOG.info("Ending submission manager purge thread"); - } - } - - private class UpdateThread extends Thread { - public UpdateThread() { - super("UpdateThread"); - } - - public void run() { - LOG.info("Starting submission manager update thread"); - - while(running) { - try { - LOG.debug("Updating running submissions"); - - // Let's get all running submissions from repository to check them out - List<MSubmission> unfinishedSubmissions = - RepositoryManager.getInstance().getRepository().findSubmissionsUnfinished(); - - for(MSubmission submission : unfinishedSubmissions) { - update(submission); - } - - Thread.sleep(updateSleep); - } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); - } - } - - LOG.info("Ending submission manager update thread"); - } - } + /** + * Logger object. + */ + private static final Logger LOG = Logger.getLogger(JobManager.class); + + /** + * Private instance to singleton of this class. + */ + private static JobManager instance; + /** + * Create default object by default. + * + * Every Sqoop server application needs one so this should not be performance + * issue. + */ + static { + instance = new JobManager(); + } + + /** + * Return current instance. + * + * @return Current instance + */ + public static JobManager getInstance() { + return instance; + } + + /** + * Allows to set instance in case that it's need. + * + * This method should not be normally used as the default instance should be + * sufficient. One target user use case for this method are unit tests. + * + * @param newInstance + * New instance + */ + public static void setInstance(JobManager newInstance) { + instance = newInstance; + } + + /** + * Default interval for purging old submissions from repository. + */ + private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000; + + /** + * Default sleep interval for purge thread. + */ + private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000; + + /** + * Default interval for update thread. + */ + private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000; + + /** + * Configured submission engine instance + */ + private SubmissionEngine submissionEngine; + + /** + * Configured execution engine instance + */ + private ExecutionEngine executionEngine; + + /** + * Purge thread that will periodically remove old submissions from repository. + */ + private PurgeThread purgeThread = null; + + /** + * Update thread that will periodically check status of running submissions. + */ + private UpdateThread updateThread = null; + + /** + * Synchronization variable between threads. + */ + private boolean running = true; + + /** + * Specifies how old submissions should be removed from repository. + */ + private long purgeThreshold; + + /** + * Number of milliseconds for purge thread to sleep. + */ + private long purgeSleep; + + /** + * Number of milliseconds for update thread to slepp. + */ + private long updateSleep; + + /** + * Base notification URL. + * + * Framework manager will always add job id. + */ + private String notificationBaseUrl; + + /** + * Set notification base URL. + * + * @param url + * Base URL + */ + public void setNotificationBaseUrl(String url) { + LOG.debug("Setting notification base URL to " + url); + notificationBaseUrl = url; + } + + /** + * Get base notification url. + * + * @return String representation of the URL + */ + public String getNotificationBaseUrl() { + return notificationBaseUrl; + } + + public synchronized void destroy() { + LOG.trace("Begin submission engine manager destroy"); + + running = false; + + try { + purgeThread.interrupt(); + purgeThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining purgeThread"); + } + + try { + updateThread.interrupt(); + updateThread.join(); + } catch (InterruptedException e) { + // TODO(jarcec): Do I want to wait until it actually finish here? + LOG.error("Interrupted joining updateThread"); + } + + if (submissionEngine != null) { + submissionEngine.destroy(); + } + + if (executionEngine != null) { + executionEngine.destroy(); + } + } + + public synchronized void initialize() { + LOG.trace("Begin submission engine manager initialization"); + MapContext context = SqoopConfiguration.getInstance().getContext(); + + // Let's load configured submission engine + String submissionEngineClassName = + context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); + + submissionEngine = (SubmissionEngine) ClassUtils + .instantiate(submissionEngineClassName); + if (submissionEngine == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0001, + submissionEngineClassName); + } + + submissionEngine.initialize(context, + FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG); + + // Execution engine + String executionEngineClassName = + context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + + executionEngine = (ExecutionEngine) ClassUtils + .instantiate(executionEngineClassName); + if (executionEngine == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0007, + executionEngineClassName); + } + + // We need to make sure that user has configured compatible combination of + // submission engine and execution engine + if (!submissionEngine + .isExecutionEngineSupported(executionEngine.getClass())) { + throw new SqoopException(FrameworkError.FRAMEWORK_0008); + } + + executionEngine.initialize(context, + FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG); + + // Set up worker threads + purgeThreshold = context.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, + DEFAULT_PURGE_THRESHOLD + ); + purgeSleep = context.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, + DEFAULT_PURGE_SLEEP + ); + + purgeThread = new PurgeThread(); + purgeThread.start(); + + updateSleep = context.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, + DEFAULT_UPDATE_SLEEP + ); + + updateThread = new UpdateThread(); + updateThread.start(); + + SqoopConfiguration.getInstance().getProvider() + .registerListener(new CoreConfigurationListener(this)); + + LOG.info("Submission manager initialized: OK"); + } + + public MSubmission submit(long jobId) { + Repository repository = RepositoryManager.getInstance().getRepository(); + + MJob job = repository.findJob(jobId); + if (job == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0004, + "Unknown job id " + jobId); + } + MConnection connection = repository.findConnection(job.getConnectionId()); + SqoopConnector connector = + ConnectorManager.getInstance().getConnector(job.getConnectorId()); + + // Transform forms to connector specific classes + Object connectorConnection = ClassUtils.instantiate( + connector.getConnectionConfigurationClass()); + FormUtils.fromForms(connection.getConnectorPart().getForms(), + connectorConnection); + + Object connectorJob = ClassUtils.instantiate( + connector.getJobConfigurationClass(job.getType())); + FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob); + + // Transform framework specific forms + Object frameworkConnection = ClassUtils.instantiate( + FrameworkManager.getInstance().getConnectionConfigurationClass()); + FormUtils.fromForms(connection.getFrameworkPart().getForms(), + frameworkConnection); + + Object frameworkJob = ClassUtils.instantiate( + FrameworkManager.getInstance().getJobConfigurationClass(job.getType())); + FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob); + + // Create request object + MSubmission summary = new MSubmission(jobId); + SubmissionRequest request = executionEngine.createSubmissionRequest(); + + // Save important variables to the submission request + request.setSummary(summary); + request.setConnector(connector); + request.setConfigConnectorConnection(connectorConnection); + request.setConfigConnectorJob(connectorJob); + request.setConfigFrameworkConnection(frameworkConnection); + request.setConfigFrameworkJob(frameworkJob); + request.setJobType(job.getType()); + request.setJobName(job.getName()); + request.setJobId(job.getPersistenceId()); + request.setNotificationUrl(notificationBaseUrl + jobId); + + // Let's register all important jars + // sqoop-common + request.addJarForClass(MapContext.class); + // sqoop-core + request.addJarForClass(FrameworkManager.class); + // sqoop-spi + request.addJarForClass(SqoopConnector.class); + // Execution engine jar + request.addJarForClass(executionEngine.getClass()); + // Connector in use + request.addJarForClass(connector.getClass()); + + // Extra libraries that Sqoop code requires + request.addJarForClass(JSONValue.class); + + // Get connector callbacks + switch (job.getType()) { + case IMPORT: + request.setConnectorCallbacks(connector.getImporter()); + break; + case EXPORT: + request.setConnectorCallbacks(connector.getExporter()); + break; + default: + throw new SqoopException(FrameworkError.FRAMEWORK_0005, + "Unsupported job type " + job.getType().name()); + } + LOG.debug("Using callbacks: " + request.getConnectorCallbacks()); + + // Initialize submission from connector perspective + CallbackBase baseCallbacks = request.getConnectorCallbacks(); + + Class<? extends Initializer> initializerClass = baseCallbacks + .getInitializer(); + Initializer initializer = (Initializer) ClassUtils + .instantiate(initializerClass); + + if (initializer == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0006, + "Can't create initializer instance: " + initializerClass.getName()); + } + + // Initializer context + InitializerContext initializerContext = new InitializerContext( + request.getConnectorContext()); + + // Initialize submission from connector perspective + initializer.initialize(initializerContext, + request.getConfigConnectorConnection(), + request.getConfigConnectorJob()); + + // Add job specific jars to + request.addJars(initializer.getJars(initializerContext, + request.getConfigConnectorConnection(), + request.getConfigConnectorJob())); + + // Retrieve and persist the schema + request.getSummary().setConnectorSchema(initializer.getSchema( + initializerContext, + request.getConfigConnectorConnection(), + request.getConfigConnectorJob() + )); + + // Bootstrap job from framework perspective + switch (job.getType()) { + case IMPORT: + prepareImportSubmission(request); + break; + case EXPORT: + prepareExportSubmission(request); + break; + default: + throw new SqoopException(FrameworkError.FRAMEWORK_0005, + "Unsupported job type " + job.getType().name()); + } + + // Make sure that this job id is not currently running and submit the job + // only if it's not. + synchronized (getClass()) { + MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId); + if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0002, + "Job with id " + jobId); + } + + // TODO(jarcec): We might need to catch all exceptions here to ensure + // that Destroyer will be executed in all cases. + boolean submitted = submissionEngine.submit(request); + if (!submitted) { + destroySubmission(request); + summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); + } + + repository.createSubmission(summary); + } + + // Return job status most recent + return summary; + } + + private void prepareImportSubmission(SubmissionRequest request) { + ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request + .getConfigFrameworkJob(); + + // Initialize the map-reduce part (all sort of required classes, ...) + request.setOutputDirectory(jobConfiguration.output.outputDirectory); + + // We're directly moving configured number of extractors and loaders to + // underlying request object. In the future we might need to throttle this + // count based on other running jobs to meet our SLAs. + request.setExtractors(jobConfiguration.throttling.extractors); + request.setLoaders(jobConfiguration.throttling.loaders); + + // Delegate rest of the job to execution engine + executionEngine.prepareImportSubmission(request); + } + + private void prepareExportSubmission(SubmissionRequest request) { + ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request + .getConfigFrameworkJob(); + + // We're directly moving configured number of extractors and loaders to + // underlying request object. In the future we might need to throttle this + // count based on other running jobs to meet our SLAs. + request.setExtractors(jobConfiguration.throttling.extractors); + request.setLoaders(jobConfiguration.throttling.loaders); + + // Delegate rest of the job to execution engine + executionEngine.prepareExportSubmission(request); + } + + /** + * Callback that will be called only if we failed to submit the job to the + * remote cluster. + */ + private void destroySubmission(SubmissionRequest request) { + CallbackBase baseCallbacks = request.getConnectorCallbacks(); + + Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer(); + Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass); + + if (destroyer == null) { + throw new SqoopException(FrameworkError.FRAMEWORK_0006, + "Can't create destroyer instance: " + destroyerClass.getName()); + } + + DestroyerContext destroyerContext = new DestroyerContext( + request.getConnectorContext(), false, request.getSummary() + .getConnectorSchema()); + + // Initialize submission from connector perspective + destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), + request.getConfigConnectorJob()); + } + + public MSubmission stop(long jobId) { + Repository repository = RepositoryManager.getInstance().getRepository(); + MSubmission submission = repository.findSubmissionLastForJob(jobId); + + if (submission == null || !submission.getStatus().isRunning()) { + throw new SqoopException(FrameworkError.FRAMEWORK_0003, + "Job with id " + jobId + " is not running"); + } + + String externalId = submission.getExternalId(); + submissionEngine.stop(externalId); + + // Fetch new information to verify that the stop command has actually worked + update(submission); + + // Return updated structure + return submission; + } + + public MSubmission status(long jobId) { + Repository repository = RepositoryManager.getInstance().getRepository(); + MSubmission submission = repository.findSubmissionLastForJob(jobId); + + if (submission == null) { + return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); + } + + // If the submission is in running state, let's update it + if (submission.getStatus().isRunning()) { + update(submission); + } + + return submission; + } + + private void update(MSubmission submission) { + double progress = -1; + Counters counters = null; + String externalId = submission.getExternalId(); + SubmissionStatus newStatus = submissionEngine.status(externalId); + String externalLink = submissionEngine.externalLink(externalId); + + if (newStatus.isRunning()) { + progress = submissionEngine.progress(externalId); + } else { + counters = submissionEngine.counters(externalId); + } + + submission.setStatus(newStatus); + submission.setProgress(progress); + submission.setCounters(counters); + submission.setExternalLink(externalLink); + submission.setLastUpdateDate(new Date()); + + RepositoryManager.getInstance().getRepository() + .updateSubmission(submission); + } + + @Override + public synchronized void configurationChanged() { + LOG.info("Begin submission engine manager reconfiguring"); + MapContext newContext = SqoopConfiguration.getInstance().getContext(); + MapContext oldContext = SqoopConfiguration.getInstance().getOldContext(); + + String newSubmissionEngineClassName = newContext + .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); + if (newSubmissionEngineClassName == null + || newSubmissionEngineClassName.trim().length() == 0) { + throw new SqoopException(FrameworkError.FRAMEWORK_0001, + newSubmissionEngineClassName); + } + + String oldSubmissionEngineClassName = oldContext + .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); + if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) { + LOG.warn("Submission engine cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + String newExecutionEngineClassName = newContext + .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + if (newExecutionEngineClassName == null + || newExecutionEngineClassName.trim().length() == 0) { + throw new SqoopException(FrameworkError.FRAMEWORK_0007, + newExecutionEngineClassName); + } + + String oldExecutionEngineClassName = oldContext + .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); + if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) { + LOG.warn("Execution engine cannot be replaced at the runtime. " + + "You might need to restart the server."); + } + + // Set up worker threads + purgeThreshold = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, + DEFAULT_PURGE_THRESHOLD + ); + purgeSleep = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, + DEFAULT_PURGE_SLEEP + ); + purgeThread.interrupt(); + + updateSleep = newContext.getLong( + FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, + DEFAULT_UPDATE_SLEEP + ); + updateThread.interrupt(); + + LOG.info("Submission engine manager reconfigured."); + } + + private class PurgeThread extends Thread { + public PurgeThread() { + super("PurgeThread"); + } + + public void run() { + LOG.info("Starting submission manager purge thread"); + + while (running) { + try { + LOG.info("Purging old submissions"); + Date threshold = new Date((new Date()).getTime() - purgeThreshold); + RepositoryManager.getInstance().getRepository() + .purgeSubmissions(threshold); + Thread.sleep(purgeSleep); + } catch (InterruptedException e) { + LOG.debug("Purge thread interrupted", e); + } + } + + LOG.info("Ending submission manager purge thread"); + } + } + + private class UpdateThread extends Thread { + public UpdateThread() { + super("UpdateThread"); + } + + public void run() { + LOG.info("Starting submission manager update thread"); + + while (running) { + try { + LOG.debug("Updating running submissions"); + + // Let's get all running submissions from repository to check them out + List<MSubmission> unfinishedSubmissions = + RepositoryManager.getInstance().getRepository() + .findSubmissionsUnfinished(); + + for (MSubmission submission : unfinishedSubmissions) { + update(submission); + } + + Thread.sleep(updateSleep); + } catch (InterruptedException e) { + LOG.debug("Purge thread interrupted", e); + } + } + + LOG.info("Ending submission manager update thread"); + } + } }
