http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java new file mode 100644 index 0000000..277c6be --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -0,0 +1,712 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.sqoop.driver;

import java.util.Date;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.job.etl.Transferable;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.utils.ClassUtils;
import org.json.simple.JSONValue;

/**
 * Coordinates job submissions: loads the configured submission and execution
 * engines, builds and submits job requests, reports and stops running
 * submissions, and runs two background threads that purge old submissions and
 * refresh the status of unfinished ones.
 */
public class JobManager implements Reconfigurable {
  /**
   * Logger object.
   */
  private static final Logger LOG = Logger.getLogger(JobManager.class);

  /**
   * Private instance to singleton of this class.
   */
  private static JobManager instance;

  /**
   * Create default object by default.
   *
   * Every Sqoop server application needs one so this should not be a
   * performance issue.
   */
  static {
    instance = new JobManager();
  }

  /**
   * Return current instance.
   *
   * @return Current instance
   */
  public static JobManager getInstance() {
    return instance;
  }

  /**
   * Allows replacing the instance when needed.
   *
   * This method should not normally be used as the default instance should be
   * sufficient. One target use case for this method is unit tests.
   *
   * @param newInstance
   *          New instance
   */
  public static void setInstance(JobManager newInstance) {
    instance = newInstance;
  }

  /**
   * Default interval for purging old submissions from repository.
   */
  private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;

  /**
   * Default sleep interval for purge thread.
   */
  private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;

  /**
   * Default interval for update thread.
   */
  private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;

  /**
   * Configured submission engine instance
   */
  private SubmissionEngine submissionEngine;

  /**
   * Configured execution engine instance
   */
  private ExecutionEngine executionEngine;

  /**
   * Purge thread that will periodically remove old submissions from repository.
   */
  private PurgeThread purgeThread = null;

  /**
   * Update thread that will periodically check status of running submissions.
   */
  private UpdateThread updateThread = null;

  /**
   * Synchronization variable between threads.
   *
   * Declared volatile because it is written by destroy() and read by the
   * worker threads without any common lock; without it the workers are not
   * guaranteed to ever observe the shutdown request.
   */
  private volatile boolean running = true;

  /**
   * Specifies how old submissions should be removed from repository.
   */
  private long purgeThreshold;

  /**
   * Number of milliseconds for purge thread to sleep.
   */
  private long purgeSleep;

  /**
   * Number of milliseconds for update thread to sleep.
   */
  private long updateSleep;

  /**
   * Base notification URL.
   *
   * Driver manager will always add job id.
   */
  private String notificationBaseUrl;

  /**
   * Set notification base URL.
   *
   * @param url
   *          Base URL
   */
  public void setNotificationBaseUrl(String url) {
    LOG.debug("Setting notification base URL to " + url);
    notificationBaseUrl = url;
  }

  /**
   * Get base notification url.
   *
   * @return String representation of the URL
   */
  public String getNotificationBaseUrl() {
    return notificationBaseUrl;
  }

  /**
   * Stops both worker threads and destroys the submission and execution
   * engines.
   *
   * Safe to call even when initialize() never started the worker threads. If
   * the calling thread is interrupted while waiting for a worker, the shutdown
   * still runs to completion and the interrupt status is restored before
   * returning.
   */
  public synchronized void destroy() {
    LOG.trace("Begin submission engine manager destroy");

    running = false;

    // Non-short-circuit OR: the second worker must be stopped regardless of
    // how the first wait ended.
    boolean interrupted = stopWorker(purgeThread, "purgeThread");
    interrupted |= stopWorker(updateThread, "updateThread");

    if (submissionEngine != null) {
      submissionEngine.destroy();
    }

    if (executionEngine != null) {
      executionEngine.destroy();
    }

    // Restore the flag only now so that it cannot abort the joins above.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Interrupts the given worker thread and waits for it to terminate.
   *
   * @param worker
   *          Thread to stop; null when it was never created
   * @param name
   *          Name used in the log message when the wait is interrupted
   * @return true if the calling thread was interrupted while waiting
   */
  private static boolean stopWorker(Thread worker, String name) {
    if (worker == null) {
      return false;
    }

    try {
      worker.interrupt();
      worker.join();
      return false;
    } catch (InterruptedException e) {
      // TODO(jarcec): Do I want to wait until it actually finish here?
      LOG.error("Interrupted joining " + name);
      return true;
    }
  }

  /**
   * Instantiates and initializes the configured submission and execution
   * engines, reads the worker thread intervals from the configuration, starts
   * the purge and update threads and registers this object as a configuration
   * listener.
   *
   * @throws SqoopException
   *           if either engine cannot be instantiated or the execution engine
   *           is not supported by the submission engine
   */
  public synchronized void initialize() {
    LOG.trace("Begin submission engine manager initialization");
    MapContext context = SqoopConfiguration.getInstance().getContext();

    // Let's load configured submission engine
    String submissionEngineClassName =
        context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);

    submissionEngine = (SubmissionEngine) ClassUtils
        .instantiate(submissionEngineClassName);
    if (submissionEngine == null) {
      throw new SqoopException(DriverError.DRIVER_0001,
          submissionEngineClassName);
    }

    submissionEngine.initialize(context,
        DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);

    // Execution engine
    String executionEngineClassName =
        context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);

    executionEngine = (ExecutionEngine) ClassUtils
        .instantiate(executionEngineClassName);
    if (executionEngine == null) {
      throw new SqoopException(DriverError.DRIVER_0007,
          executionEngineClassName);
    }

    // We need to make sure that user has configured compatible combination of
    // submission engine and execution engine
    if (!submissionEngine
        .isExecutionEngineSupported(executionEngine.getClass())) {
      throw new SqoopException(DriverError.DRIVER_0008);
    }

    executionEngine.initialize(context,
        DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);

    // Set up worker threads
    purgeThreshold = context.getLong(
        DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
        DEFAULT_PURGE_THRESHOLD
    );
    purgeSleep = context.getLong(
        DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
        DEFAULT_PURGE_SLEEP
    );

    purgeThread = new PurgeThread();
    purgeThread.start();

    updateSleep = context.getLong(
        DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
        DEFAULT_UPDATE_SLEEP
    );

    updateThread = new UpdateThread();
    updateThread.start();

    SqoopConfiguration.getInstance().getProvider()
        .registerListener(new CoreConfigurationListener(this));

    LOG.info("Submission manager initialized: OK");
  }

  /**
   * Builds a job request for the given job, submits it through the submission
   * engine and stores the resulting submission in the repository.
   *
   * @param jobId
   *          Id of the job to run
   * @param ctx
   *          Request context; its user name is recorded on the submission
   * @return The submission that was stored; its status is FAILURE_ON_SUBMIT
   *         when the engine refused the request
   * @throws SqoopException
   *           if the last submission of this job is still running
   */
  public MSubmission submit(long jobId, HttpEventContext ctx) {

    MSubmission mSubmission = createJobSubmission(ctx, jobId);
    JobRequest jobRequest = createJobRequest(jobId, mSubmission);
    // Bootstrap job to execute
    prepareJob(jobRequest);
    // Make sure that this job id is not currently running and submit the job
    // only if it's not.
    synchronized (getClass()) {
      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
          .findSubmissionLastForJob(jobId);
      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
        throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
      }
      // TODO(Abe): Call multiple destroyers.
      // TODO(jarcec): We might need to catch all exceptions here to ensure
      // that Destroyer will be executed in all cases.
      // NOTE: the following is a blocking call
      boolean success = submissionEngine.submit(jobRequest);
      if (!success) {
        destroySubmission(jobRequest);
        mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
      }
      RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
    }
    return mSubmission;
  }

  /**
   * Assembles the job request for one job: resolves the job, its FROM/TO links
   * and connectors, fills the configuration objects from the stored forms,
   * registers the required jars and asks both connector initializers for
   * their schema.
   *
   * @param jobId
   *          Id of the job to run
   * @param submission
   *          Submission that becomes the summary of the request
   * @return Fully populated job request
   */
  private JobRequest createJobRequest(long jobId, MSubmission submission) {
    // get job
    MJob job = getJob(jobId);

    // get from/to connections for the job
    MLink fromConnection = getLink(job.getLinkId(Direction.FROM));
    MLink toConnection = getLink(job.getLinkId(Direction.TO));

    // get from/to connectors for the connection
    SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
    validateSupportedDirection(fromConnector, Direction.FROM);
    SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
    validateSupportedDirection(toConnector, Direction.TO);

    // Transform config to fromConnector specific classes
    Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
        .getLinkConfigurationClass());
    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);

    // Transform config to toConnector specific classes
    Object toConnectorConfig = ClassUtils
        .instantiate(toConnector.getLinkConfigurationClass());
    FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);

    Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
    FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);

    Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
    FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);

    // Transform framework specific configs
    // Q(VB) : Aren't the following 2 exactly the same?
    Object fromDriverConnection = ClassUtils.instantiate(Driver.getInstance()
        .getLinkConfigurationClass());
    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromDriverConnection);

    Object toDriverConnection = ClassUtils.instantiate(Driver.getInstance()
        .getLinkConfigurationClass());
    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toDriverConnection);

    Object frameworkJob = ClassUtils.instantiate(Driver.getInstance()
        .getJobConfigurationClass());
    FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);

    // Create a job request for submit/execution
    JobRequest jobRequest = executionEngine.createJobRequest();
    // Save important variables to the job request
    jobRequest.setSummary(submission);
    jobRequest.setConnector(Direction.FROM, fromConnector);
    jobRequest.setConnector(Direction.TO, toConnector);
    jobRequest.setConnectorLinkConfig(Direction.FROM, fromConnectionConfig);
    jobRequest.setConnectorLinkConfig(Direction.TO, toConnectorConfig);
    jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
    jobRequest.setConnectorJobConfig(Direction.TO, toJob);
    // TODO(Abe): Should we actually have 2 different Driver Connection config objects?
    jobRequest.setFrameworkLinkConfig(Direction.FROM, fromDriverConnection);
    jobRequest.setFrameworkLinkConfig(Direction.TO, toDriverConnection);
    jobRequest.setFrameworkJobConfig(frameworkJob);
    jobRequest.setJobName(job.getName());
    jobRequest.setJobId(job.getPersistenceId());
    jobRequest.setNotificationUrl(notificationBaseUrl + jobId);

    // The FROM connector decides the intermediate data format; look it up once
    // and reuse it for both the request and the jar registration below.
    Class<? extends IntermediateDataFormat<?>> dataFormatClass =
        fromConnector.getIntermediateDataFormat();
    jobRequest.setIntermediateDataFormat(dataFormatClass);

    jobRequest.setFrom(fromConnector.getFrom());
    jobRequest.setTo(toConnector.getTo());

    addStandardJars(jobRequest);
    addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
    addConnectorInitializerJars(jobRequest, Direction.FROM);
    addConnectorInitializerJars(jobRequest, Direction.TO);

    Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
    Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);

    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
    // NOTE(review): the TO schema is never stored on the summary here, yet
    // destroySubmission() reads getToSchema() - confirm this is intended.
    if (fromSchema != null) {
      jobRequest.getSummary().setFromSchema(fromSchema);
    } else {
      jobRequest.getSummary().setFromSchema(toSchema);
    }
    LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
    return jobRequest;
  }

  /**
   * Registers the jars that contain both connector classes and the
   * intermediate data format class.
   */
  private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
      SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
    jobRequest.addJarForClass(fromConnector.getClass());
    jobRequest.addJarForClass(toConnector.getClass());
    jobRequest.addJarForClass(dataFormatClass);
  }

  /**
   * Registers the jars every job needs regardless of the connectors in use.
   */
  private void addStandardJars(JobRequest jobRequest) {
    // Let's register all important jars
    // sqoop-common
    jobRequest.addJarForClass(MapContext.class);
    // sqoop-core
    jobRequest.addJarForClass(Driver.class);
    // sqoop-spi
    jobRequest.addJarForClass(SqoopConnector.class);
    // Execution engine jar
    jobRequest.addJarForClass(executionEngine.getClass());
    // Extra libraries that Sqoop code requires
    jobRequest.addJarForClass(JSONValue.class);
  }

  /**
   * Creates a new submission for the job and records the requesting user as
   * both its creator and its last updater.
   */
  MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
    MSubmission summary = new MSubmission(jobId);
    summary.setCreationUser(ctx.getUsername());
    summary.setLastUpdateUser(ctx.getUsername());
    return summary;
  }

  /**
   * Looks the connector up in the connector manager.
   */
  SqoopConnector getConnector(long connnectorId) {
    return ConnectorManager.getInstance().getConnector(connnectorId);
  }

  /**
   * Verifies that the connector supports the given direction.
   *
   * @throws SqoopException
   *           if the direction is not among the connector's supported ones
   */
  void validateSupportedDirection(SqoopConnector connector, Direction direction) {
    // Make sure that connector supports the given direction
    if (!connector.getSupportedDirections().contains(direction)) {
      throw new SqoopException(DriverError.DRIVER_0011, "Connector: "
          + connector.getClass().getCanonicalName());
    }
  }

  /**
   * Loads the link from the repository and rejects it when it is disabled.
   *
   * @throws SqoopException
   *           if the link is not enabled
   */
  MLink getLink(long linkId) {
    MLink link = RepositoryManager.getInstance().getRepository()
        .findLink(linkId);
    // NOTE(review): unlike getJob() there is no null check here; if findLink()
    // can return null for an unknown id this dereference fails - confirm.
    if (!link.getEnabled()) {
      throw new SqoopException(DriverError.DRIVER_0010, "Connection id: "
          + link.getPersistenceId());
    }
    return link;
  }

  /**
   * Loads the job from the repository and rejects unknown or disabled jobs.
   *
   * @throws SqoopException
   *           if the job does not exist or is not enabled
   */
  MJob getJob(long jobId) {
    MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
    if (job == null) {
      throw new SqoopException(DriverError.DRIVER_0004, "Unknown job id: " + jobId);
    }

    if (!job.getEnabled()) {
      throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + job.getPersistenceId());
    }
    return job;
  }

  /**
   * Runs the connector initializer for the given direction and returns the
   * schema it reports.
   */
  private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {

    Initializer initializer = getConnectorInitializer(jobRequest, direction);

    // Initializer context
    InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);

    // Initialize submission from the connector perspective
    initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
        jobRequest.getConnectorJobConfig(direction));

    // TODO(Abe): Alter behavior of Schema here.
    return initializer.getSchema(initializerContext,
        jobRequest.getConnectorLinkConfig(direction),
        jobRequest.getConnectorJobConfig(direction));
  }

  /**
   * Adds the job specific jars reported by the connector initializer for the
   * given direction.
   */
  private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {

    Initializer initializer = getConnectorInitializer(jobRequest, direction);
    InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
    // Add job specific jars to the request
    jobRequest.addJars(initializer.getJars(initializerContext,
        jobRequest.getConnectorLinkConfig(direction),
        jobRequest.getConnectorJobConfig(direction)));
  }

  /**
   * Instantiates the initializer declared by the FROM or TO entity of the
   * request.
   *
   * @throws SqoopException
   *           if the initializer cannot be instantiated
   */
  private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
    Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
    Class<? extends Initializer> initializerClass = transferable.getInitializer();
    Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);

    if (initializer == null) {
      throw new SqoopException(DriverError.DRIVER_0006,
          "Can't create connector initializer instance: " + initializerClass.getName());
    }
    return initializer;
  }

  /**
   * Wraps the request's connector context for the given direction.
   */
  private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) {
    return new InitializerContext(jobRequest.getConnectorContext(direction));
  }

  /**
   * Copies the throttling settings onto the request and lets the execution
   * engine finish the preparation.
   */
  void prepareJob(JobRequest request) {
    JobConfiguration jobConfiguration = (JobConfiguration) request.getFrameworkJobConfig();
    // We're directly moving configured number of extractors and loaders to
    // underlying request object. In the future we might need to throttle this
    // count based on other running jobs to meet our SLAs.
    request.setExtractors(jobConfiguration.throttling.extractors);
    request.setLoaders(jobConfiguration.throttling.loaders);

    // Delegate rest of the job to execution engine
    executionEngine.prepareJob(request);
  }

  /**
   * Callback that will be called only if we failed to submit the job to the
   * remote cluster.
   *
   * @throws SqoopException
   *           if either destroyer cannot be instantiated
   */
  void destroySubmission(JobRequest request) {
    Transferable from = request.getFrom();
    Transferable to = request.getTo();

    Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
    Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
    Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
    Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);

    if (fromDestroyer == null) {
      throw new SqoopException(DriverError.DRIVER_0006,
          "Can't create fromDestroyer instance: " + fromDestroyerClass.getName());
    }

    if (toDestroyer == null) {
      throw new SqoopException(DriverError.DRIVER_0006,
          "Can't create toDestroyer instance: " + toDestroyerClass.getName());
    }

    // TODO(Abe): Update context to manage multiple connectors. As well as summary.
    DestroyerContext fromDestroyerContext = new DestroyerContext(
        request.getConnectorContext(Direction.FROM), false, request.getSummary()
            .getFromSchema());
    DestroyerContext toDestroyerContext = new DestroyerContext(
        request.getConnectorContext(Direction.TO), false, request.getSummary()
            .getToSchema());

    // destroy submission from connector perspective
    fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
        request.getConnectorJobConfig(Direction.FROM));
    toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
        request.getConnectorJobConfig(Direction.TO));
  }

  /**
   * Stops the running submission of the given job.
   *
   * @param jobId
   *          Id of the job to stop
   * @param ctx
   *          Request context; its user name is recorded as the last updater
   * @return The submission refreshed after the stop request
   * @throws SqoopException
   *           if the job has no submission or its last one is not running
   */
  public MSubmission stop(long jobId, HttpEventContext ctx) {

    Repository repository = RepositoryManager.getInstance().getRepository();
    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);

    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
      throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
          + " is not running");
    }
    submissionEngine.stop(mSubmission.getExternalId());

    mSubmission.setLastUpdateUser(ctx.getUsername());

    // Fetch new information to verify that the stop command has actually worked
    update(mSubmission);

    // Return updated structure
    return mSubmission;
  }

  /**
   * Returns the last submission of the given job, refreshed from the
   * submission engine when it is still running.
   *
   * @param jobId
   *          Id of the job
   * @return The last submission, or a new NEVER_EXECUTED one when the job has
   *         none
   */
  public MSubmission status(long jobId) {
    Repository repository = RepositoryManager.getInstance().getRepository();
    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);

    if (mSubmission == null) {
      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
    }

    // If the submission is in running state, let's update it
    if (mSubmission.getStatus().isRunning()) {
      update(mSubmission);
    }

    return mSubmission;
  }

  /**
   * Refreshes the submission from the submission engine and persists it.
   *
   * Progress is only queried while the submission is running (otherwise it is
   * set to -1); counters are only queried once it is no longer running
   * (otherwise they are set to null).
   */
  private void update(MSubmission submission) {
    double progress = -1;
    Counters counters = null;
    String externalId = submission.getExternalId();
    SubmissionStatus newStatus = submissionEngine.status(externalId);
    String externalLink = submissionEngine.externalLink(externalId);

    if (newStatus.isRunning()) {
      progress = submissionEngine.progress(externalId);
    } else {
      counters = submissionEngine.counters(externalId);
    }

    submission.setStatus(newStatus);
    submission.setProgress(progress);
    submission.setCounters(counters);
    submission.setExternalLink(externalLink);
    submission.setLastUpdateDate(new Date());

    RepositoryManager.getInstance().getRepository()
        .updateSubmission(submission);
  }

  /**
   * Re-reads the worker thread intervals from the new configuration and wakes
   * both workers up so that they pick the new values. A changed engine class
   * name is only reported with a warning.
   *
   * @throws SqoopException
   *           if the new configuration does not name a submission or an
   *           execution engine
   */
  @Override
  public synchronized void configurationChanged() {
    LOG.info("Begin submission engine manager reconfiguring");
    MapContext newContext = SqoopConfiguration.getInstance().getContext();
    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();

    String newSubmissionEngineClassName = newContext
        .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
    if (newSubmissionEngineClassName == null
        || newSubmissionEngineClassName.trim().length() == 0) {
      throw new SqoopException(DriverError.DRIVER_0001,
          newSubmissionEngineClassName);
    }

    String oldSubmissionEngineClassName = oldContext
        .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
    if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
      LOG.warn("Submission engine cannot be replaced at the runtime. "
          + "You might need to restart the server.");
    }

    String newExecutionEngineClassName = newContext
        .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
    if (newExecutionEngineClassName == null
        || newExecutionEngineClassName.trim().length() == 0) {
      throw new SqoopException(DriverError.DRIVER_0007,
          newExecutionEngineClassName);
    }

    String oldExecutionEngineClassName = oldContext
        .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
    if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
      LOG.warn("Execution engine cannot be replaced at the runtime. "
          + "You might need to restart the server.");
    }

    // Set up worker threads
    purgeThreshold = newContext.getLong(
        DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
        DEFAULT_PURGE_THRESHOLD
    );
    purgeSleep = newContext.getLong(
        DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
        DEFAULT_PURGE_SLEEP
    );
    // Interrupting cuts the current sleep short; the loop then runs again
    // with the new values.
    purgeThread.interrupt();

    updateSleep = newContext.getLong(
        DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
        DEFAULT_UPDATE_SLEEP
    );
    updateThread.interrupt();

    LOG.info("Submission engine manager reconfigured.");
  }

  /**
   * Worker that removes submissions older than the purge threshold and then
   * sleeps for the purge interval, until the manager stops running.
   */
  private class PurgeThread extends Thread {
    public PurgeThread() {
      super("PurgeThread");
    }

    @Override
    public void run() {
      LOG.info("Starting submission manager purge thread");

      while (running) {
        try {
          LOG.info("Purging old submissions");
          Date threshold = new Date((new Date()).getTime() - purgeThreshold);
          RepositoryManager.getInstance().getRepository()
              .purgeSubmissions(threshold);
          Thread.sleep(purgeSleep);
        } catch (InterruptedException e) {
          // An interrupt is either a shutdown (running is now false) or a
          // reconfiguration; the loop condition decides which.
          LOG.debug("Purge thread interrupted", e);
        }
      }

      LOG.info("Ending submission manager purge thread");
    }
  }

  /**
   * Worker that refreshes every unfinished submission and then sleeps for the
   * update interval, until the manager stops running.
   */
  private class UpdateThread extends Thread {
    public UpdateThread() {
      super("UpdateThread");
    }

    @Override
    public void run() {
      LOG.info("Starting submission manager update thread");

      while (running) {
        try {
          LOG.debug("Updating running submissions");

          // Let's get all running submissions from repository to check them out
          List<MSubmission> unfinishedSubmissions =
              RepositoryManager.getInstance().getRepository()
                  .findSubmissionsUnfinished();

          for (MSubmission submission : unfinishedSubmissions) {
            update(submission);
          }

          Thread.sleep(updateSleep);
        } catch (InterruptedException e) {
          // An interrupt is either a shutdown (running is now false) or a
          // reconfiguration; the loop condition decides which.
          LOG.debug("Update thread interrupted", e);
        }
      }

      LOG.info("Ending submission manager update thread");
    }
  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java new file mode 100644 index 0000000..63e1e49 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.common.DirectionError; +import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.idf.IntermediateDataFormat; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.Transferable; +import org.apache.sqoop.model.MSubmission; +import org.apache.sqoop.utils.ClassUtils; + +import java.util.LinkedList; +import java.util.List; + +/** + * Submission details class is used when creating new submission and contains + * all information that we need to create a new submission (including mappers, + * reducers, ...). 
+ */ +public class JobRequest { + + /** + * Submission summary + */ + MSubmission summary; + + /** + * Original job name + */ + String jobName; + + /** + * Associated job (from metadata perspective) id + */ + long jobId; + + /** + * Connector instances associated with this submission request + */ + SqoopConnector fromConnector; + SqoopConnector toConnector; + + /** + * List of required local jars for the job + */ + List<String> jars; + + /** + * From entity + */ + Transferable from; + + /** + * To entity + */ + Transferable to; + + /** + * All configuration objects + */ + Object fromConnectorLinkConfig; + Object toConnectorLinkConfig; + Object fromConnectorJobConfig; + Object toConnectorJobConfig; + Object fromFrameworkLinkConfig; + Object toFrameworkLinkConfig; + Object frameworkJobConfig; + + /** + * Connector context (submission specific configuration) + */ + MutableMapContext fromConnectorContext; + MutableMapContext toConnectorContext; + + /** + * Framework context (submission specific configuration) + */ + MutableMapContext driverContext; + + /** + * Optional notification URL for job progress + */ + String notificationUrl; + + /** + * Number of extractors + */ + Integer extractors; + + /** + * Number of loaders + */ + Integer loaders; + + /** + * The intermediate data format this submission should use. + */ + Class<? 
extends IntermediateDataFormat> intermediateDataFormat; + + public JobRequest() { + this.jars = new LinkedList<String>(); + this.fromConnectorContext = new MutableMapContext(); + this.toConnectorContext = new MutableMapContext(); + this.driverContext = new MutableMapContext(); + this.fromConnector = null; + this.toConnector = null; + this.fromConnectorLinkConfig = null; + this.toConnectorLinkConfig = null; + this.fromConnectorJobConfig = null; + this.toConnectorJobConfig = null; + this.fromFrameworkLinkConfig = null; + this.toFrameworkLinkConfig = null; + } + + public MSubmission getSummary() { + return summary; + } + + public void setSummary(MSubmission summary) { + this.summary = summary; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public long getJobId() { + return jobId; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } + + public SqoopConnector getConnector(Direction type) { + switch(type) { + case FROM: + return fromConnector; + + case TO: + return toConnector; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnector(Direction type, SqoopConnector connector) { + switch(type) { + case FROM: + fromConnector = connector; + break; + + case TO: + toConnector = connector; + break; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public List<String> getJars() { + return jars; + } + + public void addJar(String jar) { + if(!jars.contains(jar)) { + jars.add(jar); + } + } + + public void addJarForClass(Class klass) { + addJar(ClassUtils.jarForClass(klass)); + } + + public void addJars(List<String> jars) { + for(String j : jars) { + addJar(j); + } + } + + public Transferable getFrom() { + return from; + } + + public void setFrom(Transferable from) { + this.from = from; + } + + public Transferable getTo() { + return to; + } + + 
public void setTo(Transferable to) { + this.to = to; + } + + public Object getConnectorLinkConfig(Direction type) { + switch(type) { + case FROM: + return fromConnectorLinkConfig; + + case TO: + return toConnectorLinkConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnectorLinkConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromConnectorLinkConfig = config; + break; + case TO: + toConnectorLinkConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getConnectorJobConfig(Direction type) { + switch(type) { + case FROM: + return fromConnectorJobConfig; + + case TO: + return toConnectorJobConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setConnectorJobConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromConnectorJobConfig = config; + break; + case TO: + toConnectorJobConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getFrameworkLinkConfig(Direction type) { + switch(type) { + case FROM: + return fromFrameworkLinkConfig; + + case TO: + return toFrameworkLinkConfig; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public void setFrameworkLinkConfig(Direction type, Object config) { + switch(type) { + case FROM: + fromFrameworkLinkConfig = config; + break; + case TO: + toFrameworkLinkConfig = config; + break; + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public Object getFrameworkJobConfig() { + return frameworkJobConfig; + } + + public void setFrameworkJobConfig(Object config) { + frameworkJobConfig = config; + } + + public MutableMapContext getConnectorContext(Direction type) { + 
switch(type) { + case FROM: + return fromConnectorContext; + + case TO: + return toConnectorContext; + + default: + throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); + } + } + + public MutableMapContext getDriverContext() { + return driverContext; + } + + public String getNotificationUrl() { + return notificationUrl; + } + + public void setNotificationUrl(String url) { + this.notificationUrl = url; + } + + public Integer getExtractors() { + return extractors; + } + + public void setExtractors(Integer extractors) { + this.extractors = extractors; + } + + public Integer getLoaders() { + return loaders; + } + + public void setLoaders(Integer loaders) { + this.loaders = loaders; + } + + public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() { + return intermediateDataFormat; + } + + public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) { + this.intermediateDataFormat = intermediateDataFormat; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java new file mode 100644 index 0000000..3a32e9f --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.common.MapContext; +import org.apache.sqoop.submission.counter.Counters; +import org.apache.sqoop.submission.SubmissionStatus; + +/** + * Submission engine is responsible for conveying the information about the + * job instances (submissions) to the remote (Hadoop) cluster. + */ +public abstract class SubmissionEngine { + + /** + * Initialize submission engine + * + * @param context Configuration context + * @param prefix Submission engine prefix + */ + public void initialize(MapContext context, String prefix) { + } + + /** + * Destroy submission engine when stopping server + */ + public void destroy() { + } + + /** + * Callback to verify that configured submission engine and execution engine + * are compatible. + * + * @param executionEngineClass Configured execution class. + * @return True if such execution engine is supported + */ + public abstract boolean isExecutionEngineSupported(Class<?> executionEngineClass); + + /** + * Submit new job to the remote (Hadoop) cluster. This method *must* call + * submission.getSummary().setExternalId(), otherwise Sqoop won't + * be able to track progress on this job! + * + * @return True if we were able to submit the job to the remote cluster. + */ + public abstract boolean submit(JobRequest submission); + + /** + * Hard stop for given submission. + * + * @param submissionId Submission internal id. + */ + public abstract void stop(String submissionId); + + /** + * Return status of given submission. + * + * @param submissionId Submission internal id. 
+ * @return Current submission status. + */ + public abstract SubmissionStatus status(String submissionId); + + /** + * Return submission progress. + * + * The expected value is a number from the interval <0, 1> denoting how far + * the processing has gone, or -1 if this submission engine does not support + * progress reporting. + * + * @param submissionId Submission internal id. + * @return {-1} union <0, 1> + */ + public double progress(String submissionId) { + return -1; + } + + /** + * Return statistics for given submission id. + * + * Sqoop will call counters only for submissions in state SUCCEEDED; + * it is considered an exceptional state to call this method in any other state. + * + * @param submissionId Submission internal id. + * @return Submission statistics + */ + public Counters counters(String submissionId) { + return null; + } + + /** + * Return link to external web page with given submission. + * + * @param submissionId Submission internal id. + * @return Null in case that external page is not supported or available or + * HTTP link to given submission. + */ + public String externalLink(String submissionId) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java new file mode 100644 index 0000000..908a4eb --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver.configuration; + +import org.apache.sqoop.model.ConfigurationClass; +import org.apache.sqoop.model.Form; + +/** + * Representing the core job configuration + */ +@ConfigurationClass +public class JobConfiguration { + @Form + public ThrottlingForm throttling; + + public JobConfiguration() { + throttling = new ThrottlingForm(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java new file mode 100644 index 0000000..3202844 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver.configuration; + +import org.apache.sqoop.model.ConfigurationClass; + +/** + * Representing the core link configuration + */ +@ConfigurationClass +public class LinkConfiguration { + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java new file mode 100644 index 0000000..e73007e --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.driver.configuration; + +import org.apache.sqoop.model.FormClass; +import org.apache.sqoop.model.Input; + +/** + * Form to set up number of loaders and extractors + */ +@FormClass +public class ThrottlingForm { + + @Input public Integer extractors; + + @Input public Integer loaders; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java deleted file mode 100644 index 75b570d..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.common.ImmutableContext; - -/** - * Execution engine drives execution of sqoop job. It's responsible - * for executing all defined steps in the import/export workflow. 
- * A successful job execution will be recorded in the job submission entity - */ -public abstract class ExecutionEngine { - - /** - * Initialize execution engine - * - * @param context Configuration context - * @parma prefix Execution engine prefix - */ - public void initialize(ImmutableContext context, String prefix) { - } - - /** - * Destroy execution engine when stopping server - */ - public void destroy() { - } - - /** - * Return new JobRequest class or any subclass if it's needed by - * execution and submission engine combination. - * - * @return new JobRequestobject - */ - public JobRequest createJobRequest() { - return new JobRequest(); - } - - /** - * Prepare given job request. - * - * @param request JobRequest - */ - public abstract void prepareJob(JobRequest request); -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java deleted file mode 100644 index 4293dce..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.core.ConfigurationConstants; - -/** - * Constants that are used in framework module. - */ -public final class FrameworkConstants { - - // Sqoop configuration constants - - public static final String PREFIX_SUBMISSION_CONFIG = - ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission."; - - public static final String PREFIX_EXECUTION_CONFIG = - ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution."; - - public static final String SYSCFG_SUBMISSION_ENGINE = - PREFIX_SUBMISSION_CONFIG + "engine"; - - public static final String PREFIX_SUBMISSION_ENGINE_CONFIG = - SYSCFG_SUBMISSION_ENGINE + "."; - - public static final String PREFIX_SUBMISSION_PURGE_CONFIG = - PREFIX_SUBMISSION_CONFIG + "purge."; - - public static final String SYSCFG_SUBMISSION_PURGE_THRESHOLD = - PREFIX_SUBMISSION_PURGE_CONFIG + "threshold"; - - public static final String SYSCFG_SUBMISSION_PURGE_SLEEP = - PREFIX_SUBMISSION_PURGE_CONFIG + "sleep"; - - public static final String PREFIX_SUBMISSION_UPDATE_CONFIG = - PREFIX_SUBMISSION_CONFIG + "update."; - - public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP = - PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep"; - - public static final String SYSCFG_EXECUTION_ENGINE = - PREFIX_EXECUTION_CONFIG + "engine"; - - public static final String PREFIX_EXECUTION_ENGINE_CONFIG = - SYSCFG_EXECUTION_ENGINE + "."; - - // Bundle names - - public static final String RESOURCE_BUNDLE_NAME = "framework-resources"; - - private FrameworkConstants() { - // Instantiation of this class is 
prohibited - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java deleted file mode 100644 index 8ecb197..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.common.ErrorCode; - -/** - * - */ -public enum FrameworkError implements ErrorCode { - - FRAMEWORK_0000("Metadata are not registered in repository"), - - FRAMEWORK_0001("Invalid submission engine"), - - FRAMEWORK_0002("Given job is already running"), - - FRAMEWORK_0003("Given job is not running"), - - FRAMEWORK_0004("Unknown job id"), - - FRAMEWORK_0005("Unsupported job type"), - - FRAMEWORK_0006("Can't bootstrap job"), - - FRAMEWORK_0007("Invalid execution engine"), - - FRAMEWORK_0008("Invalid combination of submission and execution engines"), - - FRAMEWORK_0009("Job has been disabled. 
Cannot submit this job."), - - FRAMEWORK_0010("Connection for this job has been disabled. Cannot submit this job."), - - FRAMEWORK_0011("Connector does not support direction. Cannot submit this job."), - - ; - - private final String message; - - private FrameworkError(String message) { - this.message = message; - } - - public String getCode() { - return name(); - } - - public String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java deleted file mode 100644 index 81e1147..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework; - -import org.apache.log4j.Logger; -import org.apache.sqoop.connector.spi.MetadataUpgrader; -import org.apache.sqoop.core.ConfigurationConstants; -import org.apache.sqoop.core.Reconfigurable; -import org.apache.sqoop.core.SqoopConfiguration; -import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; -import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.JobConfiguration; -import org.apache.sqoop.model.*; -import org.apache.sqoop.repository.RepositoryManager; -import org.apache.sqoop.validation.Validator; - -import java.util.Locale; -import java.util.ResourceBundle; - -/** - * Manager for Sqoop framework itself. - * - * All Sqoop internals are handled in this class: - * * Submission engine - * * Execution engine - * * Framework metadata - * - * Current implementation of entire submission engine is using repository - * for keeping track of running submissions. Thus, server might be restarted at - * any time without any affect on running jobs. This approach however might not - * be the fastest way and we might want to introduce internal structures for - * running jobs in case that this approach will be too slow. - */ -public class FrameworkManager implements Reconfigurable { - - /** - * Logger object. - */ - private static final Logger LOG = Logger.getLogger(FrameworkManager.class); - - /** - * Private instance to singleton of this class. - */ - private static FrameworkManager instance; - - /** - * Create default object by default. - * - * Every Sqoop server application needs one so this should not be performance issue. - */ - static { - instance = new FrameworkManager(); - } - - /** - * Return current instance. - * - * @return Current instance - */ - public static FrameworkManager getInstance() { - return instance; - } - - /** - * Allows to set instance in case that it's need. 
- * - * This method should not be normally used as the default instance should be sufficient. One target - * user use case for this method are unit tests. - * - * @param newInstance New instance - */ - public static void setInstance(FrameworkManager newInstance) { - instance = newInstance; - } - - /** - * Framework metadata structures in MForm format - */ - private MFramework mFramework; - - /** - * Validator instance - */ - private final Validator validator; - - /** - * Upgrader instance - */ - private final MetadataUpgrader upgrader; - - /** - * Default framework auto upgrade option value - */ - private static final boolean DEFAULT_AUTO_UPGRADE = false; - - public static final String CURRENT_FRAMEWORK_VERSION = "1"; - - public Class getJobConfigurationClass() { - return JobConfiguration.class; - } - - public Class getConnectionConfigurationClass() { - return ConnectionConfiguration.class; - } - - public FrameworkManager() { - MConnectionForms connectionForms = new MConnectionForms( - FormUtils.toForms(getConnectionConfigurationClass()) - ); - mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())), - CURRENT_FRAMEWORK_VERSION); - - // Build validator - validator = new FrameworkValidator(); - - // Build upgrader - upgrader = new FrameworkMetadataUpgrader(); - } - - public synchronized void initialize() { - initialize(SqoopConfiguration.getInstance().getContext().getBoolean(ConfigurationConstants.FRAMEWORK_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE)); - } - - public synchronized void initialize(boolean autoUpgrade) { - LOG.trace("Begin submission engine manager initialization"); - - // Register framework metadata in repository - mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework, autoUpgrade); - - SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this)); - - LOG.info("Submission manager initialized: OK"); - } - - public synchronized void 
destroy() { - LOG.trace("Begin submission engine manager destroy"); - } - - public Validator getValidator() { - return validator; - } - - public MetadataUpgrader getMetadataUpgrader() { - return upgrader; - } - - public MFramework getFramework() { - return mFramework; - } - - public ResourceBundle getBundle(Locale locale) { - return ResourceBundle.getBundle( - FrameworkConstants.RESOURCE_BUNDLE_NAME, locale); - } - - @Override - public void configurationChanged() { - LOG.info("Begin framework manager reconfiguring"); - // If there are configuration options for FrameworkManager, - // implement the reconfiguration procedure right here. - LOG.info("Framework manager reconfigured"); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java deleted file mode 100644 index 2437fa6..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.spi.MetadataUpgrader; -import org.apache.sqoop.model.MConnectionForms; -import org.apache.sqoop.model.MForm; -import org.apache.sqoop.model.MInput; -import org.apache.sqoop.model.MJobForms; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class FrameworkMetadataUpgrader extends MetadataUpgrader{ - - private static final Logger LOG = Logger.getLogger(FrameworkMetadataUpgrader.class); - - @Override - public void upgrade(MConnectionForms original, - MConnectionForms upgradeTarget) { - doUpgrade(original.getForms(), upgradeTarget.getForms()); - } - - @Override - public void upgrade(MJobForms original, MJobForms upgradeTarget) { - doUpgrade(original.getForms(), upgradeTarget.getForms()); - - } - - @SuppressWarnings("unchecked") - private void doUpgrade(List<MForm> original, List<MForm> target) { - // Easier to find the form in the original forms list if we use a map. - // Since the constructor of MJobForms takes a list, - // index is not guaranteed to be the same, so we need to look for - // equivalence - Map<String, MForm> formMap = new HashMap<String, MForm>(); - for (MForm form : original) { - formMap.put(form.getName(), form); - } - for (MForm form : target) { - List<MInput<?>> inputs = form.getInputs(); - MForm originalForm = formMap.get(form.getName()); - if(originalForm == null) { - LOG.warn("Form: " + form.getName() + " not present in old " + - "framework metadata. 
So it will not be transferred by the upgrader."); - continue; - } - - for (MInput input : inputs) { - try { - MInput originalInput = originalForm.getInput(input.getName()); - input.setValue(originalInput.getValue()); - } catch (SqoopException ex) { - LOG.warn("Input: " + input.getName() + " not present in old " + - "framework metadata. So it will not be transferred by the upgrader."); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java deleted file mode 100644 index 46257f2..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.framework.configuration.ConnectionConfiguration; -import org.apache.sqoop.framework.configuration.JobConfiguration; -import org.apache.sqoop.framework.configuration.ThrottlingForm; -import org.apache.sqoop.validation.Status; -import org.apache.sqoop.validation.Validation; -import org.apache.sqoop.validation.Validator; - -public class FrameworkValidator extends Validator { - @Override - public Validation validateConnection(Object connectionConfiguration) { - Validation validation = new Validation(ConnectionConfiguration.class); - // No validation on connection object - return validation; - } - - @Override - public Validation validateJob(Object jobConfiguration) { - Validation validation = new Validation(JobConfiguration.class); - JobConfiguration conf = (JobConfiguration)jobConfiguration; - validateThrottlingForm(validation,conf.throttling); - - return validation; - }; - - private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) { - if(throttling.extractors != null && throttling.extractors < 1) { - validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor"); - } - - if(throttling.loaders != null && throttling.loaders < 1) { - validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader"); - } - } - -}
