Updated Branches: refs/heads/sqoop2 59c2188be -> 96a02dfee
SQOOP-994: Sqoop2: Upgrade: Add calling validation to the upgrade method (Mengwei Ding via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/96a02dfe Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/96a02dfe Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/96a02dfe Branch: refs/heads/sqoop2 Commit: 96a02dfee99132c867424091ad114626b61c0894 Parents: 59c2188 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Jul 3 16:33:52 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Jul 3 16:33:52 2013 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/repository/Repository.java | 102 +++++++++++++++++-- .../sqoop/repository/RepositoryError.java | 5 +- 2 files changed, 99 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 46cb7e6..0bedcbb 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -38,6 +38,8 @@ import org.apache.sqoop.model.MMapInput; import org.apache.sqoop.model.MStringInput; import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.model.ModelError; +import org.apache.sqoop.validation.Validation; +import org.apache.sqoop.validation.Validator; import java.util.ArrayList; import java.util.Date; @@ -374,7 +376,10 @@ public abstract class Repository { * register the new forms and inputs. * 6. Create new connections and jobs with connector part being the ones * returned by the upgrader. - * 7. 
Insert the connection inputs followed by job inputs (using + * 7. Validate new connections and jobs with connector's validator + * 8. If any invalid connections or jobs are detected, throw an exception + * and stop the bootup of Sqoop server + * 9. Otherwise, insert the connection inputs followed by job inputs (using * updateJob and updateConnection) */ RepositoryTransaction tx = null; @@ -382,6 +387,13 @@ public abstract class Repository { SqoopConnector connector = ConnectorManager.getInstance().getConnector(newConnector .getUniqueName()); + + Validator validator = connector.getValidator(); + + // lists to buffer invalid connections and jobs + List<MConnection> invalidConnections = new ArrayList<MConnection>(); + List<MJob> invalidJobs = new ArrayList<MJob>(); + MetadataUpgrader upgrader = connector.getMetadataUpgrader(); List<MConnection> connections = findConnectionsForConnector( connectorID); @@ -402,7 +414,13 @@ public abstract class Repository { MConnection newConnection = new MConnection(connectorID, newConnectionForms, connection.getFrameworkPart()); newConnection.setPersistenceId(connectionID); - updateConnection(newConnection, tx); + + Validation validation = validator.validateConnection(newConnection); + if (validation.getStatus().canProceed()) { + updateConnection(newConnection, tx); + } else { + invalidConnections.add(newConnection); + } } for (MJob job : jobs) { // Make a new copy of the forms from the connector, @@ -414,9 +432,38 @@ public abstract class Repository { MJob newJob = new MJob(connectorID, job.getConnectionId(), job.getType(), newJobForms, job.getFrameworkPart()); newJob.setPersistenceId(job.getPersistenceId()); - updateJob(newJob, tx); + + Validation validation = validator.validateJob(newJob.getType(), newJob); + if (validation.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + invalidJobs.add(newJob); + } + } + + if (invalidConnections.size() == 0 && invalidJobs.size() == 0) { + tx.commit(); + } else { + String msg = 
"Metadata upgrade for connector failed because of invalid Connections or Jobs.\n"; + + if (invalidConnections.size() > 0) { + msg += "Connections: "; + for (MConnection connection : invalidConnections) { + msg += connection.getPersistenceId() + ", "; + } + msg += "\n"; + } + + if (invalidJobs.size() > 0) { + msg += "Jobs: "; + for (MJob job : invalidJobs) { + msg += job.getPersistenceId() + ", "; + } + msg += "\n"; + } + + throw new SqoopException(RepositoryError.JDBCREPO_0027, msg); } - tx.commit(); } catch (Exception ex) { if(tx != null) { tx.rollback(); @@ -439,6 +486,12 @@ public abstract class Repository { List<MConnection> connections = findConnections(); List<MJob> jobs = findJobs(); + Validator validator = FrameworkManager.getInstance().getValidator(); + + // lists to buffer invalid connections and jobs + List<MConnection> invalidConnections = new ArrayList<MConnection>(); + List<MJob> invalidJobs = new ArrayList<MJob>(); + // -- BEGIN TXN -- tx = getTransaction(); tx.begin(); @@ -455,7 +508,13 @@ public abstract class Repository { MConnection newConnection = new MConnection(connection.getConnectorId(), connection.getConnectorPart(), newConnectionForms); newConnection.setPersistenceId(connectionID); - updateConnection(newConnection, tx); + + Validation validation = validator.validateConnection(newConnection); + if (validation.getStatus().canProceed()) { + updateConnection(newConnection, tx); + } else { + invalidConnections.add(newConnection); + } } for (MJob job : jobs) { // Make a new copy of the forms from the framework, @@ -467,9 +526,38 @@ public abstract class Repository { MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(), job.getType(), job.getConnectorPart(), newJobForms); newJob.setPersistenceId(job.getPersistenceId()); - updateJob(newJob, tx); + + Validation validation = validator.validateJob(newJob.getType(), newJob); + if (validation.getStatus().canProceed()) { + updateJob(newJob, tx); + } else { + invalidJobs.add(newJob); + } + 
} + + if (invalidConnections.size() == 0 && invalidJobs.size() == 0) { + tx.commit(); + } else { + String msg = "Metadata upgrade for job failed because of invalid Connections or Jobs.\n"; + + if (invalidConnections.size() > 0) { + msg += "Connections: "; + for (MConnection connection : invalidConnections) { + msg += connection.getPersistenceId() + ", "; + } + msg += "\n"; + } + + if (invalidJobs.size() > 0) { + msg += "Jobs: "; + for (MJob job : invalidJobs) { + msg += job.getPersistenceId() + ", "; + } + msg += "\n"; + } + + throw new SqoopException(RepositoryError.JDBCREPO_0027, msg); } - tx.commit(); } catch (Exception ex) { if(tx != null) { tx.rollback(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java index c616889..3f3a9e6 100644 --- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java +++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java @@ -119,7 +119,10 @@ public enum RepositoryError implements ErrorCode { JDBCREPO_0025("Given submission id is invalid"), /** Upgrade required but not allowed **/ - JDBCREPO_0026("Upgrade required but not allowed"); + JDBCREPO_0026("Upgrade required but not allowed"), + + /** Invalid connections or jobs when upgrading connector **/ + JDBCREPO_0027("Invalid connections or jobs when upgrading connector"); ;
