This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9394f8561d2 [JdbcIO] Adding disableAutoCommit flag (#32988)
9394f8561d2 is described below

commit 9394f8561d29509a6e67fcd66254197bd0d46b2e
Author: Chris Ashcraft <[email protected]>
AuthorDate: Tue Nov 12 18:58:38 2024 -0600

    [JdbcIO] Adding disableAutoCommit flag (#32988)
    
    * adding disableAutoCommit flag to ReadFn
    
    ---------
    
    Co-authored-by: Chris Ashcraft <[email protected]>
---
 CHANGES.md                                         |  1 +
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 86 ++++++++++++++++++++--
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   |  9 +++
 .../beam/sdk/io/jdbc/JdbcSchemaIOProvider.java     | 11 +++
 sdks/python/apache_beam/io/jdbc.py                 |  5 ++
 5 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 261fafc024f..c5731bcff31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -94,6 +94,7 @@
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners 
([#18592](https://github.com/apache/beam/issues/18592), 
[#31381](https://github.com/apache/beam/issues/31381)).
 * (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming 
Java Legacy Runner without Streaming Engine 
([#32892](https://github.com/apache/beam/issues/32892)).
+* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] 
(Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 2f164fa3bb7..946c07f5576 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -333,6 +333,7 @@ public class JdbcIO {
     return new AutoValue_JdbcIO_Read.Builder<T>()
         .setFetchSize(DEFAULT_FETCH_SIZE)
         .setOutputParallelization(true)
+        .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
         .build();
   }
 
@@ -341,6 +342,7 @@ public class JdbcIO {
     return new AutoValue_JdbcIO_ReadRows.Builder()
         .setFetchSize(DEFAULT_FETCH_SIZE)
         .setOutputParallelization(true)
+        .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
         .setStatementPreparator(ignored -> {})
         .build();
   }
@@ -356,6 +358,7 @@ public class JdbcIO {
     return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
         .setFetchSize(DEFAULT_FETCH_SIZE)
         .setOutputParallelization(true)
+        .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
         .build();
   }
 
@@ -372,6 +375,7 @@ public class JdbcIO {
         .setPartitionColumnType(partitioningColumnType)
         .setNumPartitions(DEFAULT_NUM_PARTITIONS)
         .setFetchSize(DEFAULT_FETCH_SIZE)
+        .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
         .setUseBeamSchema(false)
         .build();
   }
@@ -389,6 +393,7 @@ public class JdbcIO {
         .setPartitionsHelper(partitionsHelper)
         .setNumPartitions(DEFAULT_NUM_PARTITIONS)
         .setFetchSize(DEFAULT_FETCH_SIZE)
+        .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
         .setUseBeamSchema(false)
         .build();
   }
@@ -400,6 +405,7 @@ public class JdbcIO {
   private static final long DEFAULT_BATCH_SIZE = 1000L;
   private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
   private static final int DEFAULT_FETCH_SIZE = 50_000;
+  private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
   // Default values used from fluent backoff.
   private static final Duration DEFAULT_INITIAL_BACKOFF = 
Duration.standardSeconds(1);
   private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = 
Duration.standardDays(1000);
@@ -733,6 +739,9 @@ public class JdbcIO {
     @Pure
     abstract boolean getOutputParallelization();
 
+    @Pure
+    abstract boolean getDisableAutoCommit();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -748,6 +757,8 @@ public class JdbcIO {
 
       abstract Builder setOutputParallelization(boolean outputParallelization);
 
+      abstract Builder setDisableAutoCommit(boolean disableAutoCommit);
+
       abstract ReadRows build();
     }
 
@@ -799,6 +810,15 @@ public class JdbcIO {
       return 
toBuilder().setOutputParallelization(outputParallelization).build();
     }
 
+    /**
+     * Whether to disable auto commit on read. Defaults to true if not provided. The need for this
+     * config varies depending on the database platform. Informix requires this to be set to false
+     * while Postgres requires this to be set to true.
+     */
+    public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
+      return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery() 
is required");
@@ -816,6 +836,7 @@ public class JdbcIO {
                   .withCoder(RowCoder.of(schema))
                   .withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
                   .withFetchSize(getFetchSize())
+                  .withDisableAutoCommit(getDisableAutoCommit())
                   .withOutputParallelization(getOutputParallelization())
                   
.withStatementPreparator(checkStateNotNull(getStatementPreparator())));
       rows.setRowSchema(schema);
@@ -872,6 +893,9 @@ public class JdbcIO {
     @Pure
     abstract boolean getOutputParallelization();
 
+    @Pure
+    abstract boolean getDisableAutoCommit();
+
     @Pure
     abstract Builder<T> toBuilder();
 
@@ -892,6 +916,8 @@ public class JdbcIO {
 
       abstract Builder<T> setOutputParallelization(boolean 
outputParallelization);
 
+      abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);
+
       abstract Read<T> build();
     }
 
@@ -958,6 +984,15 @@ public class JdbcIO {
       return 
toBuilder().setOutputParallelization(outputParallelization).build();
     }
 
+    /**
+     * Whether to disable auto commit on read. Defaults to true if not provided. The need for this
+     * config varies depending on the database platform. Informix requires this to be set to false
+     * while Postgres requires this to be set to true.
+     */
+    public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
+      return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       ValueProvider<String> query = checkArgumentNotNull(getQuery(), 
"withQuery() is required");
@@ -974,6 +1009,7 @@ public class JdbcIO {
               .withRowMapper(rowMapper)
               .withFetchSize(getFetchSize())
               .withOutputParallelization(getOutputParallelization())
+              .withDisableAutoCommit(getDisableAutoCommit())
               .withParameterSetter(
                   (element, preparedStatement) -> {
                     if (getStatementPreparator() != null) {
@@ -1029,6 +1065,8 @@ public class JdbcIO {
 
     abstract boolean getOutputParallelization();
 
+    abstract boolean getDisableAutoCommit();
+
     abstract Builder<ParameterT, OutputT> toBuilder();
 
     @AutoValue.Builder
@@ -1049,6 +1087,8 @@ public class JdbcIO {
 
       abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean 
outputParallelization);
 
+      abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean 
disableAutoCommit);
+
       abstract ReadAll<ParameterT, OutputT> build();
     }
 
@@ -1127,6 +1167,15 @@ public class JdbcIO {
       return 
toBuilder().setOutputParallelization(outputParallelization).build();
     }
 
+    /**
+     * Whether to disable auto commit on read. Defaults to true if not provided. The need for this
+     * config varies depending on the database platform. Informix requires this to be set to false
+     * while Postgres requires this to be set to true.
+     */
+    public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean 
disableAutoCommit) {
+      return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+    }
+
     private @Nullable Coder<OutputT> inferCoder(
         CoderRegistry registry, SchemaRegistry schemaRegistry) {
       if (getCoder() != null) {
@@ -1173,7 +1222,8 @@ public class JdbcIO {
                           checkStateNotNull(getQuery()),
                           checkStateNotNull(getParameterSetter()),
                           checkStateNotNull(getRowMapper()),
-                          getFetchSize())))
+                          getFetchSize(),
+                          getDisableAutoCommit())))
               .setCoder(coder);
 
       if (getOutputParallelization()) {
@@ -1254,6 +1304,9 @@ public class JdbcIO {
     @Pure
     abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> 
getPartitionsHelper();
 
+    @Pure
+    abstract boolean getDisableAutoCommit();
+
     @Pure
     abstract Builder<T, PartitionColumnT> toBuilder();
 
@@ -1287,6 +1340,8 @@ public class JdbcIO {
       abstract Builder<T, PartitionColumnT> setPartitionsHelper(
           JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);
 
+      abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean 
disableAutoCommit);
+
       abstract ReadWithPartitions<T, PartitionColumnT> build();
     }
 
@@ -1337,6 +1392,16 @@ public class JdbcIO {
       return toBuilder().setFetchSize(fetchSize).build();
     }
 
+    /**
+     * Whether to disable auto commit on read. Defaults to true if not provided. The need for this
+     * config varies depending on the database platform. Informix requires this to be set to false
+     * while Postgres requires this to be set to true.
+     */
+    public ReadWithPartitions<T, PartitionColumnT> withDisableAutoCommit(
+        boolean disableAutoCommit) {
+      return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+    }
+
     /** Data output type is {@link Row}, and schema is auto-inferred from the 
database. */
     public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
       return toBuilder().setUseBeamSchema(true).build();
@@ -1419,7 +1484,8 @@ public class JdbcIO {
                         .withQuery(query)
                         .withDataSourceProviderFn(dataSourceProviderFn)
                         .withRowMapper(checkStateNotNull(partitionsHelper))
-                        .withFetchSize(getFetchSize()))
+                        .withFetchSize(getFetchSize())
+                        .withDisableAutoCommit(getDisableAutoCommit()))
                 .apply(
                     MapElements.via(
                         new SimpleFunction<
@@ -1487,7 +1553,8 @@ public class JdbcIO {
               .withRowMapper(rowMapper)
               .withFetchSize(getFetchSize())
               .withParameterSetter(checkStateNotNull(partitionsHelper))
-              .withOutputParallelization(false);
+              .withOutputParallelization(false)
+              .withDisableAutoCommit(getDisableAutoCommit());
 
       if (getUseBeamSchema()) {
         checkStateNotNull(schema);
@@ -1537,6 +1604,7 @@ public class JdbcIO {
     private final PreparedStatementSetter<ParameterT> parameterSetter;
     private final RowMapper<OutputT> rowMapper;
     private final int fetchSize;
+    private final boolean disableAutoCommit;
 
     private @Nullable DataSource dataSource;
     private @Nullable Connection connection;
@@ -1546,12 +1614,14 @@ public class JdbcIO {
         ValueProvider<String> query,
         PreparedStatementSetter<ParameterT> parameterSetter,
         RowMapper<OutputT> rowMapper,
-        int fetchSize) {
+        int fetchSize,
+        boolean disableAutoCommit) {
       this.dataSourceProviderFn = dataSourceProviderFn;
       this.query = query;
       this.parameterSetter = parameterSetter;
       this.rowMapper = rowMapper;
       this.fetchSize = fetchSize;
+      this.disableAutoCommit = disableAutoCommit;
     }
 
     @Setup
@@ -1577,8 +1647,12 @@ public class JdbcIO {
       Connection connection = getConnection();
       // PostgreSQL requires autocommit to be disabled to enable cursor streaming
       // see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
-      LOG.info("Autocommit has been disabled");
-      connection.setAutoCommit(false);
+      // This option is configurable as Informix will error
+      // if calling setAutoCommit on a non-logged database
+      if (disableAutoCommit) {
+        LOG.info("Autocommit has been disabled");
+        connection.setAutoCommit(false);
+      }
       try (PreparedStatement statement =
           connection.prepareStatement(
               query.get(), ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY)) {
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 0139207235a..435bfc138b5 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -117,6 +117,10 @@ public class JdbcReadSchemaTransformProvider
       if (outputParallelization != null) {
         readRows = readRows.withOutputParallelization(outputParallelization);
       }
+      Boolean disableAutoCommit = config.getDisableAutoCommit();
+      if (disableAutoCommit != null) {
+        readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+      }
       return PCollectionRowTuple.of("output", 
input.getPipeline().apply(readRows));
     }
   }
@@ -174,6 +178,9 @@ public class JdbcReadSchemaTransformProvider
     @Nullable
     public abstract Boolean getOutputParallelization();
 
+    @Nullable
+    public abstract Boolean getDisableAutoCommit();
+
     @Nullable
     public abstract String getDriverJars();
 
@@ -238,6 +245,8 @@ public class JdbcReadSchemaTransformProvider
 
       public abstract Builder setOutputParallelization(Boolean value);
 
+      public abstract Builder setDisableAutoCommit(Boolean value);
+
       public abstract Builder setDriverJars(String value);
 
       public abstract JdbcReadSchemaTransformConfiguration build();
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index 4b5dc0d7e24..30012465eb9 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -65,6 +65,7 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider 
{
         .addNullableField("readQuery", FieldType.STRING)
         .addNullableField("writeStatement", FieldType.STRING)
         .addNullableField("fetchSize", FieldType.INT16)
+        .addNullableField("disableAutoCommit", FieldType.BOOLEAN)
         .addNullableField("outputParallelization", FieldType.BOOLEAN)
         .addNullableField("autosharding", FieldType.BOOLEAN)
         // Partitioning support. If you specify a partition column we will use 
that instead of
@@ -140,6 +141,11 @@ public class JdbcSchemaIOProvider implements 
SchemaIOProvider {
               readRows = readRows.withFetchSize(fetchSize);
             }
 
+            @Nullable Boolean disableAutoCommit = 
config.getBoolean("disableAutoCommit");
+            if (disableAutoCommit != null) {
+              readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+            }
+
             return input.apply(readRows);
           } else {
 
@@ -163,6 +169,11 @@ public class JdbcSchemaIOProvider implements 
SchemaIOProvider {
               readRows = 
readRows.withOutputParallelization(outputParallelization);
             }
 
+            @Nullable Boolean disableAutoCommit = 
config.getBoolean("disableAutoCommit");
+            if (disableAutoCommit != null) {
+              readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+            }
+
             return input.apply(readRows);
           }
         }
diff --git a/sdks/python/apache_beam/io/jdbc.py 
b/sdks/python/apache_beam/io/jdbc.py
index 3fef1f5fee3..d4ece0c7bc2 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -125,6 +125,7 @@ Config = typing.NamedTuple(
      ('read_query', typing.Optional[str]),
      ('write_statement', typing.Optional[str]),
      ('fetch_size', typing.Optional[np.int16]),
+     ('disable_autocommit', typing.Optional[bool]),
      ('output_parallelization', typing.Optional[bool]),
      ('autosharding', typing.Optional[bool]),
      ('partition_column', typing.Optional[str]),
@@ -236,6 +237,7 @@ class WriteToJdbc(ExternalTransform):
                             write_statement=statement,
                             read_query=None,
                             fetch_size=None,
+                            disable_autocommit=None,
                             output_parallelization=None,
                             autosharding=autosharding,
                             max_connections=max_connections,
@@ -286,6 +288,7 @@ class ReadFromJdbc(ExternalTransform):
       username,
       password,
       query=None,
+      disable_autocommit=None,
       output_parallelization=None,
       fetch_size=None,
       partition_column=None,
@@ -305,6 +308,7 @@ class ReadFromJdbc(ExternalTransform):
     :param username: database username
     :param password: database password
     :param query: sql query to be executed
+    :param disable_autocommit: disable autocommit on read
     :param output_parallelization: is output parallelization on
     :param fetch_size: how many rows to fetch
     :param partition_column: enable partitioned reads by splitting on this
@@ -350,6 +354,7 @@ class ReadFromJdbc(ExternalTransform):
                             write_statement=None,
                             read_query=query,
                             fetch_size=fetch_size,
+                            disable_autocommit=disable_autocommit,
                             output_parallelization=output_parallelization,
                             autosharding=None,
                             max_connections=max_connections,

Reply via email to