Repository: sqoop Updated Branches: refs/heads/sqoop2 d69bd34e0 -> 72a9d4383
SQOOP-2352: Sqoop2: Generic JDBC Connector support for fetch size (Abraham Fine via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/72a9d438 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/72a9d438 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/72a9d438 Branch: refs/heads/sqoop2 Commit: 72a9d4383156517f2d335f23e23e327d10e01dd1 Parents: d69bd34 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Oct 14 14:11:20 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Oct 14 14:11:20 2015 -0700 ---------------------------------------------------------------------- .../connector/jdbc/GenericJdbcExecutor.java | 36 ++++++++++++++------ .../jdbc/GenericJdbcFromInitializer.java | 8 ++--- .../jdbc/GenericJdbcToInitializer.java | 3 +- .../jdbc/configuration/LinkConfig.java | 4 +++ .../generic-jdbc-connector-config.properties | 5 +++ .../connector/jdbc/GenericJdbcExecutorTest.java | 9 ++++- .../jdbc/GenericJdbcTestConstants.java | 1 + .../apache/sqoop/connector/jdbc/TestLoader.java | 3 +- 8 files changed, 49 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java index ad6f649..ff33a4b 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java @@ -130,15 +130,34 @@ public class GenericJdbcExecutor { return connection; } - public PreparedStatement createStatement(String sql) { - try { - return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + public Statement createStatement() { + try { + Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + setFetchSize(statement); + return statement; } catch (SQLException e) { logSQLException(e); throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); } } + public PreparedStatement prepareStatement(String sql) { + try { + PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + setFetchSize(preparedStatement); + return preparedStatement; + } catch (SQLException e) { + logSQLException(e); + throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e); + } + } + + private void setFetchSize(Statement statement) throws SQLException { + if(link.linkConfig.fetchSize != null) { + statement.setFetchSize(link.linkConfig.fetchSize); + } + } + public void setAutoCommit(boolean autoCommit) { try { connection.setAutoCommit(autoCommit); @@ -217,7 +236,7 @@ public class GenericJdbcExecutor { final long expectedInsertCount = getTableRowCount(fromTable); oldAutoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); - stmt = connection.createStatement(); + stmt = createStatement(); final int actualInsertCount = stmt.executeUpdate(insertQuery); if(expectedInsertCount == actualInsertCount) { LOG.info("Transferred " + actualInsertCount + " rows of staged data " + @@ -255,8 +274,7 @@ public class GenericJdbcExecutor { } public long getTableRowCount(String tableName) { - try (Statement statement = connection.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = createStatement(); ResultSet resultSet = statement.executeQuery("SELECT COUNT(1) FROM " + encloseIdentifier(tableName));) { resultSet.next(); return resultSet.getLong(1); @@ -267,8 +285,7 @@ public class GenericJdbcExecutor { } public void executeUpdate(String sql) { - try (Statement statement = connection.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + try (Statement statement = createStatement()) { statement.executeUpdate(sql); } catch (SQLException e) { logSQLException(e); @@ -425,8 +442,7 @@ public class GenericJdbcExecutor { } public String[] getQueryColumns(String query) { - try (Statement statement = connection.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = createStatement(); ResultSet rs = statement.executeQuery(query);) { ResultSetMetaData rsmd = rs.getMetaData(); int count = rsmd.getColumnCount(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java index 5a357bd..c2d22f7 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java @@ -82,8 +82,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F Schema schema = new Schema(schemaName); ResultSetMetaData rsmt = null; - try (Statement statement = executor.getConnection().createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = executor.createStatement(); ResultSet rs = statement.executeQuery(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL) .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));) { @@ -172,8 +171,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F String incrementalNewMaxValueQuery = sb.toString(); LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery); - try (Statement statement = executor.getConnection().createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = executor.createStatement(); ResultSet rs = statement.executeQuery(incrementalNewMaxValueQuery);) { if (!rs.next()) { throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022); @@ -208,7 +206,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F PreparedStatement ps = null; ResultSet rs = null; try { - ps = executor.createStatement(minMaxQuery); + ps = executor.prepareStatement(minMaxQuery); if (incrementalImport) { ps.setString(1, jobConf.incrementalRead.lastValue); ps.setString(2, incrementalMaxValue); http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java index ed215ea..fc49061 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java @@ -69,8 +69,7 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration, ToJ assert schemaName != null; Schema schema = new Schema(schemaName); - try (Statement statement = executor.getConnection().createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = executor.createStatement(); ResultSet rs = statement.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");) { ResultSetMetaData rsmt = rs.getMetaData(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java index 885c6f5..ea6b85e 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java @@ -22,6 +22,7 @@ import org.apache.sqoop.model.Input; import org.apache.sqoop.model.Validator; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.InRange; import org.apache.sqoop.validation.validators.NotEmpty; import org.apache.sqoop.validation.validators.ClassAvailable; import org.apache.sqoop.validation.validators.StartsWith; @@ -48,6 +49,9 @@ public class LinkConfig { @Input(size = 40, sensitive = true) public String password; + @Input(validators = {@Validator(value = InRange.class, strArg = "0," + Integer.MAX_VALUE)}) + public Integer fetchSize; + @Input public Map<String, String> jdbcProperties; http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties index 73fa308..6defb26 100644 --- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties +++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties @@ -43,6 +43,11 @@ linkConfig.password.label = Password linkConfig.password.help = Enter the password to be used for connecting to the \ database. +# fetch size int +linkConfig.fetchSize.label = Fetch Size +linkConfig.fetchSize.help = Optional hint for JDBC fetch size. See \ + http://docs.oracle.com/javase/7/docs/api/java/sql/Statement.html#setFetchSize(int) + # jdbc properties linkConfig.jdbcProperties.label = JDBC Connection Properties linkConfig.jdbcProperties.help = Enter any JDBC properties that should be \ http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java index 5587840..3e756a1 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java @@ -28,7 +28,6 @@ import java.sql.SQLException; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; public class GenericJdbcExecutorTest { @@ -158,4 +157,12 @@ public class GenericJdbcExecutorTest { assertEquals(NUMBER_OF_ROWS, executor.getTableRowCount(table), "Table " + table + " is expected to be empty."); } + + @Test + public void testFetchSize() throws Exception { + assertEquals((int) GenericJdbcTestConstants.LINK_CONFIGURATION.linkConfig.fetchSize, + executor.createStatement().getFetchSize()); + assertEquals((int) GenericJdbcTestConstants.LINK_CONFIGURATION.linkConfig.fetchSize, + executor.prepareStatement("SELECT * FROM " + table).getFetchSize()); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java index e16c631..4c313a6 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java @@ -43,5 +43,6 @@ public class GenericJdbcTestConstants { static { LINK_CONFIGURATION.linkConfig.jdbcDriver = DRIVER; LINK_CONFIGURATION.linkConfig.connectionString = URL; + LINK_CONFIGURATION.linkConfig.fetchSize = 25; } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java index dfacc20..83411fb 100644 --- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java +++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java @@ -111,8 +111,7 @@ public class TestLoader { loader.load(loaderContext, linkConfig, jobConfig); int index = START; - try (Statement statement = executor.getConnection().createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + try (Statement statement = executor.createStatement(); ResultSet rs = statement.executeQuery("SELECT * FROM " + executor.encloseIdentifier(tableName) + " ORDER BY ICOL");) { while (rs.next()) {
