This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9394f8561d2 [JdbcIO] Adding disableAutoCommit flag (#32988)
9394f8561d2 is described below
commit 9394f8561d29509a6e67fcd66254197bd0d46b2e
Author: Chris Ashcraft <[email protected]>
AuthorDate: Tue Nov 12 18:58:38 2024 -0600
[JdbcIO] Adding disableAutoCommit flag (#32988)
* adding disableAutoCommit flag to ReadFn
---------
Co-authored-by: Chris Ashcraft <[email protected]>
---
CHANGES.md | 1 +
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 86 ++++++++++++++++++++--
.../io/jdbc/JdbcReadSchemaTransformProvider.java | 9 +++
.../beam/sdk/io/jdbc/JdbcSchemaIOProvider.java | 11 +++
sdks/python/apache_beam/io/jdbc.py | 5 ++
5 files changed, 106 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 261fafc024f..c5731bcff31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -94,6 +94,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners
([#18592](https://github.com/apache/beam/issues/18592),
[#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming
Java Legacy Runner without Streaming Engine
([#32892](https://github.com/apache/beam/issues/32892)).
+* (Java/Python) Added a flag to conditionally disable auto-commit in JdbcIO ReadFn
([#31111](https://github.com/apache/beam/issues/31111)).
## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN]
(Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 2f164fa3bb7..946c07f5576 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -333,6 +333,7 @@ public class JdbcIO {
return new AutoValue_JdbcIO_Read.Builder<T>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
+ .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}
@@ -341,6 +342,7 @@ public class JdbcIO {
return new AutoValue_JdbcIO_ReadRows.Builder()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
+ .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setStatementPreparator(ignored -> {})
.build();
}
@@ -356,6 +358,7 @@ public class JdbcIO {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
+ .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}
@@ -372,6 +375,7 @@ public class JdbcIO {
.setPartitionColumnType(partitioningColumnType)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
+ .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
@@ -389,6 +393,7 @@ public class JdbcIO {
.setPartitionsHelper(partitionsHelper)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
+ .setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
@@ -400,6 +405,7 @@ public class JdbcIO {
private static final long DEFAULT_BATCH_SIZE = 1000L;
private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
+ private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF =
Duration.standardSeconds(1);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF =
Duration.standardDays(1000);
@@ -733,6 +739,9 @@ public class JdbcIO {
@Pure
abstract boolean getOutputParallelization();
+ @Pure
+ abstract boolean getDisableAutoCommit();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -748,6 +757,8 @@ public class JdbcIO {
abstract Builder setOutputParallelization(boolean outputParallelization);
+ abstract Builder setDisableAutoCommit(boolean disableAutoCommit);
+
abstract ReadRows build();
}
@@ -799,6 +810,15 @@ public class JdbcIO {
return
toBuilder().setOutputParallelization(outputParallelization).build();
}
+ /**
+ * Whether to disable auto commit on read. Defaults to true if not
provided. The need for this
+ * config varies depending on the database platform. Informix requires
this to be set to false
+ * while Postgres requires this to be set to true.
+ */
+ public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
+ return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+ }
+
@Override
public PCollection<Row> expand(PBegin input) {
ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery()
is required");
@@ -816,6 +836,7 @@ public class JdbcIO {
.withCoder(RowCoder.of(schema))
.withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
.withFetchSize(getFetchSize())
+ .withDisableAutoCommit(getDisableAutoCommit())
.withOutputParallelization(getOutputParallelization())
.withStatementPreparator(checkStateNotNull(getStatementPreparator())));
rows.setRowSchema(schema);
@@ -872,6 +893,9 @@ public class JdbcIO {
@Pure
abstract boolean getOutputParallelization();
+ @Pure
+ abstract boolean getDisableAutoCommit();
+
@Pure
abstract Builder<T> toBuilder();
@@ -892,6 +916,8 @@ public class JdbcIO {
abstract Builder<T> setOutputParallelization(boolean
outputParallelization);
+ abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);
+
abstract Read<T> build();
}
@@ -958,6 +984,15 @@ public class JdbcIO {
return
toBuilder().setOutputParallelization(outputParallelization).build();
}
+ /**
+ * Whether to disable auto commit on read. Defaults to true if not
provided. The need for this
+ * config varies depending on the database platform. Informix requires
this to be set to false
+ * while Postgres requires this to be set to true.
+ */
+ public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
+ return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<String> query = checkArgumentNotNull(getQuery(),
"withQuery() is required");
@@ -974,6 +1009,7 @@ public class JdbcIO {
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withOutputParallelization(getOutputParallelization())
+ .withDisableAutoCommit(getDisableAutoCommit())
.withParameterSetter(
(element, preparedStatement) -> {
if (getStatementPreparator() != null) {
@@ -1029,6 +1065,8 @@ public class JdbcIO {
abstract boolean getOutputParallelization();
+ abstract boolean getDisableAutoCommit();
+
abstract Builder<ParameterT, OutputT> toBuilder();
@AutoValue.Builder
@@ -1049,6 +1087,8 @@ public class JdbcIO {
abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean
outputParallelization);
+ abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean
disableAutoCommit);
+
abstract ReadAll<ParameterT, OutputT> build();
}
@@ -1127,6 +1167,15 @@ public class JdbcIO {
return
toBuilder().setOutputParallelization(outputParallelization).build();
}
+ /**
+ * Whether to disable auto commit on read. Defaults to true if not
provided. The need for this
+ * config varies depending on the database platform. Informix requires
this to be set to false
+ * while Postgres requires this to be set to true.
+ */
+ public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean
disableAutoCommit) {
+ return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+ }
+
private @Nullable Coder<OutputT> inferCoder(
CoderRegistry registry, SchemaRegistry schemaRegistry) {
if (getCoder() != null) {
@@ -1173,7 +1222,8 @@ public class JdbcIO {
checkStateNotNull(getQuery()),
checkStateNotNull(getParameterSetter()),
checkStateNotNull(getRowMapper()),
- getFetchSize())))
+ getFetchSize(),
+ getDisableAutoCommit())))
.setCoder(coder);
if (getOutputParallelization()) {
@@ -1254,6 +1304,9 @@ public class JdbcIO {
@Pure
abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT>
getPartitionsHelper();
+ @Pure
+ abstract boolean getDisableAutoCommit();
+
@Pure
abstract Builder<T, PartitionColumnT> toBuilder();
@@ -1287,6 +1340,8 @@ public class JdbcIO {
abstract Builder<T, PartitionColumnT> setPartitionsHelper(
JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);
+ abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean
disableAutoCommit);
+
abstract ReadWithPartitions<T, PartitionColumnT> build();
}
@@ -1337,6 +1392,16 @@ public class JdbcIO {
return toBuilder().setFetchSize(fetchSize).build();
}
+ /**
+ * Whether to disable auto commit on read. Defaults to true if not
provided. The need for this
+ * config varies depending on the database platform. Informix requires
this to be set to false
+ * while Postgres requires this to be set to true.
+ */
+ public ReadWithPartitions<T, PartitionColumnT> withDisableAutoCommit(
+ boolean disableAutoCommit) {
+ return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+ }
+
/** Data output type is {@link Row}, and schema is auto-inferred from the
database. */
public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
return toBuilder().setUseBeamSchema(true).build();
@@ -1419,7 +1484,8 @@ public class JdbcIO {
.withQuery(query)
.withDataSourceProviderFn(dataSourceProviderFn)
.withRowMapper(checkStateNotNull(partitionsHelper))
- .withFetchSize(getFetchSize()))
+ .withFetchSize(getFetchSize())
+ .withDisableAutoCommit(getDisableAutoCommit()))
.apply(
MapElements.via(
new SimpleFunction<
@@ -1487,7 +1553,8 @@ public class JdbcIO {
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withParameterSetter(checkStateNotNull(partitionsHelper))
- .withOutputParallelization(false);
+ .withOutputParallelization(false)
+ .withDisableAutoCommit(getDisableAutoCommit());
if (getUseBeamSchema()) {
checkStateNotNull(schema);
@@ -1537,6 +1604,7 @@ public class JdbcIO {
private final PreparedStatementSetter<ParameterT> parameterSetter;
private final RowMapper<OutputT> rowMapper;
private final int fetchSize;
+ private final boolean disableAutoCommit;
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
@@ -1546,12 +1614,14 @@ public class JdbcIO {
ValueProvider<String> query,
PreparedStatementSetter<ParameterT> parameterSetter,
RowMapper<OutputT> rowMapper,
- int fetchSize) {
+ int fetchSize,
+ boolean disableAutoCommit) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.parameterSetter = parameterSetter;
this.rowMapper = rowMapper;
this.fetchSize = fetchSize;
+ this.disableAutoCommit = disableAutoCommit;
}
@Setup
@@ -1577,8 +1647,12 @@ public class JdbcIO {
Connection connection = getConnection();
// PostgreSQL requires autocommit to be disabled to enable cursor
streaming
// see
https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
- LOG.info("Autocommit has been disabled");
- connection.setAutoCommit(false);
+ // This option is configurable as Informix will error
+ // if calling setAutoCommit on a non-logged database
+ if (disableAutoCommit) {
+ LOG.info("Autocommit has been disabled");
+ connection.setAutoCommit(false);
+ }
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 0139207235a..435bfc138b5 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -117,6 +117,10 @@ public class JdbcReadSchemaTransformProvider
if (outputParallelization != null) {
readRows = readRows.withOutputParallelization(outputParallelization);
}
+ Boolean disableAutoCommit = config.getDisableAutoCommit();
+ if (disableAutoCommit != null) {
+ readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+ }
return PCollectionRowTuple.of("output",
input.getPipeline().apply(readRows));
}
}
@@ -174,6 +178,9 @@ public class JdbcReadSchemaTransformProvider
@Nullable
public abstract Boolean getOutputParallelization();
+ @Nullable
+ public abstract Boolean getDisableAutoCommit();
+
@Nullable
public abstract String getDriverJars();
@@ -238,6 +245,8 @@ public class JdbcReadSchemaTransformProvider
public abstract Builder setOutputParallelization(Boolean value);
+ public abstract Builder setDisableAutoCommit(Boolean value);
+
public abstract Builder setDriverJars(String value);
public abstract JdbcReadSchemaTransformConfiguration build();
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index 4b5dc0d7e24..30012465eb9 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -65,6 +65,7 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider
{
.addNullableField("readQuery", FieldType.STRING)
.addNullableField("writeStatement", FieldType.STRING)
.addNullableField("fetchSize", FieldType.INT16)
+ .addNullableField("disableAutoCommit", FieldType.BOOLEAN)
.addNullableField("outputParallelization", FieldType.BOOLEAN)
.addNullableField("autosharding", FieldType.BOOLEAN)
// Partitioning support. If you specify a partition column we will use
that instead of
@@ -140,6 +141,11 @@ public class JdbcSchemaIOProvider implements
SchemaIOProvider {
readRows = readRows.withFetchSize(fetchSize);
}
+ @Nullable Boolean disableAutoCommit =
config.getBoolean("disableAutoCommit");
+ if (disableAutoCommit != null) {
+ readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+ }
+
return input.apply(readRows);
} else {
@@ -163,6 +169,11 @@ public class JdbcSchemaIOProvider implements
SchemaIOProvider {
readRows =
readRows.withOutputParallelization(outputParallelization);
}
+ @Nullable Boolean disableAutoCommit =
config.getBoolean("disableAutoCommit");
+ if (disableAutoCommit != null) {
+ readRows = readRows.withDisableAutoCommit(disableAutoCommit);
+ }
+
return input.apply(readRows);
}
}
diff --git a/sdks/python/apache_beam/io/jdbc.py
b/sdks/python/apache_beam/io/jdbc.py
index 3fef1f5fee3..d4ece0c7bc2 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -125,6 +125,7 @@ Config = typing.NamedTuple(
('read_query', typing.Optional[str]),
('write_statement', typing.Optional[str]),
('fetch_size', typing.Optional[np.int16]),
+ ('disable_autocommit', typing.Optional[bool]),
('output_parallelization', typing.Optional[bool]),
('autosharding', typing.Optional[bool]),
('partition_column', typing.Optional[str]),
@@ -236,6 +237,7 @@ class WriteToJdbc(ExternalTransform):
write_statement=statement,
read_query=None,
fetch_size=None,
+ disable_autocommit=None,
output_parallelization=None,
autosharding=autosharding,
max_connections=max_connections,
@@ -286,6 +288,7 @@ class ReadFromJdbc(ExternalTransform):
username,
password,
query=None,
+ disable_autocommit=None,
output_parallelization=None,
fetch_size=None,
partition_column=None,
@@ -305,6 +308,7 @@ class ReadFromJdbc(ExternalTransform):
:param username: database username
:param password: database password
:param query: sql query to be executed
+ :param disable_autocommit: disable autocommit on read
:param output_parallelization: is output parallelization on
:param fetch_size: how many rows to fetch
:param partition_column: enable partitioned reads by splitting on this
@@ -350,6 +354,7 @@ class ReadFromJdbc(ExternalTransform):
write_statement=None,
read_query=query,
fetch_size=fetch_size,
+ disable_autocommit=disable_autocommit,
output_parallelization=output_parallelization,
autosharding=None,
max_connections=max_connections,