SQOOP-2573: Sqoop2: Use object name instead of object id for job, link and connector in Sqoop 2 server
(Dian Fu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a3c37472 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a3c37472 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a3c37472 Branch: refs/heads/sqoop2 Commit: a3c374723f7e55423be820c58a5bdc4127c0c414 Parents: f241f82 Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Sep 17 07:38:53 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Sep 17 07:38:53 2015 -0700 ---------------------------------------------------------------------- .../sqoop/connector/ConnectorManager.java | 20 +- .../org/apache/sqoop/driver/JobManager.java | 12 +- .../apache/sqoop/repository/JdbcRepository.java | 191 ++++------------- .../sqoop/repository/JdbcRepositoryHandler.java | 131 ++++-------- .../org/apache/sqoop/repository/Repository.java | 126 +++-------- .../sqoop/repository/RepositoryError.java | 12 +- .../sqoop/repository/TestJdbcRepository.java | 156 +++++++------- .../common/CommonRepositoryHandler.java | 209 ++++--------------- ...RepositoryInsertUpdateDeleteSelectQuery.java | 77 ++++--- .../sqoop/repository/derby/TestJobHandling.java | 173 ++------------- .../repository/derby/TestLinkHandling.java | 74 ++----- .../derby/TestSubmissionHandling.java | 10 +- .../repository/mysql/TestJobHandling.java | 34 +-- .../repository/mysql/TestLinkHandling.java | 40 ++-- .../mysql/TestSubmissionHandling.java | 6 +- .../repository/postgresql/TestJobHandling.java | 32 +-- .../repository/postgresql/TestLinkHandling.java | 39 ++-- .../postgresql/TestSubmissionHandling.java | 6 +- .../sqoop/handler/ConnectorRequestHandler.java | 7 +- .../apache/sqoop/handler/JobRequestHandler.java | 20 +- .../sqoop/handler/LinkRequestHandler.java | 26 ++- .../sqoop/handler/SubmissionRequestHandler.java | 3 +- 22 files changed, 426 insertions(+), 978 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java index 5f269aa..b0a6841 100644 --- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java +++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java @@ -127,30 +127,34 @@ public class ConnectorManager implements Reconfigurable { } public ResourceBundle getResourceBundle(long connectorId, Locale locale) { - ConnectorHandler handler = handlerMap.get(idToNameMap.get(connectorId)); + return getResourceBundle(idToNameMap.get(connectorId), locale); + } + + public ResourceBundle getResourceBundle(String connectorName, Locale locale) { + ConnectorHandler handler = handlerMap.get(connectorName); return handler.getSqoopConnector().getBundle(locale); } public MConnector getConnectorConfigurable(long connectorId) { - ConnectorHandler handler = handlerMap.get(idToNameMap.get(connectorId)); - if (handler == null) { + String connectorName = idToNameMap.get(connectorId); + if (connectorName == null) { throw new SqoopException(CommonRepositoryError.COMMON_0057, "Couldn't find" - + " connector with id " + connectorId); + + " connector with id " + connectorId); } - return handler.getConnectorConfigurable(); + return getConnectorConfigurable(connectorName); } public MConnector getConnectorConfigurable(String connectorName) { ConnectorHandler handler = handlerMap.get(connectorName); if (handler == null) { - return null; + throw new SqoopException(CommonRepositoryError.COMMON_0057, "Couldn't find" + + " connector with name " + connectorName); } return handler.getConnectorConfigurable(); } public SqoopConnector getSqoopConnector(long connectorId) { - ConnectorHandler handler = handlerMap.get(idToNameMap.get(connectorId)); - return handler.getSqoopConnector(); + return getSqoopConnector(idToNameMap.get(connectorId)); } public SqoopConnector getSqoopConnector(String uniqueName) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index dc90a0e..923df0d 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -277,7 +277,7 @@ public class JobManager implements Reconfigurable { LOG.info("Submission manager initialized: OK"); } - public MSubmission start(long jobId, HttpEventContext ctx) { + public MSubmission start(long jobId, String jobName, HttpEventContext ctx) { MSubmission mSubmission = createJobSubmission(ctx, jobId); JobRequest jobRequest = createJobRequest(jobId, mSubmission); @@ -287,7 +287,7 @@ public class JobManager implements Reconfigurable { // only if it's not. synchronized (JobManager.class) { MSubmission lastSubmission = RepositoryManager.getInstance().getRepository() - .findLastSubmissionForJob(jobId); + .findLastSubmissionForJob(jobName); if (lastSubmission != null && lastSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId); } @@ -606,10 +606,10 @@ public class JobManager implements Reconfigurable { request.getJobConfig(Direction.TO)); } - public MSubmission stop(long jobId, HttpEventContext ctx) { + public MSubmission stop(long jobId, String jobName, HttpEventContext ctx) { Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findLastSubmissionForJob(jobId); + MSubmission mSubmission = repository.findLastSubmissionForJob(jobName); if (mSubmission == null || !mSubmission.getStatus().isRunning()) { throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId @@ -626,9 +626,9 @@ public class JobManager implements Reconfigurable { return mSubmission; } - public MSubmission status(long jobId) { + public MSubmission status(long jobId, String jobName) { Repository repository = RepositoryManager.getInstance().getRepository(); - MSubmission mSubmission = repository.findLastSubmissionForJob(jobId); + MSubmission mSubmission = repository.findLastSubmissionForJob(jobName); if (mSubmission == null) { return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index d7aa8ef..0bddf5b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -23,8 +23,6 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; @@ -307,9 +305,9 @@ public class JdbcRepository extends Repository { if (!link.hasPersistenceId()) { throw new SqoopException(RepositoryError.JDBCREPO_0016); } - if (!handler.existsLink(link.getPersistenceId(), conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " - + link.getPersistenceId()); + if (!handler.existsLink(link.getName(), conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid name: " + + link.getName()); } handler.updateLink(link, conn); @@ -322,16 +320,16 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void enableLink(final long linkId, final boolean enabled) { + public void enableLink(final String linkName, final boolean enabled) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsLink(linkId, conn)) { + if(!handler.existsLink(linkName, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0017, - "Invalid id: " + linkId); + "Invalid name: " + linkName); } - handler.enableLink(linkId, enabled, conn); + handler.enableLink(linkName, enabled, conn); return null; } }); @@ -341,20 +339,20 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void deleteLink(final long linkId) { + public void deleteLink(final String linkName) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsLink(linkId, conn)) { + if(!handler.existsLink(linkName, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0017, - "Invalid id: " + linkId); + "Invalid name: " + linkName); } - if(handler.inUseLink(linkId, conn)) { + if(handler.inUseLink(linkName, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0021, - "Id in use: " + linkId); + "Name in use: " + linkName); } - handler.deleteLink(linkId, conn); + handler.deleteLink(linkName, conn); return null; } }); @@ -405,11 +403,11 @@ public class JdbcRepository extends Repository { */ @SuppressWarnings("unchecked") @Override - public List<MLink> findLinksForConnector(final long connectorId) { + public List<MLink> findLinksForConnector(final String connectorName) { return (List<MLink>) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - return handler.findLinksForConnector(connectorId, conn); + return handler.findLinksForConnector(connectorName, conn); } }); } @@ -451,7 +449,7 @@ public class JdbcRepository extends Repository { if(!job.hasPersistenceId()) { throw new SqoopException(RepositoryError.JDBCREPO_0019); } - if(!handler.existsJob(job.getPersistenceId(), conn)) { + if(!handler.existsJob(job.getName(), conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + job.getPersistenceId()); } @@ -466,16 +464,16 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void enableJob(final long id, final boolean enabled) { + public void enableJob(final String jobName, final boolean enabled) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if(!handler.existsJob(id, conn)) { + if(!handler.existsJob(jobName, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0020, - "Invalid id: " + id); + "Invalid name: " + jobName); } - handler.enableJob(id, enabled, conn); + handler.enableJob(jobName, enabled, conn); return null; } }); @@ -485,18 +483,18 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public void deleteJob(final long id) { + public void deleteJob(final String jobName) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if (!handler.existsJob(id, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + id); + if (!handler.existsJob(jobName, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobName); } - if (handler.inUseJob(id, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0022, "Id in use: " + id); + if (handler.inUseJob(jobName, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0022, "Name in use: " + jobName); } - handler.deleteJob(id, conn); + handler.deleteJob(jobName, conn); return null; } }); @@ -643,15 +641,15 @@ public class JdbcRepository extends Repository { */ @SuppressWarnings("unchecked") @Override - public List<MSubmission> findSubmissionsForJob(final long jobId) { + public List<MSubmission> findSubmissionsForJob(final String jobName) { return (List<MSubmission>) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - if(!handler.existsJob(jobId, conn)) { + if(!handler.existsJob(jobName, conn)) { throw new SqoopException(RepositoryError.JDBCREPO_0020, - "Invalid id: " + jobId); + "Invalid name: " + jobName); } - return handler.findSubmissionsForJob(jobId, conn); + return handler.findSubmissionsForJob(jobName, conn); } }); } @@ -660,148 +658,35 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override - public MSubmission findLastSubmissionForJob(final long jobId) { + public MSubmission findLastSubmissionForJob(final String jobName) { return (MSubmission) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - if (!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); + if (!handler.existsJob(jobName, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid name: " + jobName); } - return handler.findLastSubmissionForJob(jobId, conn); + return handler.findLastSubmissionForJob(jobName, conn); } }); } - /** - * {@inheritDoc} - */ - @Override - public MConfig findFromJobConfig(final long jobId, final String configName) { - return (MConfig) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - return handler.findFromJobConfig(jobId, configName, conn); - } - }); - } - - /** - * {@inheritDoc} - */ - @Override - public MConfig findToJobConfig(final long jobId, final String configName) { - return (MConfig) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - return handler.findToJobConfig(jobId, configName, conn); - } - }); - } - - /** - * {@inheritDoc} - */ - @Override - public MConfig findDriverJobConfig(final long jobId, final String configName) { - return (MConfig) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - return handler.findDriverJobConfig(jobId, configName, conn); - } - }); - } - - /** - * {@inheritDoc} - */ - @Override - public MConfig findLinkConfig(final long linkId, final String configName) { - return (MConfig) doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsLink(linkId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId); - } - return handler.findLinkConfig(linkId, configName, conn); - } - }); - } - - - /** - * {@inheritDoc} - */ - @Override - public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type) { - updateJobConfig(jobId, config, type, null); - } - /** - * {@inheritDoc} - */ - @Override - public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) { - doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsJob(jobId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - handler.updateJobConfig(jobId, config, type, conn); - return null; - } - }, (JdbcRepositoryTransaction) tx); - } - - /** - * {@inheritDoc} - */ - @Override - public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type) { - updateLinkConfig(linkId, config, type, null); - } - /** - * {@inheritDoc} - */ - @Override - public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) { - doWithConnection(new DoWithConnection() { - @Override - public Object doIt(Connection conn) { - if (!handler.existsLink(linkId, conn)) { - throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId); - } - handler.updateLinkConfig(linkId, config, type, conn); - return null; - } - }, (JdbcRepositoryTransaction) tx); - } - @Override - protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) { + protected void deleteJobInputs(final String jobName, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.deleteJobInputs(jobID, conn); + handler.deleteJobInputs(jobName, conn); return null; } }, (JdbcRepositoryTransaction) tx); } @Override - protected void deleteLinkInputs(final long linkId, RepositoryTransaction tx) { + protected void deleteLinkInputs(final String linkName, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.deleteLinkInputs(linkId, conn); + handler.deleteLinkInputs(linkName, conn); return null; } }, (JdbcRepositoryTransaction) tx); http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 5cbeda8..b4c3d9b 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -21,8 +21,6 @@ import java.sql.Connection; import java.util.Date; import java.util.List; -import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; @@ -84,11 +82,11 @@ public abstract class JdbcRepositoryHandler { /** * Retrieve links which use the given connector. - * @param connectorId Connector ID whose links should be fetched + * @param connectorName Connector name whose links should be fetched * @param conn JDBC link for querying repository * @return List of MLinks that use <code>connectorID</code>. */ - public abstract List<MLink> findLinksForConnector(long connectorId, Connection conn); + public abstract List<MLink> findLinksForConnector(String connectorName, Connection conn); /** * Retrieve jobs which use the given link. @@ -209,46 +207,46 @@ public abstract class JdbcRepositoryHandler { /** * Check if given link exists in repository. * - * @param linkId Link id + * @param linkName Link name * @param conn Connection to the repository * @return True if the link exists */ - public abstract boolean existsLink(long linkId, Connection conn); + public abstract boolean existsLink(String linkName, Connection conn); /** - * Check if given Connection id is referenced somewhere and thus can't + * Check if given link is referenced somewhere and thus can't * be removed. * - * @param linkId Link id + * @param linkName Link name * @param conn Connection to the repository * @return */ - public abstract boolean inUseLink(long linkId, Connection conn); + public abstract boolean inUseLink(String linkName, Connection conn); /** - * Enable or disable link with given id from the repository + * Enable or disable link with given name from the repository * - * @param linkId Link object that is going to be enabled or disabled + * @param linkName Link object that is going to be enabled or disabled * @param enabled Enable or disable * @param conn Connection to the repository */ - public abstract void enableLink(long linkId, boolean enabled, Connection conn); + public abstract void enableLink(String linkName, boolean enabled, Connection conn); /** - * Delete link with given id from the repository. + * Delete link with given name from the repository. * - * @param linkId Link object that should be removed from repository + * @param linkName Link object that should be removed from repository * @param conn Connection to the repository */ - public abstract void deleteLink(long linkId, Connection conn); + public abstract void deleteLink(String linkName, Connection conn); /** - * Delete the input values for the link with given id from the + * Delete the input values for the link with given name from the * repository. - * @param linkId Link object whose inputs should be removed from repository + * @param linkName Link object whose inputs should be removed from repository * @param conn Connection to the repository */ - protected abstract void deleteLinkInputs(long linkId, Connection conn); + protected abstract void deleteLinkInputs(String linkName, Connection conn); /** * Find link with given id in repository. @@ -299,45 +297,45 @@ public abstract class JdbcRepositoryHandler { /** * Check if given job exists in the repository. * - * @param jobId Job id + * @param jobName Job name * @param conn Connection to the repository * @return True if the job exists */ - public abstract boolean existsJob(long jobId, Connection conn); + public abstract boolean existsJob(String jobName, Connection conn); /** - * Check if given job id is referenced somewhere and thus can't + * Check if given job is referenced somewhere and thus can't * be removed. * - * @param jobId Job id + * @param jobName Job name * @param conn Connection to the repository * @return */ - public abstract boolean inUseJob(long jobId, Connection conn); + public abstract boolean inUseJob(String jobName, Connection conn); /** - * Enable or disable job with given id from the repository + * Enable or disable job with given name from the repository * - * @param jobId Job id + * @param jobName Job name * @param enabled Enable or disable * @param conn Connection to the repository */ - public abstract void enableJob(long jobId, boolean enabled, Connection conn); + public abstract void enableJob(String jobName, boolean enabled, Connection conn); /** - * Delete the input values for the job with given id from the repository. - * @param id Job object whose inputs should be removed from repository + * Delete the input values for the job with given name from the repository. + * @param jobName Job object whose inputs should be removed from repository * @param conn Connection to the repository */ - protected abstract void deleteJobInputs(long id, Connection conn); + protected abstract void deleteJobInputs(String jobName, Connection conn); /** - * Delete job with given id from the repository. This method will + * Delete job with given name from the repository. This method will * delete all inputs for this job also. * - * @param jobId Job object that should be removed from repository + * @param jobName Job object that should be removed from repository * @param conn Connection to the repository */ - public abstract void deleteJob(long jobId, Connection conn); + public abstract void deleteJob(String jobName, Connection conn); /** * Find job with given id in repository. @@ -414,77 +412,20 @@ public abstract class JdbcRepositoryHandler { public abstract List<MSubmission> findSubmissions(Connection conn); /** - * Return list of submissions from the repository for given jobId. - * @param jobId Job id + * Return list of submissions from the repository for given jobName. + * @param jobName Job name * @param conn Connection to the repository * @return List of submissions */ - public abstract List<MSubmission> findSubmissionsForJob(long jobId, Connection conn); + public abstract List<MSubmission> findSubmissionsForJob(String jobName, Connection conn); /** - * Find last submission for given jobId. + * Find last submission for given jobName. * - * @param jobId Job id + * @param jobName Job name * @param conn Connection to the repository * @return Most recent submission */ - public abstract MSubmission findLastSubmissionForJob(long jobId, Connection conn); - - /** - * fetch the job config for the FROM type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @param conn Connection to the repository - * @return config object - */ - public abstract MConfig findFromJobConfig(long jobId, String configName, Connection con); - - - /** - * fetch the job config for the TO type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @param conn Connection to the repository - * @return config object - */ - public abstract MConfig findToJobConfig(long jobId, String configName, Connection con); - - - /** - * fetch the job config for the DRIVER type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @param conn Connection to the repository - * @return config object - */ - public abstract MConfig findDriverJobConfig(long jobId, String configName, Connection con); - - - /** - * fetch the link config for the link type for the given name - * @param linkId id of the link - * @param configName name of the config unique to this link and type - * @param conn Connection to the repository - * @return config object - */ - public abstract MConfig findLinkConfig(long linkId, String configName, Connection con); - - /** - * Update the config object for the job - * @param jobId id of the job - * @param config name of the config - * @param type entity type updating the link config - * @param conn Connection to the repository - */ - public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type, Connection con); - - /** - * Update the config object for the link - * @param linkId id of the link - * @param config name of the config - * @param type entity type updating the link config - * @param conn Connection to the repository - */ - public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type, Connection con); + public abstract MSubmission findLastSubmissionForJob(String jobName, Connection conn); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 10af697..c15c96d 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -34,7 +34,6 @@ import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.MConfig; -import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; @@ -169,19 +168,19 @@ public abstract class Repository { public abstract void updateLink(final MLink link, RepositoryTransaction tx); /** - * Enable or disable Link with given id from the repository + * Enable or disable Link with given name from the repository * - * @param id Link object that is going to be enabled or disabled + * @param linkName Link object that is going to be enabled or disabled * @param enabled enable or disable */ - public abstract void enableLink(long id, boolean enabled); + public abstract void enableLink(String linkName, boolean enabled); /** - * Delete Link with given id from the repository. + * Delete Link with given name from the repository. * - * @param id Link object that should be removed from repository + * @param linkName Link object that should be removed from repository */ - public abstract void deleteLink(long id); + public abstract void deleteLink(String linkName); /** * Find link with given id in repository. @@ -201,10 +200,10 @@ public abstract class Repository { /** * Retrieve links which use the given connector. - * @param connectorId Connector id whose links should be fetched + * @param connectorName Connector name whose links should be fetched * @return List of MLink that use <code>connectorId</code>. */ - public abstract List<MLink> findLinksForConnector(long connectorId); + public abstract List<MLink> findLinksForConnector(String connectorName); /** * Get all Link objects. @@ -242,19 +241,19 @@ public abstract class Repository { public abstract void updateJob(MJob job, RepositoryTransaction tx); /** - * Enable or disable job with given id from entity repository + * Enable or disable job with given name from entity repository * - * @param id Job object that is going to be enabled or disabled + * @param jobName Job object that is going to be enabled or disabled * @param enabled Enable or disable */ - public abstract void enableJob(long id, boolean enabled); + public abstract void enableJob(String jobName, boolean enabled); /** - * Delete job with given id from entity repository. + * Delete job with given name from entity repository. * - * @param id Job id that should be removed + * @param jobName Job name that should be removed */ - public abstract void deleteJob(long id); + public abstract void deleteJob(String jobName); /** * Find job object with given id. @@ -323,90 +322,19 @@ public abstract class Repository { public abstract List<MSubmission> findSubmissions(); /** - * Return all submissions for given jobId. + * Return all submissions for given jobName. * * @return List of of submissions */ - public abstract List<MSubmission> findSubmissionsForJob(long jobId); + public abstract List<MSubmission> findSubmissionsForJob(String jobName); /** - * Find last submission for given jobId. + * Find last submission for given jobName. * - * @param jobId Job id + * @param jobName Job name * @return Most recent submission */ - public abstract MSubmission findLastSubmissionForJob(long jobId); - - /** - * fetch the job config for the FROM type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @return config object - */ - public abstract MConfig findFromJobConfig(long jobId, String configName); - - - /** - * fetch the job config for the TO type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @return config object - */ - public abstract MConfig findToJobConfig(long jobId, String configName); - - - /** - * fetch the job config for the DRIVER type for the given name - * @param jobId id of the job - * @param configName name of the config unique to this job and type - * @return config object - */ - public abstract MConfig findDriverJobConfig(long jobId, String configName); - - - /** - * fetch the link config for the link type for the given name - * @param linkId id of the link - * @param configName name of the config unique to this link and type - * @return config object - */ - public abstract MConfig findLinkConfig(long linkId, String configName); - - - /** - * Update the config object for the job - * @param jobId id of the job - * @param config name of the config - * @param updateEntityType entity type updating the link config - */ - public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType); - - /** - * Update the config object for the job - * @param jobId id of the job - * @param config name of the config - * @param updateEntityType entity type updating the link config - * @param tx database transaction - */ - public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType, RepositoryTransaction tx); - - - /** - * Update the config object for the link - * @param linkId id of the link - * @param config name of the config - * @param updateEntityType entity type updating the link config - */ - public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType); - - /** - * Update the config object for the link - * @param linkId id of the link - * @param config name of the config - * @param updateEntityType entity type updating the link config - * @param tx database transaction - */ - public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType, RepositoryTransaction tx); + public abstract MSubmission findLastSubmissionForJob(String jobName); /*********************Configurable Upgrade APIs ******************************/ @@ -449,40 +377,40 @@ public abstract class Repository { /** * Delete all inputs for a job - * @param jobId The id of the job whose inputs are to be deleted. + * @param jobName The name of the job whose inputs are to be deleted. * @param tx A transaction on the repository. This * method will not call <code>begin, commit, * rollback or close on this transaction.</code> */ - protected abstract void deleteJobInputs(long jobId, RepositoryTransaction tx); + protected abstract void deleteJobInputs(String jobName, RepositoryTransaction tx); /** * Delete all inputs for a link - * @param linkId The id of the link whose inputs are to be + * @param linkName The name of the link whose inputs are to be * deleted. * @param tx The repository transaction to use to push the data to the * repository. If this is null, a new transaction will be created. * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void deleteLinkInputs(long linkId, RepositoryTransaction tx); + protected abstract void deleteLinkInputs(String linkName, RepositoryTransaction tx); private void deletelinksAndJobInputs(List<MLink> links, List<MJob> jobs, RepositoryTransaction tx) { if (jobs != null) { for (MJob job : jobs) { - deleteJobInputs(job.getPersistenceId(), tx); + deleteJobInputs(job.getName(), tx); } } if (links != null) { for (MLink link : links) { - deleteLinkInputs(link.getPersistenceId(), tx); + deleteLinkInputs(link.getName(), tx); } } } private void deleteJobInputsOnly(List<MJob> jobs, RepositoryTransaction tx) { for (MJob job : jobs) { - deleteJobInputs(job.getPersistenceId(), tx); + deleteJobInputs(job.getName(), tx); } } @@ -513,7 +441,7 @@ public abstract class Repository { // 1. Get an upgrader for the connector ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader(); // 2. Get all links associated with the connector. - List<MLink> existingLinksByConnector = findLinksForConnector(connectorId); + List<MLink> existingLinksByConnector = findLinksForConnector(connectorName); // 3. Get all jobs associated with the connector. List<MJob> existingJobsByConnector = findJobsForConnector(connectorId); // -- BEGIN TXN -- http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java index f684e85..90cd78a 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java @@ -91,8 +91,8 @@ public enum RepositoryError implements ErrorCode { /** link that we're trying to update is not yet saved **/ JDBCREPO_0016("Cannot update link that was not yet created"), - /** Invalid link id **/ - JDBCREPO_0017("Given link id is invalid"), + /** Invalid link name **/ + JDBCREPO_0017("Given link name is invalid"), /** Job that we're trying to create is already saved in repository **/ JDBCREPO_0018("Cannot create job that was already created"), @@ -100,11 +100,11 @@ public enum RepositoryError implements ErrorCode { /** Job that we're trying to update is not yet saved **/ JDBCREPO_0019("Cannot update job that was not yet created"), - /** Invalid job id **/ - JDBCREPO_0020("Given job id is invalid"), + /** Invalid job name **/ + JDBCREPO_0020("Given job name is invalid"), - /** link ID is in use **/ - JDBCREPO_0021("Given link id is in use"), + /** link name is in use **/ + JDBCREPO_0021("Given link name is in use"), /** Job ID is in use **/ JDBCREPO_0022("Given job id is in use"), http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index 4999159..7ab1182 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -220,11 +220,11 @@ public class TestJdbcRepository { // prepare the links and jobs // the connector Id for both are the same - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,2)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 2)); // mock necessary methods for upgradeConnector() procedure - doReturn(linkList).when(repoSpy).findLinksForConnector(anyLong()); + doReturn(linkList).when(repoSpy).findLinksForConnector(anyString()); doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong()); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); @@ -236,13 +236,13 @@ public class TestJdbcRepository { InOrder txOrder = inOrder(repoTransactionMock); InOrder upgraderOrder = inOrder(connectorUpgraderMock); - repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyLong()); + repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyString()); repoOrder.verify(repoSpy, times(1)).findJobsForConnector(anyLong()); repoOrder.verify(repoSpy, times(1)).getTransaction(); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JA", repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JB", repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteLinkInputs("LA", repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteLinkInputs("LB", repoTransactionMock); repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); @@ -269,7 +269,7 @@ public class TestJdbcRepository { when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); @@ -284,8 +284,8 @@ public class TestJdbcRepository { repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).getTransaction(); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JA", repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JB", repoTransactionMock); repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); @@ -307,7 +307,7 @@ public class TestJdbcRepository { when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); when(driverMock.getDriverJobConfigurationClass()).thenReturn(InvalidConfiguration.class); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); @@ -324,8 +324,8 @@ public class TestJdbcRepository { repoOrder.verify(repoSpy, times(1)).findJobs(); repoOrder.verify(repoSpy, times(1)).getTransaction(); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JA", repoTransactionMock); + repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JB", repoTransactionMock); repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); @@ -355,13 +355,13 @@ public class TestJdbcRepository { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "find links for connector error."); - doThrow(exception).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -382,8 +382,8 @@ public class TestJdbcRepository { when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "find jobs for connector error."); @@ -393,7 +393,7 @@ public class TestJdbcRepository { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -415,22 +415,22 @@ public class TestJdbcRepository { when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "delete job inputs for connector error."); - doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).deleteJobInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).deleteJobInputs(anyString(), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -451,24 +451,24 @@ public class TestJdbcRepository { when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "delete link inputs for connector error."); - doThrow(exception).when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).deleteLinkInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); + verify(repoHandlerMock, times(1)).deleteLinkInputs(anyString(), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -489,12 +489,12 @@ public class TestJdbcRepository { when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update connector error."); @@ -504,10 +504,10 @@ public class TestJdbcRepository { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -531,14 +531,14 @@ public class TestJdbcRepository { when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); - doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); + doReturn(true).when(repoHandlerMock).existsLink(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update link error."); @@ -548,12 +548,12 @@ public class TestJdbcRepository { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); - verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).existsLink(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -577,16 +577,16 @@ public class TestJdbcRepository { when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(ValidConfiguration.class); when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector); - List<MLink> linkList = links(link(1,1), link(2,1)); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); - doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class)); + List<MLink> linkList = links(link(1, "LA", 1), link(2, "LB", 1));; + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); + doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyString(), any(Connection.class)); doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class)); - doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); - doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); + doReturn(true).when(repoHandlerMock).existsLink(anyString(), any(Connection.class)); + doReturn(true).when(repoHandlerMock).existsJob(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update job error."); @@ -596,14 +596,14 @@ public class TestJdbcRepository { repoSpy.upgradeConnector(oldConnector, newConnector); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findLinksForConnector(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).findLinksForConnector(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); - verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).existsLink(anyString(), any(Connection.class)); verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class)); - verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).existsJob(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -648,19 +648,19 @@ public class TestJdbcRepository { when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "delete job inputs error."); - doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); try { repoSpy.upgradeDriver(newDriverConfig); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); - verify(repoHandlerMock, times(1)).deleteJobInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).deleteJobInputs(anyString(), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -678,10 +678,10 @@ public class TestJdbcRepository { when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update driverConfig entity error."); @@ -692,7 +692,7 @@ public class TestJdbcRepository { } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -712,11 +712,11 @@ public class TestJdbcRepository { when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock); when(driverMock.getDriverJobConfigurationClass()).thenReturn(ValidConfiguration.class); - List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); + List<MJob> jobList = jobs(job(1, "JA", 1, 1, 1, 1), job(2, "JB", 1, 1, 2, 1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); - doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); + doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class)); doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); - doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); + doReturn(true).when(repoHandlerMock).existsJob(anyString(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update job error."); @@ -727,9 +727,9 @@ public class TestJdbcRepository { } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); - verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); - verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); + verify(repoHandlerMock, times(1)).existsJob(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; @@ -764,18 +764,20 @@ public class TestJdbcRepository { return driver; } - private MLink link(long linkId, long connectorId) { + private MLink link(long linkId, String linkName, long connectorId) { MLink link = new MLink(connectorId, new MLinkConfig(new LinkedList<MConfig>())); link.setPersistenceId(linkId); + link.setName(linkName); return link; } - private MJob job(long id, long fromConnectorId, long toConnectorId, long fromLinkId, long toLinkId) { + private MJob job(long id, String jobName, long fromConnectorId, long toConnectorId, long fromLinkId, long toLinkId) { MJob job = new MJob(fromConnectorId, toConnectorId, fromLinkId, toLinkId, new MFromConfig(new LinkedList<MConfig>()), new MToConfig(new LinkedList<MConfig>()), new MDriverConfig(new LinkedList<MConfig>())); job.setPersistenceId(id); + job.setName(jobName); return job; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/a3c37472/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java index c2ba6e3..9c45fe7 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java @@ -46,7 +46,6 @@ import org.apache.sqoop.model.InputEditable; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfigType; -import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConfigurableType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDateTimeInput; @@ -68,7 +67,6 @@ import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.model.SubmissionError; import org.apache.sqoop.repository.JdbcRepositoryHandler; -import org.apache.sqoop.repository.RepositoryError; import org.apache.sqoop.submission.SubmissionStatus; import org.apache.sqoop.submission.counter.Counter; import org.apache.sqoop.submission.counter.CounterGroup; @@ -396,7 +394,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { */ @Override public void updateLink(MLink link, Connection conn) { - try (PreparedStatement deleteStmt = conn.prepareStatement(crudQueries.getStmtDeleteLinkInput()); + try (PreparedStatement deleteStmt = conn.prepareStatement(crudQueries.getStmtDeleteLinkInputByLinkId()); PreparedStatement updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateLink());) { // Firstly remove old values deleteStmt.setLong(1, link.getPersistenceId()); @@ -426,9 +424,9 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public boolean existsLink(long linkId, Connection conn) { - try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectLinkCheckById())) { - stmt.setLong(1, linkId); + public boolean existsLink(String linkName, Connection conn) { + try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectLinkCheckByName())) { + stmt.setString(1, linkName); try (ResultSet rs = stmt.executeQuery()) { // Should be always valid in query with count @@ -437,7 +435,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return rs.getLong(1) == 1; } } catch (SQLException ex) { - logException(ex, linkId); + logException(ex, linkName); throw new SqoopException(CommonRepositoryError.COMMON_0022, ex); } } @@ -446,10 +444,10 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public boolean inUseLink(long linkId, Connection conn) { + public boolean inUseLink(String linkName, Connection conn) { try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectJobsForLinkCheck())) { - stmt.setLong(1, linkId); + stmt.setString(1, linkName); try (ResultSet rs = stmt.executeQuery()) { // Should be always valid in case of count(*) query @@ -458,7 +456,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return rs.getLong(1) != 0; } } catch (SQLException e) { - logException(e, linkId); + logException(e, linkName); throw new SqoopException(CommonRepositoryError.COMMON_0029, e); } } @@ -467,14 +465,14 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void enableLink(long linkId, boolean enabled, Connection conn) { + public void enableLink(String linkName, boolean enabled, Connection conn) { try (PreparedStatement enableConn = conn.prepareStatement(crudQueries.getStmtEnableLink())) { enableConn.setBoolean(1, enabled); - enableConn.setLong(2, linkId); + enableConn.setString(2, linkName); enableConn.executeUpdate(); } catch (SQLException ex) { - logException(ex, linkId); + logException(ex, linkName); throw new SqoopException(CommonRepositoryError.COMMON_0038, ex); } } @@ -483,13 +481,13 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void deleteLink(long linkId, Connection conn) { + public void deleteLink(String linkName, Connection conn) { try (PreparedStatement dltConn = conn.prepareStatement(crudQueries.getStmtDeleteLink())) { - deleteLinkInputs(linkId, conn); - dltConn.setLong(1, linkId); + deleteLinkInputs(linkName, conn); + dltConn.setString(1, linkName); dltConn.executeUpdate(); } catch (SQLException ex) { - logException(ex, linkId); + logException(ex, linkName); throw new SqoopException(CommonRepositoryError.COMMON_0019, ex); } } @@ -498,12 +496,12 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void deleteLinkInputs(long id, Connection conn) { - try (PreparedStatement dltConnInput = conn.prepareStatement(crudQueries.getStmtDeleteLinkInput())) { - dltConnInput.setLong(1, id); + public void deleteLinkInputs(String linkName, Connection conn) { + try (PreparedStatement dltConnInput = conn.prepareStatement(crudQueries.getStmtDeleteLinkInputByLinkName())) { + dltConnInput.setString(1, linkName); dltConnInput.executeUpdate(); } catch (SQLException ex) { - logException(ex, id); + logException(ex, linkName); throw new SqoopException(CommonRepositoryError.COMMON_0019, ex); } } @@ -576,13 +574,13 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public List<MLink> findLinksForConnector(long connectorId, Connection conn) { + public List<MLink> findLinksForConnector(String connectorName, Connection conn) { try (PreparedStatement linkByConnectorFetchStmt = conn.prepareStatement(crudQueries.getStmtSelectLinkForConnectorConfigurable())) { - linkByConnectorFetchStmt.setLong(1, connectorId); + linkByConnectorFetchStmt.setString(1, connectorName); return loadLinks(linkByConnectorFetchStmt, conn); } catch (SQLException ex) { - logException(ex, connectorId); + logException(ex, connectorName); throw new SqoopException(CommonRepositoryError.COMMON_0020, ex); } } @@ -656,7 +654,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { updateStmt.executeUpdate(); // Secondly remove old values - deleteStmt.setLong(1, job.getPersistenceId()); + deleteStmt.setString(1, job.getName()); deleteStmt.executeUpdate(); // And reinsert new values @@ -683,9 +681,9 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public boolean existsJob(long jobId, Connection conn) { - try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectJobCheckById())) { - stmt.setLong(1, jobId); + public boolean existsJob(String jobName, Connection conn) { + try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectJobCheckByName())) { + stmt.setString(1, jobName); try (ResultSet rs = stmt.executeQuery()) { // Should be always valid in query with count @@ -694,7 +692,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return rs.getLong(1) == 1; } } catch (SQLException ex) { - logException(ex, jobId); + logException(ex, jobName); throw new SqoopException(CommonRepositoryError.COMMON_0026, ex); } } @@ -703,8 +701,8 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public boolean inUseJob(long jobId, Connection conn) { - MSubmission submission = findLastSubmissionForJob(jobId, conn); + public boolean inUseJob(String jobName, Connection conn) { + MSubmission submission = findLastSubmissionForJob(jobName, conn); // We have no submissions and thus job can't be in use if (submission == null) { @@ -723,13 +721,13 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void enableJob(long jobId, boolean enabled, Connection conn) { + public void enableJob(String jobName, boolean enabled, Connection conn) { try (PreparedStatement enableConn = conn.prepareStatement(crudQueries.getStmtEnableJob())) { enableConn.setBoolean(1, enabled); - enableConn.setLong(2, jobId); + enableConn.setString(2, jobName); enableConn.executeUpdate(); } catch (SQLException ex) { - logException(ex, jobId); + logException(ex, jobName); throw new SqoopException(CommonRepositoryError.COMMON_0039, ex); } } @@ -738,12 +736,12 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void deleteJobInputs(long id, Connection conn) { + public void deleteJobInputs(String jobName, Connection conn) { try (PreparedStatement dltInput = conn.prepareStatement(crudQueries.getStmtDeleteJobInput())) { - dltInput.setLong(1, id); + dltInput.setString(1, jobName); dltInput.executeUpdate(); } catch (SQLException ex) { - logException(ex, id); + logException(ex, jobName); throw new SqoopException(CommonRepositoryError.COMMON_0025, ex); } } @@ -752,13 +750,13 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void deleteJob(long jobId, Connection conn) { + public void deleteJob(String jobName, Connection conn) { try (PreparedStatement dlt = conn.prepareStatement(crudQueries.getStmtDeleteJob())) { - deleteJobInputs(jobId, conn); - dlt.setLong(1, jobId); + deleteJobInputs(jobName, conn); + dlt.setString(1, jobName); dlt.executeUpdate(); } catch (SQLException ex) { - logException(ex, jobId); + logException(ex, jobName); throw new SqoopException(CommonRepositoryError.COMMON_0025, ex); } } @@ -993,10 +991,10 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public List<MSubmission> findSubmissionsForJob(long jobId, Connection conn) { + public List<MSubmission> findSubmissionsForJob(String jobName, Connection conn) { List<MSubmission> submissions = new LinkedList<MSubmission>(); try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectSubmissionsForJob())) { - stmt.setLong(1, jobId); + stmt.setString(1, jobName); try (ResultSet rs = stmt.executeQuery()) { while (rs.next()) { @@ -1015,10 +1013,10 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public MSubmission findLastSubmissionForJob(long jobId, Connection conn) { + public MSubmission findLastSubmissionForJob(String jobName, Connection conn) { try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectSubmissionsForJob())) { - stmt.setLong(1, jobId); + stmt.setString(1, jobName); stmt.setMaxRows(1); try (ResultSet rs = stmt.executeQuery()) { @@ -1029,7 +1027,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return loadSubmission(rs, conn); } } catch (SQLException ex) { - logException(ex, jobId); + logException(ex, jobName); throw new SqoopException(CommonRepositoryError.COMMON_0037, ex); } } @@ -1783,125 +1781,6 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return children; } - @Override - public MConfig findFromJobConfig(long jobId, String configName, Connection conn) { - MJob job = findJob(jobId, conn); - if (job == null) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - MFromConfig fromConfigs = job.getFromJobConfig(); - if (fromConfigs != null) { - MConfig config = fromConfigs.getConfig(configName); - if (config == null) { - throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName); - } - return config; - } - throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName); - } - - @Override - public MConfig findToJobConfig(long jobId, String configName, Connection conn) { - MJob job = findJob(jobId, conn); - if (job == null) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - MToConfig toConfigs = job.getToJobConfig(); - if (toConfigs != null) { - MConfig config = toConfigs.getConfig(configName); - if (config == null) { - throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName); - } - return config; - } - throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName); - } - - @Override - public MConfig findDriverJobConfig(long jobId, String configName, Connection conn) { - MJob job = findJob(jobId, conn); - if (job == null) { - throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); - } - MDriverConfig driverConfigs = job.getDriverConfig(); - if (driverConfigs != null) { - MConfig config = driverConfigs.getConfig(configName); - if (config == null) { - throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName); - } - return config; - } - throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName); - } - - @Override - public MConfig findLinkConfig(long linkId, String configName, Connection conn) { - MLink link = findLink(linkId, conn); - if (link == null) { - throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId); - } - MConfig driverConfig = link.getConnectorLinkConfig(configName); - if (driverConfig == null) { - throw new SqoopException(CommonRepositoryError.COMMON_0052, "for configName :" + configName); - } - return driverConfig; - } - - @SuppressWarnings("resource") - @Override - public void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type, - Connection conn) { - List<MInput<?>> inputs = config.getInputs(); - - try (PreparedStatement updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateJobInput())) { - for (MInput<?> input : inputs) { - if (input.isEmpty()) { - continue; - } - validateEditableConstraints(type, input); - updateStmt.setString(1, input.getUrlSafeValueString()); - updateStmt.setLong(2, input.getPersistenceId()); - updateStmt.setLong(3, jobId); - updateStmt.executeUpdate(); - } - } catch (SQLException ex) { - logException(ex, jobId); - throw new SqoopException(CommonRepositoryError.COMMON_0053, ex); - } - } - - private void validateEditableConstraints(MConfigUpdateEntityType type, MInput<?> input) { - if (input.getEditable().equals(InputEditable.CONNECTOR_ONLY) - && type.equals(MConfigUpdateEntityType.USER)) { - throw new SqoopException(CommonRepositoryError.COMMON_0055); - } - if (input.getEditable().equals(InputEditable.USER_ONLY) - && type.equals(MConfigUpdateEntityType.CONNECTOR)) { - throw new SqoopException(CommonRepositoryError.COMMON_0056); - } - } - - @Override - public void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type, - Connection conn) { - List<MInput<?>> inputs = config.getInputs(); - try (PreparedStatement updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateLinkInput());) { - for (MInput<?> input : inputs) { - if (input.isEmpty()) { - continue; - } - validateEditableConstraints(type, input); - updateStmt.setString(1, input.getUrlSafeValueString()); - updateStmt.setLong(2, input.getPersistenceId()); - updateStmt.setLong(3, linkId); - updateStmt.executeUpdate(); - } - } catch (SQLException ex) { - logException(ex, linkId); - throw new SqoopException(CommonRepositoryError.COMMON_0054, ex); - } - } - /** * {@inheritDoc} */
