SQOOP-1557: Sqoop2: SQ_CONFIGURABLE (for entities that own configs) (Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/151a0a12 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/151a0a12 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/151a0a12 Branch: refs/heads/sqoop2 Commit: 151a0a12a96b32c7f9f08c0199fbd907ac6097da Parents: 39a2200 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 21 20:47:46 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 21 20:47:46 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/sqoop/model/MConnector.java | 4 + .../java/org/apache/sqoop/model/MDriver.java | 18 +- .../java/org/apache/sqoop/driver/Driver.java | 4 + .../apache/sqoop/repository/JdbcRepository.java | 28 +- .../sqoop/repository/JdbcRepositoryHandler.java | 18 +- .../org/apache/sqoop/repository/Repository.java | 16 +- .../sqoop/repository/TestJdbcRepository.java | 52 +- .../repository/derby/DerbyRepoConstants.java | 8 +- .../sqoop/repository/derby/DerbyRepoError.java | 8 +- .../derby/DerbyRepositoryHandler.java | 310 +++++----- .../repository/derby/DerbySchemaConstants.java | 26 +- .../repository/derby/DerbySchemaQuery.java | 583 +++++++++++-------- .../sqoop/repository/derby/DerbyTestCase.java | 89 ++- .../repository/derby/TestConnectorHandling.java | 25 +- .../repository/derby/TestDriverHandling.java | 70 +-- .../sqoop/repository/derby/TestJobHandling.java | 3 +- .../sqoop/tools/tool/RepositoryLoadTool.java | 223 ++++--- 17 files changed, 821 insertions(+), 664 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MConnector.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java index 
174d0b9..1b9462e 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConnector.java +++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java @@ -181,6 +181,10 @@ public final class MConnector extends Configurable { return version; } + public MConfigurableType getType() { + return MConfigurableType.CONNECTOR; + } + public SupportedDirections getSupportedDirections() { return new SupportedDirections(this.getConfig(Direction.FROM) != null, this.getConfig(Direction.TO) != null); http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MDriver.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java index 4241a31..cc47511 100644 --- a/common/src/main/java/org/apache/sqoop/model/MDriver.java +++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java @@ -17,15 +17,17 @@ */ package org.apache.sqoop.model; -import java.sql.Driver; /** * Describes the configs associated with the {@link Driver} for executing sqoop jobs. 
*/ public final class MDriver extends Configurable { + public static final String DRIVER_NAME = "SqoopDriver"; private final MDriverConfig driverConfig; - private final String version; + private String version; + // Since there is only one Driver in the system, the name is not user specified + private static final String uniqueName = DRIVER_NAME; public MDriver(MDriverConfig driverConfig, String version) { this.driverConfig = driverConfig; @@ -68,6 +70,14 @@ public final class MDriver extends Configurable { return driverConfig; } + public MConfigurableType getType() { + return MConfigurableType.DRIVER; + } + + public String getUniqueName() { + return uniqueName; + } + @Override public MDriver clone(boolean cloneWithValue) { cloneWithValue = false; @@ -79,4 +89,8 @@ public final class MDriver extends Configurable { public String getVersion() { return version; } + + public void setVersion(String version) { + this.version = version; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/driver/Driver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java index 46a16ac..6942891 100644 --- a/core/src/main/java/org/apache/sqoop/driver/Driver.java +++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java @@ -158,6 +158,10 @@ public class Driver implements Reconfigurable { return mDriver; } + public static String getClassName() { + return Driver.getInstance().getClass().getSimpleName(); + } + public ResourceBundle getBundle(Locale locale) { return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 476830d..d7b526a 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -23,10 +23,11 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.model.MLink; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; public class JdbcRepository extends Repository { @@ -220,7 +221,7 @@ public class JdbcRepository extends Repository { return (MDriver) doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) { - MDriver existingDriverConfig = handler.findDriver(conn); + MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn); if (existingDriverConfig == null) { handler.registerDriver(mDriver, conn); return mDriver; @@ -233,7 +234,7 @@ public class JdbcRepository extends Repository { return mDriver; } else { throw new SqoopException(RepositoryError.JDBCREPO_0026, - "DriverConfig: " + mDriver.getPersistenceId()); + "Driver: " + mDriver.getPersistenceId()); } } return existingDriverConfig; @@ -246,6 +247,19 @@ public class JdbcRepository extends Repository { * {@inheritDoc} */ @Override + public MDriver findDriver(final String shortName) { + return (MDriver) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + return handler.findDriver(shortName, conn); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override public void createLink(final MLink link) { doWithConnection(new DoWithConnection() { @Override @@ -648,23 +662,23 @@ public class JdbcRepository extends Repository { 
* {@inheritDoc} */ @Override - protected void upgradeConnectorConfigs(final MConnector newConnector, + protected void upgradeConnectorAndConfigs(final MConnector newConnector, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeConnectorConfigs(newConnector, conn); + handler.upgradeConnectorAndConfigs(newConnector, conn); return null; } }, (JdbcRepositoryTransaction) tx); } - protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) { + protected void upgradeDriverAndConfigs(final MDriver mDriver, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @Override public Object doIt(Connection conn) throws Exception { - handler.upgradeDriverConfigs(mDriver, conn); + handler.upgradeDriverAndConfigs(mDriver, conn); return null; } }, (JdbcRepositoryTransaction) tx); http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 4c5229f..7d78826 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -21,10 +21,10 @@ import java.sql.Connection; import java.util.Date; import java.util.List; -import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; /** @@ -41,7 +41,7 @@ public abstract class JdbcRepositoryHandler { /** * Search for connector with given name in repository. - * And return corresponding connector structure. 
+ * And return corresponding connector entity. * * @param shortName Connector unique name * @param conn JDBC link for querying repository. @@ -101,8 +101,7 @@ public abstract class JdbcRepositoryHandler { * @param conn JDBC link for querying repository */ - public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn); - + public abstract void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn); /** * Upgrade the driver with the new data supplied in the @@ -117,17 +116,16 @@ public abstract class JdbcRepositoryHandler { * the driverConfig. * @param conn JDBC link for querying repository */ - public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn); - + public abstract void upgradeDriverAndConfigs(MDriver mDriver, Connection conn); /** - * Search for driverConfigin the repository. - * + * Search for driver in the repository. + * @params shortName the name for the driver * @param conn JDBC link for querying repository. - * @return null if driverConfig are not yet present in repository or + * @return null if driver are not yet present in repository or * loaded representation. */ - public abstract MDriver findDriver(Connection conn); + public abstract MDriver findDriver(String shortName, Connection conn); /** * Register driver config in repository. http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 8f78052..bd2a3be 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -119,6 +119,14 @@ public abstract class Repository { public abstract List<MConnector> findConnectors(); /** + * Search for driver in the repository. 
+ * @param shortName Driver unique name + * @return null if driver are not yet present in repository or + * loaded representation. + */ + public abstract MDriver findDriver(String shortName); + + /** * Save given link to repository. This link must not be already * present in the repository otherwise exception will be thrown. * @@ -317,7 +325,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx); + protected abstract void upgradeConnectorAndConfigs(MConnector newConnector, RepositoryTransaction tx); /** * Upgrade the driver with the new data supplied in the @@ -335,7 +343,7 @@ public abstract class Repository { * method will not call begin, commit, * rollback or close on this transaction. */ - protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx); + protected abstract void upgradeDriverAndConfigs(MDriver newDriver, RepositoryTransaction tx); /** * Delete all inputs for a job @@ -410,7 +418,7 @@ public abstract class Repository { deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx); // 5. Delete all inputs and configs associated with the connector, and // insert the new configs and inputs for this connector - upgradeConnectorConfigs(newConnector, tx); + upgradeConnectorAndConfigs(newConnector, tx); // 6. Run upgrade logic for the configs related to the link objects // dont always rely on the repository implementation to return empty list for links if (existingLinksByConnector != null) { @@ -514,7 +522,7 @@ public abstract class Repository { deleteJobs(existingJobs, tx); // 4. 
Delete all inputs and configs associated with the driver, and // insert the new configs and inputs for this driver - upgradeDriverConfigs(driver, tx); + upgradeDriverAndConfigs(driver, tx); for (MJob job : existingJobs) { // Make a new copy of the configs http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java index ff9e0c3..ae0e922 100644 --- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java +++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java @@ -167,10 +167,10 @@ public class TestJdbcRepository { */ @Test public void testDriverConfigEnableAutoUpgrade() { - MDriver newDriverConfig = driver(); - MDriver oldDriverConfig = anotherDriver(); + MDriver newDriver = driver(); + MDriver oldDriver = anotherDriver(); - when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig); + when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver); // make the upgradeDriverConfig to throw an exception to prove that it has been called SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -178,10 +178,10 @@ public class TestJdbcRepository { doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class)); try { - repoSpy.registerDriver(newDriverConfig, true); + repoSpy.registerDriver(newDriver, true); } catch (SqoopException ex) { assertEquals(ex.getMessage(), exception.getMessage()); - verify(repoHandlerMock, times(1)).findDriver(any(Connection.class)); + verify(repoHandlerMock, times(1)).findDriver(anyString(), any(Connection.class)); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; 
@@ -195,16 +195,16 @@ public class TestJdbcRepository { */ @Test public void testDriverConfigDisableAutoUpgrade() { - MDriver newDriverConfig = driver(); - MDriver oldDriverConfig = anotherDriver(); + MDriver newDriver = driver(); + MDriver oldDriver = anotherDriver(); - when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig); + when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver); try { - repoSpy.registerDriver(newDriverConfig, false); + repoSpy.registerDriver(newDriver, false); } catch (SqoopException ex) { assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026); - verify(repoHandlerMock, times(1)).findDriver(any(Connection.class)); + verify(repoHandlerMock, times(1)).findDriver(anyString(),any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -242,7 +242,7 @@ public class TestJdbcRepository { doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong()); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoSpy.upgradeConnector(oldConnector, newConnector); @@ -258,7 +258,7 @@ public class TestJdbcRepository { repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, 
times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); @@ -296,7 +296,7 @@ public class TestJdbcRepository { doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class)); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoSpy.upgradeDriver(newDriverConfig); @@ -309,7 +309,7 @@ public class TestJdbcRepository { repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); @@ -339,7 +339,7 @@ public class TestJdbcRepository { doReturn(jobList).when(repoSpy).findJobs(); doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class)); - doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -355,7 +355,7 @@ public class TestJdbcRepository { repoOrder.verify(repoSpy, times(1)).getTransaction(); repoOrder.verify(repoSpy, 
times(1)).deleteJobInputs(1, repoTransactionMock); repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock); - repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class)); + repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class)); repoOrder.verifyNoMoreInteractions(); txOrder.verify(repoTransactionMock, times(1)).begin(); txOrder.verify(repoTransactionMock, times(1)).rollback(); @@ -535,7 +535,7 @@ public class TestJdbcRepository { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update connector error."); - doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); try { repoSpy.upgradeConnector(oldConnector, newConnector); @@ -545,7 +545,7 @@ public class TestJdbcRepository { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -577,7 +577,7 @@ public class TestJdbcRepository { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + 
doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -592,7 +592,7 @@ public class TestJdbcRepository { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); @@ -626,7 +626,7 @@ public class TestJdbcRepository { doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); @@ -643,7 +643,7 @@ public class TestJdbcRepository { verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class)); verify(repoHandlerMock, 
times(2)).deleteJobInputs(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class)); verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); @@ -731,7 +731,7 @@ public class TestJdbcRepository { SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, "update driverConfig entity error."); - doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); try { repoSpy.upgradeDriver(newDriverConfig); @@ -739,7 +739,7 @@ public class TestJdbcRepository { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); return ; } @@ -764,7 +764,7 @@ public class TestJdbcRepository { List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1)); doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class)); doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class)); - doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + 
doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class)); SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000, @@ -777,7 +777,7 @@ public class TestJdbcRepository { assertEquals(ex.getMessage(), exception.getMessage()); verify(repoHandlerMock, times(1)).findJobs(any(Connection.class)); verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class)); - verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class)); + verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class)); verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class)); verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class)); verifyNoMoreInteractions(repoHandlerMock); http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java index 40dcc49..8fbf47f 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java @@ -22,13 +22,9 @@ public final class DerbyRepoConstants { public static final String CONF_PREFIX_DERBY = "derby."; @Deprecated - // use only for the upgrade code should be removed soon + // use only for the upgrade code public static final String SYSKEY_VERSION = "version"; - - public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version"; - - // TOOD(VB): 
SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table - public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version"; + public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "repository.version"; /** * Expected version of the repository structures. http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java index 3e4a4a9..aad219e 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java @@ -188,8 +188,12 @@ public enum DerbyRepoError implements ErrorCode { DERBYREPO_0048("Could not register config direction"), - DERBYREPO_0049("Could not set connector direction") - ; + DERBYREPO_0049("Could not set connector direction"), + + /** The system was unable to register driver due to a server error **/ + DERBYREPO_0050("Registration of driver failed"), + + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index aa58850..633e9df 100644 --- 
a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -45,9 +45,11 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.connector.ConnectorHandler; import org.apache.sqoop.connector.ConnectorManagerUtils; +import org.apache.sqoop.driver.Driver; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConfig; import org.apache.sqoop.model.MConfigType; +import org.apache.sqoop.model.MConfigurableType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; @@ -102,7 +104,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mc.getUniqueName()); } - mc.setPersistenceId(getConnectorId(mc, conn)); + mc.setPersistenceId(insertAndGetConnectorId(mc, conn)); insertConfigsForConnector(mc, conn); } @@ -116,10 +118,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try{ - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); // Register the job config type, since driver config is per job @@ -145,15 +147,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try{ - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, 
Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); - // Register link type config for connector - // NOTE: The direction is null for LINK type - registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(), + // Register link type config + registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(), MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); // Register both from/to job type config for connector @@ -202,19 +203,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } - private long getConnectorId(MConnector mc, Connection conn) { + private long insertAndGetConnectorId(MConnector mc, Connection conn) { PreparedStatement baseConnectorStmt = null; try { - baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE, - Statement.RETURN_GENERATED_KEYS); + baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, mc.getUniqueName()); baseConnectorStmt.setString(2, mc.getClassName()); baseConnectorStmt.setString(3, mc.getVersion()); + baseConnectorStmt.setString(4, mc.getType().name()); int baseConnectorCount = baseConnectorStmt.executeUpdate(); if (baseConnectorCount != 1) { throw new SqoopException(DerbyRepoError.DERBYREPO_0012, - Integer.toString(baseConnectorCount)); + Integer.toString(baseConnectorCount)); } ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys(); @@ -222,19 +224,44 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { if (!rsetConnectorId.next()) { throw new SqoopException(DerbyRepoError.DERBYREPO_0013); } - - insertConnectorDirections(rsetConnectorId.getLong(1), - mc.getSupportedDirections(), conn); - + // connector configurable also have directions + 
insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn); return rsetConnectorId.getLong(1); } catch (SQLException ex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0014, - mc.toString(), ex); + throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex); } finally { closeStatements(baseConnectorStmt); } } + private long insertAndGetDriverId(MDriver mDriver, Connection conn) { + PreparedStatement baseDriverStmt = null; + try { + baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE, + Statement.RETURN_GENERATED_KEYS); + baseDriverStmt.setString(1, mDriver.getUniqueName()); + baseDriverStmt.setString(2, Driver.getClassName()); + baseDriverStmt.setString(3, mDriver.getVersion()); + baseDriverStmt.setString(4, mDriver.getType().name()); + + int baseDriverCount = baseDriverStmt.executeUpdate(); + if (baseDriverCount != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount)); + } + + ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys(); + + if (!rsetDriverId.next()) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0013); + } + return rsetDriverId.getLong(1); + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex); + } finally { + closeStatements(baseDriverStmt); + } + } + /** * {@inheritDoc} */ @@ -351,59 +378,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } /** - * Detect version of the driver - * - * @param conn Connection to the repository - * @return Version of the Driver - */ - private String detectDriverVersion (Connection conn) { - ResultSet rs = null; - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - rs = stmt.executeQuery(); - if(!rs.next()) { - return null; - } - return rs.getString(1); - } catch (SQLException e) { - 
LOG.info("Can't fetch driver version.", e); - return null; - } finally { - closeResultSets(rs); - closeStatements(stmt); - } - } - - /** - * Create or update driver version - * @param conn Connection to the the repository - * @param mDriver - */ - private void createOrUpdateDriverSystemVersion(Connection conn, String version) { - ResultSet rs = null; - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(STMT_DELETE_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - stmt.executeUpdate(); - closeStatements(stmt); - - stmt = conn.prepareStatement(STMT_INSERT_SYSTEM); - stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION); - stmt.setString(2, version); - stmt.executeUpdate(); - } catch (SQLException e) { - logException(e); - throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); - } finally { - closeResultSets(rs); - closeStatements(stmt); - } - } - - /** * {@inheritDoc} */ @Override @@ -460,9 +434,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn); // SQOOP-1498 rename entities - renameEntitiesForUpgrade(conn); + renameEntitiesForConnectionAndForm(conn); // Change direction from VARCHAR to BIGINT + foreign key. 
updateDirections(conn, insertDirections(conn)); + + renameConnectorToConfigurable(conn); } // Add unique constraints on job and links for version 4 onwards if (repositoryVersion > 3) { @@ -474,7 +450,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } // SQOOP-1498 refactoring related upgrades for table and column names - void renameEntitiesForUpgrade(Connection conn) { + void renameEntitiesForConnectionAndForm(Connection conn) { // LINK // drop the constraint before rename runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn); @@ -491,6 +467,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn); + // rename constraints + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT, conn); + LOG.info("LINK TABLE altered"); // LINK_INPUT @@ -511,6 +491,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn); runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn); LOG.info("CONFIG TABLE altered"); @@ -528,7 +510,24 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn); LOG.info("JOB TABLE altered and constraints added"); + } + + private void renameConnectorToConfigurable(Connection conn) { + // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, 
conn); + + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE, conn); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1, conn); + runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1, conn); + runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT, conn); + runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn); + + LOG.info("CONNECTOR TABLE altered and constraints added for CONFIGURABLE"); } private void upgradeRepositoryVersion(Connection conn) { @@ -538,7 +537,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, "" + DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION); } - /** * Insert directions: FROM and TO. * @param conn @@ -643,6 +641,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } + /** * Upgrade job data from IMPORT/EXPORT to FROM/TO. 
* Since the framework is no longer responsible for HDFS, @@ -712,13 +711,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn, new Long(0), "throttling"); - Long linkId = createHdfsConnection(conn, connectorId); + Long connectionId = createHdfsConnection(conn, connectorId); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn, "EXPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn, - new Long(linkId), "EXPORT"); + new Long(connectionId), "EXPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn, - new Long(linkId), "IMPORT"); + new Long(connectionId), "IMPORT"); runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn, "fromJobConfig", "table", Direction.FROM.toString()); @@ -738,6 +737,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * Pre-register HDFS Connector so that config upgrade will work. 
* NOTE: This should be used only in the upgrade path */ + @Deprecated protected long registerHdfsConnector(Connection conn) { if (LOG.isTraceEnabled()) { LOG.trace("Begin HDFS Connector pre-loading."); @@ -760,7 +760,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { if (handler.getUniqueName().equals(CONNECTOR_HDFS)) { try { PreparedStatement baseConnectorStmt = conn.prepareStatement( - STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS, + STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS, Statement.RETURN_GENERATED_KEYS); baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName()); baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName()); @@ -854,21 +854,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { LOG.debug("Looking up connector: " + shortName); } MConnector mc = null; - PreparedStatement baseConnectorFetchStmt = null; + PreparedStatement connectorFetchStmt = null; try { - baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR); - baseConnectorFetchStmt.setString(1, shortName); + connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE); + connectorFetchStmt.setString(1, shortName); - List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt, conn); + List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn); - if (connectors.size()==0) { + if (connectors.size() == 0) { LOG.debug("No connector found by name: " + shortName); return null; - } else if (connectors.size()==1) { + } else if (connectors.size() == 1) { LOG.debug("Looking up connector: " + shortName + ", found: " + mc); return connectors.get(0); - } - else { + } else { throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName); } @@ -876,7 +875,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { logException(ex, shortName); throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex); } finally { - 
closeStatements(baseConnectorFetchStmt); + closeStatements(connectorFetchStmt); } } @@ -887,7 +886,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { public List<MConnector> findConnectors(Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL); + stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE); + stmt.setString(1, MConfigurableType.CONNECTOR.name()); + return loadConnectors(stmt,conn); } catch (SQLException ex) { logException(ex); @@ -897,84 +898,101 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } - /** * {@inheritDoc} */ @Override public void registerDriver(MDriver mDriver, Connection conn) { if (mDriver.hasPersistenceId()) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0011, - "Driver"); + throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName()); } + mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn)); + insertConfigsforDriver(mDriver, conn); + } + private void insertConfigsforDriver(MDriver mDriver, Connection conn) { PreparedStatement baseConfigStmt = null; PreparedStatement baseInputStmt = null; try { - baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE, + baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG, Statement.RETURN_GENERATED_KEYS); - baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT, Statement.RETURN_GENERATED_KEYS); // Register a driver config as a job type with no owner/connector and direction - registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(), + registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(), MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn); - // We're using hardcoded value for driver config as they are - // represented as NULL in the database. 
- mDriver.setPersistenceId(1); } catch (SQLException ex) { logException(ex, mDriver); throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex); } finally { closeStatements(baseConfigStmt, baseInputStmt); } - createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); } /** * {@inheritDoc} */ @Override - public MDriver findDriver(Connection conn) { - LOG.debug("Looking up Driver config to create a driver "); - MDriver mDriver = null; + public MDriver findDriver(String shortName, Connection conn) { + LOG.debug("Looking up Driver and config "); + PreparedStatement driverFetchStmt = null; PreparedStatement driverConfigFetchStmt = null; PreparedStatement driverConfigInputFetchStmt = null; + + MDriver mDriver; try { - driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); - driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE); + driverFetchStmt.setString(1, shortName); + + ResultSet rsDriverSet = driverFetchStmt.executeQuery(); + if (!rsDriverSet.next()) { + return null; + } + Long driverId = rsDriverSet.getLong(1); + String driverVersion = rsDriverSet.getString(4); + + driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + driverConfigFetchStmt.setLong(1, driverId); + + driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT); List<MConfig> driverConfigs = new ArrayList<MConfig>(); loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1); - if(driverConfigs.isEmpty()) { + if (driverConfigs.isEmpty()) { return null; } - - mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn)); - mDriver.setPersistenceId(1); + mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion); + mDriver.setPersistenceId(driverId); } catch (SQLException ex) { - throw new SqoopException(DerbyRepoError.DERBYREPO_0004, - "Driver config", ex); + throw new 
SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex); } finally { if (driverConfigFetchStmt != null) { try { driverConfigFetchStmt.close(); } catch (SQLException ex) { - LOG.error("Unable to close config fetch statement", ex); + LOG.error("Unable to close driver config fetch statement", ex); } } if (driverConfigInputFetchStmt != null) { try { driverConfigInputFetchStmt.close(); } catch (SQLException ex) { - LOG.error("Unable to close input fetch statement", ex); + LOG.error("Unable to close driver input fetch statement", ex); + } + } + if (driverFetchStmt != null) { + try { + driverFetchStmt.close(); + } catch (SQLException ex) { + LOG.error("Unable to close driver fetch statement", ex); } } } - LOG.debug("Looking up Driver config and created driver:" + mDriver); + LOG.debug("Looked up Driver and config"); return mDriver; } @@ -1228,7 +1246,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { public List<MLink> findLinksForConnector(long connectorID, Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR); + stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE); stmt.setLong(1, connectorID); return loadLinks(stmt, conn); } catch (SQLException ex) { @@ -1243,7 +1261,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) { + public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) { updateConnectorAndDeleteConfigs(mConnector, conn); insertConfigsForConnector(mConnector, conn); } @@ -1253,13 +1271,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; try { - updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR); - deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR); - deleteConfig = 
conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR); + updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE); + deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE); + deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE); updateConnectorStatement.setString(1, mConnector.getUniqueName()); updateConnectorStatement.setString(2, mConnector.getClassName()); updateConnectorStatement.setString(3, mConnector.getVersion()); - updateConnectorStatement.setLong(4, mConnector.getPersistenceId()); + updateConnectorStatement.setString(4, mConnector.getType().name()); + updateConnectorStatement.setLong(5, mConnector.getPersistenceId()); if (updateConnectorStatement.executeUpdate() != 1) { throw new SqoopException(DerbyRepoError.DERBYREPO_0038); @@ -1281,19 +1300,30 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override - public void upgradeDriverConfigs(MDriver mDriver, Connection conn) { + public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) { updateDriverAndDeleteConfigs(mDriver, conn); - createOrUpdateDriverSystemVersion(conn, mDriver.getVersion()); insertConfigsForDriver(mDriver, conn); } private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) { + PreparedStatement updateDriverStatement = null; PreparedStatement deleteConfig = null; PreparedStatement deleteInput = null; try { - deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS); - deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS); - + updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE); + deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE); + deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE); + updateDriverStatement.setString(1, mDriver.getUniqueName()); + updateDriverStatement.setString(2, Driver.getClassName()); + updateDriverStatement.setString(3, mDriver.getVersion()); + 
updateDriverStatement.setString(4, mDriver.getType().name()); + updateDriverStatement.setLong(5, mDriver.getPersistenceId()); + + if (updateDriverStatement.executeUpdate() != 1) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0038); + } + deleteInput.setLong(1, mDriver.getPersistenceId()); + deleteConfig.setLong(1, mDriver.getPersistenceId()); deleteInput.executeUpdate(); deleteConfig.executeUpdate(); @@ -1301,7 +1331,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { logException(e, mDriver); throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e); } finally { - closeStatements(deleteConfig, deleteInput); + closeStatements(updateDriverStatement, deleteConfig, deleteInput); } } @@ -1557,7 +1587,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { public List<MJob> findJobsForConnector(long connectorId, Connection conn) { PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR); + stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE); stmt.setLong(1, connectorId); stmt.setLong(2, connectorId); return loadJobs(stmt, conn); @@ -2080,8 +2110,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { try { rsConnectors = stmt.executeQuery(); - connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); + connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT); while(rsConnectors.next()) { long connectorId = rsConnectors.getLong(1); @@ -2116,9 +2146,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } finally { closeResultSets(rsConnectors); - closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt); + closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt); } - return connectors; } @@ 
-2134,7 +2163,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { rsConnection = stmt.executeQuery(); // - connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); + connectorConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT); while(rsConnection.next()) { @@ -2189,14 +2218,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { try { rsJob = stmt.executeQuery(); - - fromConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR); - driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER); + // Note: Job does not hold a explicit reference to the driver since every + // job has the same driver + long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId(); + fromConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); + driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE); jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); while(rsJob.next()) { - // why use connector? why cant it be link id? 
long fromConnectorId = rsJob.getLong(1); long toConnectorId = rsJob.getLong(2); long id = rsJob.getLong(3); @@ -2211,9 +2241,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { fromConfigFetchStmt.setLong(1, fromConnectorId); toConfigFetchStmt.setLong(1,toConnectorId); + driverConfigfetchStmt.setLong(1, driverId); jobInputFetchStmt.setLong(1, id); - //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs jobInputFetchStmt.setLong(3, id); // FROM entity configs @@ -2283,7 +2313,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * * Use given prepared statements to create entire config structure in database. * - * @param connectorId + * @param configurableId * @param configs * @param type * @param baseConfigStmt @@ -2292,17 +2322,17 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * @return short number of configs registered. * @throws SQLException */ - private short registerConfigs(Long connectorId, Direction direction, + private short registerConfigs(Long configurableId, Direction direction, List<MConfig> configs, String type, PreparedStatement baseConfigStmt, PreparedStatement baseInputStmt, Connection conn) throws SQLException { short configIndex = 0; for (MConfig config : configs) { - if(connectorId == null) { + if (configurableId == null) { baseConfigStmt.setNull(1, Types.BIGINT); } else { - baseConfigStmt.setLong(1, connectorId); + baseConfigStmt.setLong(1, configurableId); } baseConfigStmt.setString(2, config.getName()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java index 
59773e1..de08261 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java @@ -49,9 +49,14 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQD_NAME = "SQD_NAME"; // SQ_CONNECTOR + @Deprecated // used only for upgrade public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR"; + // SQ_CONFIGURABLE + public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE"; + @Deprecated // used only for upgrade public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME; + public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + TABLE_SQ_CONFIGURABLE_NAME; public static final String COLUMN_SQC_ID = "SQC_ID"; @@ -61,6 +66,8 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQC_VERSION = "SQC_VERSION"; + public static final String COLUMN_SQC_TYPE = "SQC_TYPE"; + // SQ_CONNECTOR_DIRECTIONS public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS"; @@ -75,12 +82,10 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION"; public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC"; - // FK to the SQ_CONNECTOR table public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME; public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD"; - // FK to the SQ_DIRECTION able public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME; @@ -99,7 +104,10 @@ public final class DerbySchemaConstants { @Deprecated // used only for upgrade public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR"; + @Deprecated // used only for upgrade path public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR"; + // 
note this column was renamed again + public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE"; @Deprecated // used only for upgrade public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION"; @@ -125,8 +133,8 @@ public final class DerbySchemaConstants { @Deprecated // used only for upgrade public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME; + // FK constraint on configurable public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC"; - public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME; // SQ_CONFIG_DIRECTIONS @@ -202,7 +210,11 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME"; @Deprecated // used only for upgrade public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR"; + @Deprecated // used only for upgrade public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR"; + // Note this column has been renamed twice + public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE"; + @Deprecated // used only for upgrade public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER"; public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER"; @@ -225,10 +237,10 @@ public final class DerbySchemaConstants { @Deprecated public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME; + // FK constraint on the connector configurable public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME; public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE"; - public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME; // SQ_JOB @@ -437,12 +449,12 @@ public final class DerbySchemaConstants { static { tablesV1 = new HashSet<String>(); 
tablesV1.add(TABLE_SQ_CONNECTOR_NAME); - tablesV1.add(TABLE_SQ_LINK_NAME); - tablesV1.add(TABLE_SQ_LINK_INPUT_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_NAME); + tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME); tablesV1.add(TABLE_SQ_COUNTER_NAME); tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME); tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME); - tablesV1.add(TABLE_SQ_CONFIG_NAME); + tablesV1.add(TABLE_SQ_FORM_NAME); tablesV1.add(TABLE_SQ_INPUT_NAME); tablesV1.add(TABLE_SQ_JOB_NAME); tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);
