[ 
https://issues.apache.org/jira/browse/BEAM-3714?focusedWorklogId=93446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93446
 ]

ASF GitHub Bot logged work on BEAM-3714:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/18 21:18
            Start Date: 20/Apr/18 21:18
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5109: [BEAM-3714]modified 
result set to be forward only and read only
URL: https://github.com/apache/beam/pull/5109
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 31481e28577..cd13a24d142 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -159,7 +159,9 @@
    * @param <T> Type of the data to be read.
    */
   public static <T> Read<T> read() {
-    return new AutoValue_JdbcIO_Read.Builder<T>().build();
+    return new AutoValue_JdbcIO_Read.Builder<T>()
+            .setFetchSize(DEFAULT_FETCH_SIZE)
+            .build();
   }
 
   /**
@@ -170,10 +172,13 @@
    * @param <OutputT> Type of the data to be read.
    */
   public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
-    return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>().build();
+    return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
+            .setFetchSize(DEFAULT_FETCH_SIZE)
+            .build();
   }
 
   private static final long DEFAULT_BATCH_SIZE = 1000L;
+  private static final int DEFAULT_FETCH_SIZE = 50_000;
 
   /**
    * Write data to a JDBC datasource.
@@ -372,6 +377,7 @@ DataSource buildDatasource() throws Exception {
     @Nullable abstract StatementPreparator getStatementPreparator();
     @Nullable abstract RowMapper<T> getRowMapper();
     @Nullable abstract Coder<T> getCoder();
+    abstract int getFetchSize();
 
     abstract Builder<T> toBuilder();
 
@@ -382,6 +388,7 @@ DataSource buildDatasource() throws Exception {
       abstract Builder<T> setStatementPreparator(StatementPreparator 
statementPreparator);
       abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
       abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setFetchSize(int fetchSize);
       abstract Read<T> build();
     }
 
@@ -414,6 +421,16 @@ DataSource buildDatasource() throws Exception {
       return toBuilder().setCoder(coder).build();
     }
 
+    /**
+     * This method is used to set the size of the data that is going to be 
fetched and loaded in
+     * memory per every database call. Please refer to: {@link 
java.sql.Statement#setFetchSize(int)}
+     * It should ONLY be used if the default value throws memory errors.
+     */
+    public Read<T> withFetchSize(int fetchSize) {
+      checkArgument(fetchSize > 0, "fetch size must be > 0");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkArgument(getQuery() != null, "withQuery() is required");
@@ -430,6 +447,7 @@ DataSource buildDatasource() throws Exception {
                   .withQuery(getQuery())
                   .withCoder(getCoder())
                   .withRowMapper(getRowMapper())
+                  .withFetchSize(getFetchSize())
                   .withParameterSetter(
                       (element, preparedStatement) -> {
                         if (getStatementPreparator() != null) {
@@ -459,6 +477,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     @Nullable abstract PreparedStatementSetter<ParameterT> 
getParameterSetter();
     @Nullable abstract RowMapper<OutputT> getRowMapper();
     @Nullable abstract Coder<OutputT> getCoder();
+    abstract int getFetchSize();
 
     abstract Builder<ParameterT, OutputT> toBuilder();
 
@@ -471,6 +490,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
               PreparedStatementSetter<ParameterT> parameterSetter);
       abstract Builder<ParameterT, OutputT> setRowMapper(RowMapper<OutputT> 
rowMapper);
       abstract Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+      abstract Builder<ParameterT, OutputT> setFetchSize(int fetchSize);
       abstract ReadAll<ParameterT, OutputT> build();
     }
 
@@ -508,6 +528,16 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return toBuilder().setCoder(coder).build();
     }
 
+    /**
+     * This method is used to set the size of the data that is going to be 
fetched and loaded in
+     * memory per every database call. Please refer to: {@link 
java.sql.Statement#setFetchSize(int)}
+     * It should ONLY be used if the default value throws memory errors.
+     */
+    public ReadAll<ParameterT, OutputT> withFetchSize(int fetchSize) {
+      checkArgument(fetchSize > 0, "fetch size must be >0");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
     @Override
     public PCollection<OutputT> expand(PCollection<ParameterT> input) {
       return input
@@ -517,7 +547,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
                       getDataSourceConfiguration(),
                       getQuery(),
                       getParameterSetter(),
-                      getRowMapper())))
+                      getRowMapper(),
+                      getFetchSize())))
           .setCoder(getCoder())
           .apply(new Reparallelize<>());
     }
@@ -538,6 +569,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     private final ValueProvider<String> query;
     private final PreparedStatementSetter<ParameterT> parameterSetter;
     private final RowMapper<OutputT> rowMapper;
+    private final int fetchSize;
 
     private DataSource dataSource;
     private Connection connection;
@@ -546,11 +578,12 @@ private ReadFn(
         DataSourceConfiguration dataSourceConfiguration,
         ValueProvider<String> query,
         PreparedStatementSetter<ParameterT> parameterSetter,
-        RowMapper<OutputT> rowMapper) {
+        RowMapper<OutputT> rowMapper, int fetchSize) {
       this.dataSourceConfiguration = dataSourceConfiguration;
       this.query = query;
       this.parameterSetter = parameterSetter;
       this.rowMapper = rowMapper;
+      this.fetchSize = fetchSize;
     }
 
     @Setup
@@ -561,7 +594,9 @@ public void setup() throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      try (PreparedStatement statement = 
connection.prepareStatement(query.get())) {
+      try (PreparedStatement statement = 
connection.prepareStatement(query.get(),
+              ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+        statement.setFetchSize(fetchSize);
         parameterSetter.setParameters(context.element(), statement);
         try (ResultSet resultSet = statement.executeQuery()) {
           while (resultSet.next()) {
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index b53b2f7445b..f9ae21a9157 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -224,6 +224,7 @@ private static void addInitialData(DataSource dataSource, 
String tableName)
   public void testRead() throws Exception {
     PCollection<TestRow> rows = pipeline.apply(
         JdbcIO.<TestRow>read()
+            .withFetchSize(12)
             
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
             .withQuery("select name,id from " + readTableName)
             .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 93446)
    Time Spent: 3h 50m  (was: 3h 40m)

> JdbcIO.read() should create a forward-only, read-only result set
> ----------------------------------------------------------------
>
>                 Key: BEAM-3714
>                 URL: https://issues.apache.org/jira/browse/BEAM-3714
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jdbc
>            Reporter: Eugene Kirpichov
>            Assignee: Innocent
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> [https://stackoverflow.com/questions/48784889/streaming-data-from-cloudsql-into-dataflow/48819934#48819934]
>  - a user is trying to load a large table from MySQL, and the MySQL JDBC 
> driver requires special measures when loading large result sets.
> JdbcIO currently calls simply "connection.prepareStatement(query)" 
> https://github.com/apache/beam/blob/bb8c12c4956cbe3c6f2e57113e7c0ce2a5c05009/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L508
>  - it should specify type TYPE_FORWARD_ONLY and concurrency CONCUR_READ_ONLY 
> - these values should always be used.
> Seems that different databases have different requirements for streaming 
> result sets.
> E.g. MySQL requires setting fetch size; PostgreSQL says "The Connection must 
> not be in autocommit mode." 
> https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor . 
> Oracle, I think, doesn't have any special requirements but I don't know. 
> Fetch size should probably still be set to a reasonably large value.
> Seems that the common denominator of these requirements is: set fetch size to 
> a reasonably large but not maximum value; disable autocommit (there's nothing 
> to commit in read() anyway).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to