[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat. This closes #4036.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605107b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605107b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605107b Branch: refs/heads/master Commit: 5605107bd59f10fc9ca7d9ac53fa8d60ecdcfdbc Parents: c041dd8 Author: Maximilian Bode <[email protected]> Authored: Wed May 31 18:46:55 2017 +0200 Committer: zentol <[email protected]> Committed: Fri Jun 2 15:13:55 2017 +0200 ---------------------------------------------------------------------- .../flink/api/java/io/jdbc/JDBCInputFormat.java | 17 ++++++++ .../api/java/io/jdbc/JDBCInputFormatTest.java | 43 ++++++++++++++++++++ 2 files changed, 60 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index 835fb23..b7ac744 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io.jdbc; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; @@ -30,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements private transient Connection dbConn; private transient PreparedStatement statement; private transient ResultSet resultSet; + private int fetchSize; private boolean hasNext; private Object[][] parameterValues; @@ -141,6 +144,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements dbConn = DriverManager.getConnection(dbURL, username, password); } statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); + if (fetchSize > 0) { + statement.setFetchSize(fetchSize); + } } catch (SQLException se) { throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } catch (ClassNotFoundException cnfe) { @@ -312,6 +318,11 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements return new DefaultInputSplitAssigner(inputSplits); } + @VisibleForTesting + PreparedStatement getStatement() { + return statement; + } + /** * A builder used to set parameters to the output format's configuration in a fluent way. * @return builder @@ -378,6 +389,12 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements return this; } + public JDBCInputFormatBuilder setFetchSize(int fetchSize) { + Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize); + format.fetchSize = fetchSize; + return this; + } + public JDBCInputFormat finish() { if (format.username == null) { LOG.info("Username was not supplied separately."); http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index b1416ea..f7a86e5 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -30,7 +30,9 @@ import org.junit.Test; import java.io.IOException; import java.io.Serializable; +import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; /** * Tests for the {@link JDBCInputFormat}. @@ -100,6 +102,47 @@ public class JDBCInputFormatTest extends JDBCTestBase { .finish(); } + @Test(expected = IllegalArgumentException.class) + public void testInvalidFetchSize() { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(ROW_TYPE_INFO) + .setFetchSize(-7) + .finish(); + } + + @Test + public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(ROW_TYPE_INFO) + .finish(); + jdbcInputFormat.openInputFormat(); + + Class.forName(DRIVER_CLASS); + final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize(); + + Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize()); + } + + @Test + public void testFetchSizeCanBeConfigured() throws SQLException { + final int desiredFetchSize = 10_000; + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(ROW_TYPE_INFO) + .setFetchSize(desiredFetchSize) + .finish(); + jdbcInputFormat.openInputFormat(); + Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize()); + } + @Test public void testJDBCInputFormatWithoutParallelism() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
