[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.

This closes #4036.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605107b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605107b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605107b

Branch: refs/heads/master
Commit: 5605107bd59f10fc9ca7d9ac53fa8d60ecdcfdbc
Parents: c041dd8
Author: Maximilian Bode <[email protected]>
Authored: Wed May 31 18:46:55 2017 +0200
Committer: zentol <[email protected]>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 17 ++++++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 43 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 835fb23..b7ac744 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
@@ -30,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
        private transient Connection dbConn;
        private transient PreparedStatement statement;
        private transient ResultSet resultSet;
+       private int fetchSize;
 
        private boolean hasNext;
        private Object[][] parameterValues;
@@ -141,6 +144,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
                                dbConn = DriverManager.getConnection(dbURL, username, password);
                        }
                        statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+                       if (fetchSize > 0) {
+                               statement.setFetchSize(fetchSize);
+                       }
                } catch (SQLException se) {
                        throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
                } catch (ClassNotFoundException cnfe) {
@@ -312,6 +318,11 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
                return new DefaultInputSplitAssigner(inputSplits);
        }
 
+       @VisibleForTesting
+       PreparedStatement getStatement() {
+               return statement;
+       }
+
        /**
         * A builder used to set parameters to the output format's configuration in a fluent way.
         * @return builder
@@ -378,6 +389,12 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
                        return this;
                }
 
+               public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
+                       Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+                       format.fetchSize = fetchSize;
+                       return this;
+               }
+
                public JDBCInputFormat finish() {
                        if (format.username == null) {
                                LOG.info("Username was not supplied separately.");

http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index b1416ea..f7a86e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -30,7 +30,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 
 /**
  * Tests for the {@link JDBCInputFormat}.
@@ -100,6 +102,47 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                .finish();
        }
 
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidFetchSize() {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                       .setDrivername(DRIVER_CLASS)
+                       .setDBUrl(DB_URL)
+                       .setQuery(SELECT_ALL_BOOKS)
+                       .setRowTypeInfo(ROW_TYPE_INFO)
+                       .setFetchSize(-7)
+                       .finish();
+       }
+
+       @Test
+       public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                       .setDrivername(DRIVER_CLASS)
+                       .setDBUrl(DB_URL)
+                       .setQuery(SELECT_ALL_BOOKS)
+                       .setRowTypeInfo(ROW_TYPE_INFO)
+                       .finish();
+               jdbcInputFormat.openInputFormat();
+
+               Class.forName(DRIVER_CLASS);
+               final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+
+               Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+       }
+
+       @Test
+       public void testFetchSizeCanBeConfigured() throws SQLException {
+               final int desiredFetchSize = 10_000;
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                       .setDrivername(DRIVER_CLASS)
+                       .setDBUrl(DB_URL)
+                       .setQuery(SELECT_ALL_BOOKS)
+                       .setRowTypeInfo(ROW_TYPE_INFO)
+                       .setFetchSize(desiredFetchSize)
+                       .finish();
+               jdbcInputFormat.openInputFormat();
+               Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+       }
+
        @Test
        public void testJDBCInputFormatWithoutParallelism() throws IOException {
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()

Reply via email to