http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java deleted file mode 100644 index 8149d1c..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java +++ /dev/null @@ -1,710 +0,0 @@ - - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework; - -import org.apache.log4j.Logger; -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.ConnectorManager; -import org.apache.sqoop.framework.configuration.JobConfiguration; -import org.apache.sqoop.request.HttpEventContext; -import org.apache.sqoop.connector.idf.IntermediateDataFormat; -import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.core.Reconfigurable; -import org.apache.sqoop.core.SqoopConfiguration; -import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; -import org.apache.sqoop.job.etl.*; -import org.apache.sqoop.model.FormUtils; -import org.apache.sqoop.model.MConnection; -import org.apache.sqoop.model.MJob; -import org.apache.sqoop.model.MSubmission; -import org.apache.sqoop.repository.Repository; -import org.apache.sqoop.repository.RepositoryManager; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.submission.SubmissionStatus; -import org.apache.sqoop.submission.counter.Counters; -import org.apache.sqoop.utils.ClassUtils; -import org.json.simple.JSONValue; - -import java.util.Date; -import java.util.List; - -public class JobManager implements Reconfigurable { - /** - * Logger object. - */ - private static final Logger LOG = Logger.getLogger(JobManager.class); - - /** - * Private instance to singleton of this class. - */ - private static JobManager instance; - /** - * Create default object by default. - * - * Every Sqoop server application needs one so this should not be performance - * issue. - */ - static { - instance = new JobManager(); - } - - /** - * Return current instance. - * - * @return Current instance - */ - public static JobManager getInstance() { - return instance; - } - - /** - * Allows to set instance in case that it's need. - * - * This method should not be normally used as the default instance should be - * sufficient. 
One target user use case for this method are unit tests. - * - * @param newInstance - * New instance - */ - public static void setInstance(JobManager newInstance) { - instance = newInstance; - } - - /** - * Default interval for purging old submissions from repository. - */ - private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000; - - /** - * Default sleep interval for purge thread. - */ - private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000; - - /** - * Default interval for update thread. - */ - private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000; - - /** - * Configured submission engine instance - */ - private SubmissionEngine submissionEngine; - - /** - * Configured execution engine instance - */ - private ExecutionEngine executionEngine; - - /** - * Purge thread that will periodically remove old submissions from repository. - */ - private PurgeThread purgeThread = null; - - /** - * Update thread that will periodically check status of running submissions. - */ - private UpdateThread updateThread = null; - - /** - * Synchronization variable between threads. - */ - private boolean running = true; - - /** - * Specifies how old submissions should be removed from repository. - */ - private long purgeThreshold; - - /** - * Number of milliseconds for purge thread to sleep. - */ - private long purgeSleep; - - /** - * Number of milliseconds for update thread to slepp. - */ - private long updateSleep; - - /** - * Base notification URL. - * - * Framework manager will always add job id. - */ - private String notificationBaseUrl; - - /** - * Set notification base URL. - * - * @param url - * Base URL - */ - public void setNotificationBaseUrl(String url) { - LOG.debug("Setting notification base URL to " + url); - notificationBaseUrl = url; - } - - /** - * Get base notification url. 
- * - * @return String representation of the URL - */ - public String getNotificationBaseUrl() { - return notificationBaseUrl; - } - - public synchronized void destroy() { - LOG.trace("Begin submission engine manager destroy"); - - running = false; - - try { - purgeThread.interrupt(); - purgeThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining purgeThread"); - } - - try { - updateThread.interrupt(); - updateThread.join(); - } catch (InterruptedException e) { - // TODO(jarcec): Do I want to wait until it actually finish here? - LOG.error("Interrupted joining updateThread"); - } - - if (submissionEngine != null) { - submissionEngine.destroy(); - } - - if (executionEngine != null) { - executionEngine.destroy(); - } - } - - public synchronized void initialize() { - LOG.trace("Begin submission engine manager initialization"); - MapContext context = SqoopConfiguration.getInstance().getContext(); - - // Let's load configured submission engine - String submissionEngineClassName = - context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - - submissionEngine = (SubmissionEngine) ClassUtils - .instantiate(submissionEngineClassName); - if (submissionEngine == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0001, - submissionEngineClassName); - } - - submissionEngine.initialize(context, - FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG); - - // Execution engine - String executionEngineClassName = - context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - - executionEngine = (ExecutionEngine) ClassUtils - .instantiate(executionEngineClassName); - if (executionEngine == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0007, - executionEngineClassName); - } - - // We need to make sure that user has configured compatible combination of - // submission engine and execution engine - if (!submissionEngine - 
.isExecutionEngineSupported(executionEngine.getClass())) { - throw new SqoopException(FrameworkError.FRAMEWORK_0008); - } - - executionEngine.initialize(context, - FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG); - - // Set up worker threads - purgeThreshold = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, - DEFAULT_PURGE_THRESHOLD - ); - purgeSleep = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, - DEFAULT_PURGE_SLEEP - ); - - purgeThread = new PurgeThread(); - purgeThread.start(); - - updateSleep = context.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, - DEFAULT_UPDATE_SLEEP - ); - - updateThread = new UpdateThread(); - updateThread.start(); - - SqoopConfiguration.getInstance().getProvider() - .registerListener(new CoreConfigurationListener(this)); - - LOG.info("Submission manager initialized: OK"); - } - - public MSubmission submit(long jobId, HttpEventContext ctx) { - - MSubmission mSubmission = createJobSubmission(ctx, jobId); - JobRequest jobRequest = createJobRequest(jobId, mSubmission); - // Bootstrap job to execute - prepareJob(jobRequest); - // Make sure that this job id is not currently running and submit the job - // only if it's not. - synchronized (getClass()) { - MSubmission lastSubmission = RepositoryManager.getInstance().getRepository() - .findSubmissionLastForJob(jobId); - if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId); - } - // TODO(Abe): Call multiple destroyers. - // TODO(jarcec): We might need to catch all exceptions here to ensure - // that Destroyer will be executed in all cases. 
- // NOTE: the following is a blocking call - boolean success = submissionEngine.submit(jobRequest); - if (!success) { - destroySubmission(jobRequest); - mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT); - } - RepositoryManager.getInstance().getRepository().createSubmission(mSubmission); - } - return mSubmission; - } - - private JobRequest createJobRequest(long jobId, MSubmission submission) { - // get job - MJob job = getJob(jobId); - - // get from/to connections for the job - MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM)); - MConnection toConnection = getConnection(job.getConnectionId(Direction.TO)); - - // get from/to connectors for the connection - SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId()); - validateSupportedDirection(fromConnector, Direction.FROM); - SqoopConnector toConnector = getConnector(toConnection.getConnectorId()); - validateSupportedDirection(toConnector, Direction.TO); - - // Transform config to fromConnector specific classes - Object fromConnectionConfig = ClassUtils.instantiate(fromConnector - .getConnectionConfigurationClass()); - FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig); - - // Transform config to toConnector specific classes - Object toConnectorConfig = ClassUtils - .instantiate(toConnector.getConnectionConfigurationClass()); - FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig); - - Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM)); - FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob); - - Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO)); - FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob); - - // Transform framework specific configs - // Q(VB) : Aren't the following 2 exactly the same? 
- Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance() - .getConnectionConfigurationClass()); - FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection); - - Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance() - .getConnectionConfigurationClass()); - FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection); - - Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance() - .getJobConfigurationClass()); - FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob); - - // Create a job request for submit/execution - JobRequest jobRequest = executionEngine.createJobRequest(); - // Save important variables to the job request - jobRequest.setSummary(submission); - jobRequest.setConnector(Direction.FROM, fromConnector); - jobRequest.setConnector(Direction.TO, toConnector); - jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig); - jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig); - jobRequest.setConnectorJobConfig(Direction.FROM, fromJob); - jobRequest.setConnectorJobConfig(Direction.TO, toJob); - // TODO(Abe): Should we actually have 2 different Framework Connection config objects? - jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection); - jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection); - jobRequest.setConfigFrameworkJob(frameworkJob); - jobRequest.setJobName(job.getName()); - jobRequest.setJobId(job.getPersistenceId()); - jobRequest.setNotificationUrl(notificationBaseUrl + jobId); - Class<? 
extends IntermediateDataFormat<?>> dataFormatClass = - fromConnector.getIntermediateDataFormat(); - jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); - - - jobRequest.setFrom(fromConnector.getFrom()); - jobRequest.setTo(toConnector.getTo()); - - addStandardJars(jobRequest); - addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass); - addConnectorInitializerJars(jobRequest, Direction.FROM); - addConnectorInitializerJars(jobRequest, Direction.TO); - - Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM); - Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO); - - // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378 - if (fromSchema != null) { - jobRequest.getSummary().setFromSchema(fromSchema); - } - else { - jobRequest.getSummary().setFromSchema(toSchema); - } - LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo()); - return jobRequest; - } - - private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector, - SqoopConnector toConnector, Class<? 
extends IntermediateDataFormat<?>> dataFormatClass) { - jobRequest.addJarForClass(fromConnector.getClass()); - jobRequest.addJarForClass(toConnector.getClass()); - jobRequest.addJarForClass(dataFormatClass); - } - - private void addStandardJars(JobRequest jobRequest) { - // Let's register all important jars - // sqoop-common - jobRequest.addJarForClass(MapContext.class); - // sqoop-core - jobRequest.addJarForClass(FrameworkManager.class); - // sqoop-spi - jobRequest.addJarForClass(SqoopConnector.class); - // Execution engine jar - jobRequest.addJarForClass(executionEngine.getClass()); - // Extra libraries that Sqoop code requires - jobRequest.addJarForClass(JSONValue.class); - } - - MSubmission createJobSubmission(HttpEventContext ctx, long jobId) { - MSubmission summary = new MSubmission(jobId); - summary.setCreationUser(ctx.getUsername()); - summary.setLastUpdateUser(ctx.getUsername()); - return summary; - } - - SqoopConnector getConnector(long connnectorId) { - return ConnectorManager.getInstance().getConnector(connnectorId); - } - - void validateSupportedDirection(SqoopConnector connector, Direction direction) { - // Make sure that connector supports the given direction - if (!connector.getSupportedDirections().contains(direction)) { - throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: " - + connector.getClass().getCanonicalName()); - } - } - - MConnection getConnection(long connectionId) { - MConnection connection = RepositoryManager.getInstance().getRepository() - .findConnection(connectionId); - if (!connection.getEnabled()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: " - + connection.getPersistenceId()); - } - return connection; - } - - MJob getJob(long jobId) { - MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId); - if (job == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId); - } - - if (!job.getEnabled()) { - throw new 
SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId()); - } - return job; - } - - private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) { - - Initializer initializer = getConnectorInitializer(jobRequest, direction); - - // Initializer context - InitializerContext initializerContext = getInitializerContext(jobRequest, direction); - - // Initialize submission from the connector perspective - initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction), - jobRequest.getConnectorJobConfig(direction)); - - // TODO(Abe): Alter behavior of Schema here. - return initializer.getSchema(initializerContext, - jobRequest.getConnectorConnectionConfig(direction), - jobRequest.getConnectorJobConfig(direction)); - } - - private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { - - Initializer initializer = getConnectorInitializer(jobRequest, direction); - InitializerContext initializerContext = getInitializerContext(jobRequest, direction); - // Add job specific jars to - jobRequest.addJars(initializer.getJars(initializerContext, - jobRequest.getConnectorConnectionConfig(direction), - jobRequest.getConnectorJobConfig(direction))); - } - - private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) { - Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo(); - Class<? 
extends Initializer> initializerClass = transferable.getInitializer(); - Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass); - - if (initializer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create connector initializer instance: " + initializerClass.getName()); - } - return initializer; - } - - private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) { - return new InitializerContext(jobRequest.getConnectorContext(direction)); - } - - void prepareJob(JobRequest request) { - JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob(); - // We're directly moving configured number of extractors and loaders to - // underlying request object. In the future we might need to throttle this - // count based on other running jobs to meet our SLAs. - request.setExtractors(jobConfiguration.throttling.extractors); - request.setLoaders(jobConfiguration.throttling.loaders); - - // Delegate rest of the job to execution engine - executionEngine.prepareJob(request); - } - - /** - * Callback that will be called only if we failed to submit the job to the - * remote cluster. - */ - void destroySubmission(JobRequest request) { - Transferable from = request.getFrom(); - Transferable to = request.getTo(); - - Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer(); - Class<? 
extends Destroyer> toDestroyerClass = to.getDestroyer(); - Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass); - Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass); - - if (fromDestroyer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create toDestroyer instance: " + fromDestroyerClass.getName()); - } - - if (toDestroyer == null) { - throw new SqoopException(FrameworkError.FRAMEWORK_0006, - "Can't create toDestroyer instance: " + toDestroyerClass.getName()); - } - - // TODO(Abe): Update context to manage multiple connectors. As well as summary. - DestroyerContext fromDestroyerContext = new DestroyerContext( - request.getConnectorContext(Direction.FROM), false, request.getSummary() - .getFromSchema()); - DestroyerContext toDestroyerContext = new DestroyerContext( - request.getConnectorContext(Direction.TO), false, request.getSummary() - .getToSchema()); - - // destroy submission from connector perspective - fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM), - request.getConnectorJobConfig(Direction.FROM)); - toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO), - request.getConnectorJobConfig(Direction.TO)); - } - - public MSubmission stop(long jobId, HttpEventContext ctx) { - - Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); - - if (mSubmission == null || !mSubmission.getStatus().isRunning()) { - throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId - + " is not running"); - } - submissionEngine.stop(mSubmission.getExternalId()); - - mSubmission.setLastUpdateUser(ctx.getUsername()); - - // Fetch new information to verify that the stop command has actually worked - update(mSubmission); - - // Return updated structure - return mSubmission; - } - - public MSubmission status(long 
jobId) { - Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findSubmissionLastForJob(jobId); - - if (mSubmission == null) { - return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); - } - - // If the submission is in running state, let's update it - if (mSubmission.getStatus().isRunning()) { - update(mSubmission); - } - - return mSubmission; - } - - private void update(MSubmission submission) { - double progress = -1; - Counters counters = null; - String externalId = submission.getExternalId(); - SubmissionStatus newStatus = submissionEngine.status(externalId); - String externalLink = submissionEngine.externalLink(externalId); - - if (newStatus.isRunning()) { - progress = submissionEngine.progress(externalId); - } else { - counters = submissionEngine.counters(externalId); - } - - submission.setStatus(newStatus); - submission.setProgress(progress); - submission.setCounters(counters); - submission.setExternalLink(externalLink); - submission.setLastUpdateDate(new Date()); - - RepositoryManager.getInstance().getRepository() - .updateSubmission(submission); - } - - @Override - public synchronized void configurationChanged() { - LOG.info("Begin submission engine manager reconfiguring"); - MapContext newContext = SqoopConfiguration.getInstance().getContext(); - MapContext oldContext = SqoopConfiguration.getInstance().getOldContext(); - - String newSubmissionEngineClassName = newContext - .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - if (newSubmissionEngineClassName == null - || newSubmissionEngineClassName.trim().length() == 0) { - throw new SqoopException(FrameworkError.FRAMEWORK_0001, - newSubmissionEngineClassName); - } - - String oldSubmissionEngineClassName = oldContext - .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE); - if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) { - LOG.warn("Submission engine cannot be replaced at the runtime. 
" + - "You might need to restart the server."); - } - - String newExecutionEngineClassName = newContext - .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - if (newExecutionEngineClassName == null - || newExecutionEngineClassName.trim().length() == 0) { - throw new SqoopException(FrameworkError.FRAMEWORK_0007, - newExecutionEngineClassName); - } - - String oldExecutionEngineClassName = oldContext - .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE); - if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) { - LOG.warn("Execution engine cannot be replaced at the runtime. " + - "You might need to restart the server."); - } - - // Set up worker threads - purgeThreshold = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, - DEFAULT_PURGE_THRESHOLD - ); - purgeSleep = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, - DEFAULT_PURGE_SLEEP - ); - purgeThread.interrupt(); - - updateSleep = newContext.getLong( - FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, - DEFAULT_UPDATE_SLEEP - ); - updateThread.interrupt(); - - LOG.info("Submission engine manager reconfigured."); - } - - private class PurgeThread extends Thread { - public PurgeThread() { - super("PurgeThread"); - } - - public void run() { - LOG.info("Starting submission manager purge thread"); - - while (running) { - try { - LOG.info("Purging old submissions"); - Date threshold = new Date((new Date()).getTime() - purgeThreshold); - RepositoryManager.getInstance().getRepository() - .purgeSubmissions(threshold); - Thread.sleep(purgeSleep); - } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); - } - } - - LOG.info("Ending submission manager purge thread"); - } - } - - private class UpdateThread extends Thread { - public UpdateThread() { - super("UpdateThread"); - } - - public void run() { - LOG.info("Starting submission manager update thread"); - - while (running) { - try { - LOG.debug("Updating running submissions"); 
- - // Let's get all running submissions from repository to check them out - List<MSubmission> unfinishedSubmissions = - RepositoryManager.getInstance().getRepository() - .findSubmissionsUnfinished(); - - for (MSubmission submission : unfinishedSubmissions) { - update(submission); - } - - Thread.sleep(updateSleep); - } catch (InterruptedException e) { - LOG.debug("Purge thread interrupted", e); - } - } - - LOG.info("Ending submission manager update thread"); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java deleted file mode 100644 index 1f77693..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java +++ /dev/null @@ -1,356 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.common.DirectionError; -import org.apache.sqoop.common.MutableMapContext; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.idf.IntermediateDataFormat; -import org.apache.sqoop.connector.spi.SqoopConnector; -import org.apache.sqoop.job.etl.Transferable; -import org.apache.sqoop.model.MSubmission; -import org.apache.sqoop.utils.ClassUtils; - -import java.util.LinkedList; -import java.util.List; - -/** - * Submission details class is used when creating new submission and contains - * all information that we need to create a new submission (including mappers, - * reducers, ...). - */ -public class JobRequest { - - /** - * Submission summary - */ - MSubmission summary; - - /** - * Original job name - */ - String jobName; - - /** - * Associated job (from metadata perspective) id - */ - long jobId; - - /** - * Connector instances associated with this submission request - */ - SqoopConnector fromConnector; - SqoopConnector toConnector; - - /** - * List of required local jars for the job - */ - List<String> jars; - - /** - * From entity - */ - Transferable from; - - /** - * To entity - */ - Transferable to; - - /** - * All configuration objects - */ - Object fromConnectorConnectionConfig; - Object toConnectorConnectionConfig; - Object fromConnectorJobConfig; - Object toConnectorJobConfig; - Object fromFrameworkConnectionConfig; - Object toFrameworkConnectionConfig; - Object configFrameworkJob; - - /** - * Connector context (submission specific configuration) - */ - MutableMapContext fromConnectorContext; - MutableMapContext toConnectorContext; - - /** - * Framework context (submission specific configuration) - */ - MutableMapContext frameworkContext; - - /** - * Optional notification URL for job progress - */ - String notificationUrl; - - /** - * Number of extractors - */ - Integer extractors; - - /** - * Number of loaders - */ - 
Integer loaders; - - /** - * The intermediate data format this submission should use. - */ - Class<? extends IntermediateDataFormat> intermediateDataFormat; - - public JobRequest() { - this.jars = new LinkedList<String>(); - this.fromConnectorContext = new MutableMapContext(); - this.toConnectorContext = new MutableMapContext(); - this.frameworkContext = new MutableMapContext(); - this.fromConnector = null; - this.toConnector = null; - this.fromConnectorConnectionConfig = null; - this.toConnectorConnectionConfig = null; - this.fromConnectorJobConfig = null; - this.toConnectorJobConfig = null; - this.fromFrameworkConnectionConfig = null; - this.toFrameworkConnectionConfig = null; - } - - public MSubmission getSummary() { - return summary; - } - - public void setSummary(MSubmission summary) { - this.summary = summary; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public long getJobId() { - return jobId; - } - - public void setJobId(long jobId) { - this.jobId = jobId; - } - - public SqoopConnector getConnector(Direction type) { - switch(type) { - case FROM: - return fromConnector; - - case TO: - return toConnector; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnector(Direction type, SqoopConnector connector) { - switch(type) { - case FROM: - fromConnector = connector; - break; - - case TO: - toConnector = connector; - break; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public List<String> getJars() { - return jars; - } - - public void addJar(String jar) { - if(!jars.contains(jar)) { - jars.add(jar); - } - } - - public void addJarForClass(Class klass) { - addJar(ClassUtils.jarForClass(klass)); - } - - public void addJars(List<String> jars) { - for(String j : jars) { - addJar(j); - } - } - - public Transferable getFrom() { - return from; - } - 
- public void setFrom(Transferable from) { - this.from = from; - } - - public Transferable getTo() { - return to; - } - - public void setTo(Transferable to) { - this.to = to; - } - - public Object getConnectorConnectionConfig(Direction type) { - switch(type) { - case FROM: - return fromConnectorConnectionConfig; - - case TO: - return toConnectorConnectionConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnectorConnectionConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromConnectorConnectionConfig = config; - break; - case TO: - toConnectorConnectionConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getConnectorJobConfig(Direction type) { - switch(type) { - case FROM: - return fromConnectorJobConfig; - - case TO: - return toConnectorJobConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setConnectorJobConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromConnectorJobConfig = config; - break; - case TO: - toConnectorJobConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getFrameworkConnectionConfig(Direction type) { - switch(type) { - case FROM: - return fromFrameworkConnectionConfig; - - case TO: - return toFrameworkConnectionConfig; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public void setFrameworkConnectionConfig(Direction type, Object config) { - switch(type) { - case FROM: - fromFrameworkConnectionConfig = config; - break; - case TO: - toFrameworkConnectionConfig = config; - break; - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public Object getConfigFrameworkJob() 
{ - return configFrameworkJob; - } - - public void setConfigFrameworkJob(Object config) { - configFrameworkJob = config; - } - - public MutableMapContext getConnectorContext(Direction type) { - switch(type) { - case FROM: - return fromConnectorContext; - - case TO: - return toConnectorContext; - - default: - throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type); - } - } - - public MutableMapContext getFrameworkContext() { - return frameworkContext; - } - - public String getNotificationUrl() { - return notificationUrl; - } - - public void setNotificationUrl(String url) { - this.notificationUrl = url; - } - - public Integer getExtractors() { - return extractors; - } - - public void setExtractors(Integer extractors) { - this.extractors = extractors; - } - - public Integer getLoaders() { - return loaders; - } - - public void setLoaders(Integer loaders) { - this.loaders = loaders; - } - - public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() { - return intermediateDataFormat; - } - - public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) { - this.intermediateDataFormat = intermediateDataFormat; - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java deleted file mode 100644 index 732be3b..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.framework; - -import org.apache.sqoop.common.MapContext; -import org.apache.sqoop.submission.counter.Counters; -import org.apache.sqoop.submission.SubmissionStatus; - -/** - * Submission engine is responsible in conveying the information about the - * job instances (submissions) to remote (hadoop) cluster. - */ -public abstract class SubmissionEngine { - - /** - * Initialize submission engine - * - * @param context Configuration context - * @param prefix Submission engine prefix - */ - public void initialize(MapContext context, String prefix) { - } - - /** - * Destroy submission engine when stopping server - */ - public void destroy() { - } - - /** - * Callback to verify that configured submission engine and execution engine - * are compatible. - * - * @param executionEngineClass Configured execution class. - * @return True if such execution engine is supported - */ - public abstract boolean isExecutionEngineSupported(Class executionEngineClass); - - /** - * Submit new job to remote (hadoop) cluster. This method *must* fill - * submission.getSummary.setExternalId(), otherwise Sqoop framework won't - * be able to track progress on this job! - * - * @return Return true if we were able to submit job to remote cluster. - */ - public abstract boolean submit(JobRequest submission); - - /** - * Hard stop for given submission. 
- * - * @param submissionId Submission internal id. - */ - public abstract void stop(String submissionId); - - /** - * Return status of given submission. - * - * @param submissionId Submission internal id. - * @return Current submission status. - */ - public abstract SubmissionStatus status(String submissionId); - - /** - * Return submission progress. - * - * Expected is number from interval <0, 1> denoting how far the processing - * has gone or -1 in case that this submission engine do not supports - * progress reporting. - * - * @param submissionId Submission internal id. - * @return {-1} union <0, 1> - */ - public double progress(String submissionId) { - return -1; - } - - /** - * Return statistics for given submission id. - * - * Sqoop framework will call counters only for submission in state SUCCEEDED, - * it's consider exceptional state to call this method for other states. - * - * @param submissionId Submission internal id. - * @return Submission statistics - */ - public Counters counters(String submissionId) { - return null; - } - - /** - * Return link to external web page with given submission. - * - * @param submissionId Submission internal id. - * @return Null in case that external page is not supported or available or - * HTTP link to given submission. 
- */ - public String externalLink(String submissionId) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java deleted file mode 100644 index 897d3c7..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Form; - -/** - * Framework class representing connection configuration - */ -@ConfigurationClass -public class ConnectionConfiguration { - - @Form SecurityForm security = new SecurityForm(); -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java deleted file mode 100644 index 0abc611..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Form; - -@ConfigurationClass -public class JobConfiguration { - @Form - public ThrottlingForm throttling; - - public JobConfiguration() { - throttling = new ThrottlingForm(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java deleted file mode 100644 index 8ab50ed..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * Security form - */ -@FormClass -public class SecurityForm { - @Input public Integer maxConnections; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java deleted file mode 100644 index c435f6b..0000000 --- a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.framework.configuration; - -import org.apache.sqoop.model.FormClass; -import org.apache.sqoop.model.Input; - -/** - * Form to set up number of loaders and extractors - */ -@FormClass -public class ThrottlingForm { - - @Input public Integer extractors; - - @Input public Integer loaders; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index fa119a5..3466116 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -23,9 +23,9 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MConnector; -import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MSubmission; @@ -43,13 +43,13 @@ public class JdbcRepository extends Repository { } /** - * Private interface to wrap specific code that requires fresh connection to - * repository with general code that will get the connection and handle + * Private interface to wrap specific code that requires fresh link to + * repository with general code that will get the link and handle * exceptions. */ private interface DoWithConnection { /** - * Do what is needed to be done with given connection object. + * Do what is needed to be done with given link object. * * @param conn Connection to metadata repository. 
* @return Arbitrary value @@ -62,7 +62,7 @@ public class JdbcRepository extends Repository { } /** - * Handle transaction and connection functionality and delegate action to + * Handle transaction and link functionality and delegate action to * given delegator. * * @param delegator Code for specific action @@ -77,7 +77,7 @@ public class JdbcRepository extends Repository { boolean shouldCloseTxn = false; try { - // Get transaction and connection + // Get transaction and link Connection conn; if (tx == null) { tx = getTransaction(); @@ -205,6 +205,7 @@ public class JdbcRepository extends Repository { /** * {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public List<MConnector> findConnectors() { return (List<MConnector>) doWithConnection(new DoWithConnection() { @@ -219,24 +220,24 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public MFramework registerFramework(final MFramework mFramework, final boolean autoUpgrade) { - return (MFramework) doWithConnection(new DoWithConnection() { + public MDriverConfig registerDriverConfig(final MDriverConfig mDriverConfig, final boolean autoUpgrade) { + return (MDriverConfig) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - MFramework result = handler.findFramework(conn); + MDriverConfig result = handler.findDriverConfig(conn); if (result == null) { - handler.registerFramework(mFramework, conn); - return mFramework; + handler.registerDriverConfig(mDriverConfig, conn); + return mDriverConfig; } else { - // We're currently not serializing framework version into repository + // We're currently not serializing version into repository // so let's just compare the structure to see if we need upgrade. 
- if(!mFramework.equals(result)) { + if(!mDriverConfig.equals(result)) { if (autoUpgrade) { - upgradeFramework(mFramework); - return mFramework; + upgradeDriverConfig(mDriverConfig); + return mDriverConfig; } else { throw new SqoopException(RepositoryError.JDBCREPO_0026, - "Framework: " + mFramework.getPersistenceId()); + "DriverConfig: " + mDriverConfig.getPersistenceId()); } } return result; @@ -249,15 +250,15 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void createConnection(final MConnection connection) { + public void createLink(final MLink link) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(connection.hasPersistenceId()) { + if(link.hasPersistenceId()) { throw new SqoopException(RepositoryError.JDBCREPO_0015); } - handler.createConnection(connection, conn); + handler.createLink(link, conn); return null; } }); @@ -267,28 +268,27 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void updateConnection(final MConnection connection) { - updateConnection(connection, null); + public void updateLink(final MLink link) { + updateLink(link, null); } /** * {@inheritDoc} */ @Override - public void updateConnection(final MConnection connection, - RepositoryTransaction tx) { + public void updateLink(final MLink link, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!connection.hasPersistenceId()) { + if (!link.hasPersistenceId()) { throw new SqoopException(RepositoryError.JDBCREPO_0016); } - if(!handler.existsConnection(connection.getPersistenceId(), conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0017, - "Invalid id: " + connection.getPersistenceId()); + if (!handler.existsLink(link.getPersistenceId(), conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + + link.getPersistenceId()); } - handler.updateConnection(connection, 
conn); + handler.updateLink(link, conn); return null; } }, (JdbcRepositoryTransaction) tx); @@ -298,16 +298,16 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void enableConnection(final long connectionId, final boolean enabled) { + public void enableLink(final long linkId, final boolean enabled) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsConnection(connectionId, conn)) { + if(!handler.existsLink(linkId, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0017, - "Invalid id: " + connectionId); + "Invalid id: " + linkId); } - handler.enableConnection(connectionId, enabled, conn); + handler.enableLink(linkId, enabled, conn); return null; } }); @@ -317,20 +317,20 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void deleteConnection(final long connectionId) { + public void deleteLink(final long linkId) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsConnection(connectionId, conn)) { + if(!handler.existsLink(linkId, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0017, - "Invalid id: " + connectionId); + "Invalid id: " + linkId); } - if(handler.inUseConnection(connectionId, conn)) { + if(handler.inUseLink(linkId, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0021, - "Id in use: " + connectionId); + "Id in use: " + linkId); } - handler.deleteConnection(connectionId, conn); + handler.deleteLink(linkId, conn); return null; } }); @@ -340,11 +340,11 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public MConnection findConnection(final long connectionId) { - return (MConnection) doWithConnection(new DoWithConnection() { + public MLink findLink(final long connectionId) { + return (MLink) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - return 
handler.findConnection(connectionId, conn); + return handler.findLink(connectionId, conn); } }); } @@ -354,11 +354,11 @@ public class JdbcRepository extends Repository { */ @SuppressWarnings("unchecked") @Override - public List<MConnection> findConnections() { - return (List<MConnection>) doWithConnection(new DoWithConnection() { + public List<MLink> findLinks() { + return (List<MLink>) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - return handler.findConnections(conn); + return handler.findLinks(conn); } }); } @@ -601,12 +601,12 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public List<MConnection> findConnectionsForConnector(final long + public List<MLink> findLinksForConnector(final long connectorID) { - return (List<MConnection>) doWithConnection(new DoWithConnection() { + return (List<MLink>) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - return handler.findConnectionsForConnector(connectorID, conn); + return handler.findLinksForConnector(connectorID, conn); } }); } @@ -637,12 +637,11 @@ public class JdbcRepository extends Repository { } @Override - protected void deleteConnectionInputs(final long connectionID, - RepositoryTransaction tx) { + protected void deleteLinkInputs(final long linkId, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.deleteConnectionInputs(connectionID, conn); + handler.deleteLinkInputs(linkId, conn); return null; } }, (JdbcRepositoryTransaction) tx); @@ -665,12 +664,11 @@ public class JdbcRepository extends Repository { } - protected void updateFramework(final MFramework mFramework, - RepositoryTransaction tx) { + protected void updateDriverConfig(final MDriverConfig mDriverConfig, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) 
throws Exception { - handler.updateFramework(mFramework, conn); + handler.updateDriverConfig(mDriverConfig, conn); return null; } }, (JdbcRepositoryTransaction) tx); http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 4de3134..a743491 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -21,9 +21,9 @@ import java.sql.Connection; import java.util.Date; import java.util.List; -import org.apache.sqoop.model.MConnection; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MConnector; -import org.apache.sqoop.model.MFramework; +import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MSubmission; @@ -41,11 +41,10 @@ public abstract class JdbcRepositoryHandler { /** * Search for connector with given name in repository. - * - * And return corresponding metadata structure. + * And return corresponding connector structure. * * @param shortName Connector unique name - * @param conn JDBC connection for querying repository. + * @param conn JDBC link for querying repository. * @return null if connector is not yet registered in repository or * loaded representation. */ @@ -65,26 +64,26 @@ public abstract class JdbcRepositoryHandler { * already registered or present in the repository. * * @param mc Connector that should be registered. - * @param conn JDBC connection for querying repository. + * @param conn JDBC link for querying repository. */ public abstract void registerConnector(MConnector mc, Connection conn); /** - * Retrieve connections which use the given connector. 
- * @param connectorID Connector ID whose connections should be fetched - * @param conn JDBC connection for querying repository - * @return List of MConnections that use <code>connectorID</code>. + * Retrieve links which use the given connector. + * @param connectorID Connector ID whose links should be fetched + * @param conn JDBC link for querying repository + * @return List of MLinks that use <code>connectorID</code>. */ - public abstract List<MConnection> findConnectionsForConnector(long + public abstract List<MLink> findLinksForConnector(long connectorID, Connection conn); /** - * Retrieve jobs which use the given connection. + * Retrieve jobs which use the given link. * * @param connectorID Connector ID whose jobs should be fetched - * @param conn JDBC connection for querying repository - * @return List of MJobs that use <code>connectionID</code>. + * @param conn JDBC link for querying repository + * @return List of MJobs that use <code>linkID</code>. */ public abstract List<MJob> findJobsForConnector(long connectorID, Connection conn); @@ -99,47 +98,47 @@ public abstract class JdbcRepositoryHandler { * * @param mConnector The new data to be inserted into the repository for * this connector. - * @param conn JDBC connection for querying repository + * @param conn JDBC link for querying repository */ public abstract void updateConnector(MConnector mConnector, Connection conn); /** - * Update the framework with the new data supplied in the - * <tt>mFramework</tt>. + * Update the driverConfig with the new data supplied in the + * <tt>mDriverConfig</tt>. * Also Update all forms in the repository - * with the forms specified in <tt>mFramework</tt>. <tt>mFramework </tt> must + * with the forms specified in <tt>mDriverConfig</tt>. <tt>mDriverConfig </tt> must * minimally have the connectorID and all required forms (including ones * which may not have changed). After this operation the repository is * guaranteed to only have the new forms specified in this object. 
* - * @param mFramework The new data to be inserted into the repository for - * the framework. - * @param conn JDBC connection for querying repository + * @param mDriverConfig The new data to be inserted into the repository for + * the driverConfig. + * @param conn JDBC link for querying repository */ - public abstract void updateFramework(MFramework mFramework, Connection conn); + public abstract void updateDriverConfig(MDriverConfig mDriverConfig, Connection conn); /** - * Search for framework metadata in the repository. + * Search for driverConfig in the repository. * - * @param conn JDBC connection for querying repository. - * @return null if framework metadata are not yet present in repository or + * @param conn JDBC link for querying repository. + * @return null if driverConfig is not yet present in repository or * loaded representation. */ - public abstract MFramework findFramework(Connection conn); + public abstract MDriverConfig findDriverConfig(Connection conn); /** - * Register framework metadata in repository. + * Register driver config in repository. * - * Save framework metadata into repository. Metadata should not be already + * Save driver config into repository. Driver config should not be already * registered or present in the repository. * - * @param mf Framework metadata that should be registered. - * @param conn JDBC connection for querying repository. + * @param driverConfig Driver config that should be registered. + * @param conn JDBC link for querying repository. */ - public abstract void registerFramework(MFramework mf, Connection conn); + public abstract void registerDriverConfig(MDriverConfig driverConfig, Connection conn); /** * Return true if repository tables exists and are suitable for use. @@ -169,95 +168,92 @@ public abstract class JdbcRepositoryHandler { public abstract void shutdown(); /** - * Specify query that Sqoop framework can use to validate connection to + * Specify query that Sqoop can use to validate link to * repository.
This query should return at least one row. * * @return Query or NULL in case that this repository do not support or do not - * want to validate live connections. + * want to validate live links. */ public abstract String validationQuery(); /** - * Save given connection to repository. This connection must not be already + * Save given link to repository. This link must not be already * present in the repository otherwise exception will be thrown. * - * @param connection Connection object to serialize into repository. - * @param conn Connection to metadata repository + * @param link Link object to serialize into repository. + * @param conn Connection to the repository */ - public abstract void createConnection(MConnection connection, - Connection conn); + public abstract void createLink(MLink link, Connection conn); /** - * Update given connection representation in repository. This connection + * Update given link representation in repository. This link * object must already exists in the repository otherwise exception will be * thrown. * - * @param connection Connection object that should be updated in repository. - * @param conn Connection to metadata repository + * @param link Link object that should be updated in repository. + * @param conn Connection to the repository */ - public abstract void updateConnection(MConnection connection, - Connection conn); + public abstract void updateLink(MLink link, Connection conn); /** - * Check if given connection exists in metastore. + * Check if given link exists in repository. 
* - * @param connetionId Connection id - * @param conn Connection to metadata repository - * @return True if the connection exists + * @param linkId Link id + * @param conn Connection to the repository + * @return True if the link exists */ - public abstract boolean existsConnection(long connetionId, Connection conn); + public abstract boolean existsLink(long linkId, Connection conn); /** * Check if given Connection id is referenced somewhere and thus can't * be removed. * - * @param connectionId Connection id - * @param conn Connection to metadata repository + * @param linkId Link id + * @param conn Connection to the repository * @return */ - public abstract boolean inUseConnection(long connectionId, Connection conn); + public abstract boolean inUseLink(long linkId, Connection conn); /** - * Enable or disable connection with given id from metadata repository + * Enable or disable link with given id from the repository * - * @param connectionId Connection object that is going to be enabled or disabled + * @param linkId Link object that is going to be enabled or disabled * @param enabled Enable or disable - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ - public abstract void enableConnection(long connectionId, boolean enabled, Connection conn); + public abstract void enableLink(long linkId, boolean enabled, Connection conn); /** - * Delete connection with given id from metadata repository. + * Delete link with given id from the repository. 
* - * @param connectionId Connection object that should be removed from repository - * @param conn Connection to metadata repository + * @param linkId Link object that should be removed from repository + * @param conn Connection to the repository */ - public abstract void deleteConnection(long connectionId, Connection conn); + public abstract void deleteLink(long linkId, Connection conn); /** - * Delete the input values for the connection with given id from the + * Delete the input values for the link with given id from the * repository. - * @param id Connection object whose inputs should be removed from repository - * @param conn Connection to metadata repository + * @param id Link object whose inputs should be removed from repository + * @param conn Connection to the repository */ - public abstract void deleteConnectionInputs(long id, Connection conn); + public abstract void deleteLinkInputs(long id, Connection conn); /** - * Find connection with given id in repository. + * Find link with given id in repository. * - * @param connectionId Connection id - * @param conn Connection to metadata repository - * @return Deserialized form of the connection that is saved in repository + * @param linkId Link id + * @param conn Connection to the repository + * @return Deserialized form of the link that is saved in repository */ - public abstract MConnection findConnection(long connectionId, - Connection conn); + public abstract MLink findLink(long linkId, Connection conn); /** - * Get all connection objects. + * Get all link objects. 
* - * @param conn Connection to metadata repository - * @return List will all saved connection objects + * @param conn Connection to the repository + * @return List with all saved link objects */ - public abstract List<MConnection> findConnections(Connection conn); + public abstract List<MLink> findLinks(Connection conn); /** @@ -265,7 +261,7 @@ public abstract class JdbcRepositoryHandler { * present in the repository otherwise exception will be thrown. * * @param job Job object to serialize into repository. - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void createJob(MJob job, Connection conn); @@ -275,15 +271,15 @@ public abstract class JdbcRepositoryHandler { * thrown. * * @param job Job object that should be updated in repository. - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void updateJob(MJob job, Connection conn); /** - * Check if given job exists in metastore. + * Check if given job exists in the repository. * * @param jobId Job id - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return True if the job exists */ public abstract boolean existsJob(long jobId, Connection conn); @@ -293,7 +289,7 @@ public abstract class JdbcRepositoryHandler { * be removed. * * @param jobId Job id - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return */ public abstract boolean inUseJob(long jobId, Connection conn); @@ -303,22 +299,22 @@ public abstract class JdbcRepositoryHandler { * * @param jobId Job id * @param enabled Enable or disable - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void enableJob(long jobId, boolean enabled, Connection conn); /** * Delete the input values for the job with given id from the repository.
* @param id Job object whose inputs should be removed from repository - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void deleteJobInputs(long id, Connection conn); /** - * Delete job with given id from metadata repository. This method will + * Delete job with given id from the repository. This method will * delete all inputs for this job also. * * @param jobId Job object that should be removed from repository - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void deleteJob(long jobId, Connection conn); @@ -326,7 +322,7 @@ public abstract class JdbcRepositoryHandler { * Find job with given id in repository. * * @param jobId Job id - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return Deserialized form of the job that is present in the repository */ public abstract MJob findJob(long jobId, Connection conn); @@ -334,7 +330,7 @@ public abstract class JdbcRepositoryHandler { /** * Get all job objects. * - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return List with all saved job objects */ public abstract List<MJob> findJobs(Connection conn); @@ -343,16 +339,15 @@ public abstract class JdbcRepositoryHandler { * Save given submission in repository. * * @param submission Submission object - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ - public abstract void createSubmission(MSubmission submission, - Connection conn); + public abstract void createSubmission(MSubmission submission, Connection conn); /** * Check if submission with given id already exists in repository.
* * @param submissionId Submission internal id - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract boolean existsSubmission(long submissionId, Connection conn); @@ -360,39 +355,38 @@ public abstract class JdbcRepositoryHandler { * Update given submission in repository. * * @param submission Submission object - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ - public abstract void updateSubmission(MSubmission submission, - Connection conn); + public abstract void updateSubmission(MSubmission submission, Connection conn); /** * Remove submissions older than threshold from repository. * * @param threshold Threshold date - * @param conn Connection to metadata repository + * @param conn Connection to the repository */ public abstract void purgeSubmissions(Date threshold, Connection conn); /** * Return list of unfinished submissions (as far as repository is concerned). * - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return List of unfinished submissions. */ public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn); /** - * Return list of all submissions from metadata repository. + * Return list of all submissions from the repository. * - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return List of all submissions. */ public abstract List<MSubmission> findSubmissions(Connection conn); /** - * Return list of submissions from metadata repository for given jobId. + * Return list of submissions from the repository for given jobId.
* @param jobId Job id - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return List of submissions */ public abstract List<MSubmission> findSubmissionsForJob(long jobId, Connection conn); @@ -401,7 +395,7 @@ public abstract class JdbcRepositoryHandler { * Find last submission for given jobId. * * @param jobId Job id - * @param conn Connection to metadata repository + * @param conn Connection to the repository * @return Most recent submission */ public abstract MSubmission findSubmissionLastForJob(long jobId,