This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f24cedf4edb Support JdbcIO autosharding in Python (#16921)
f24cedf4edb is described below
commit f24cedf4edb1312f2d07df00d0f29569dfcb4b39
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Jun 1 18:43:44 2022 -0400
Support JdbcIO autosharding in Python (#16921)
* Support JdbcIO autosharding in Python
* moving autosharding to proper spot
* fix spotless
* fix encoding issue
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java | 9 +++++++--
sdks/python/apache_beam/io/jdbc.py | 6 ++++++
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index dac1e78548d..bd0c665be3c 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -68,6 +68,7 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
.addNullableField("writeStatement", FieldType.STRING)
.addNullableField("fetchSize", FieldType.INT16)
.addNullableField("outputParallelization", FieldType.BOOLEAN)
+ .addNullableField("autosharding", FieldType.BOOLEAN)
.build();
}
@@ -139,11 +140,15 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
return new PTransform<PCollection<Row>, PDone>() {
@Override
public PDone expand(PCollection<Row> input) {
- return input.apply(
+ JdbcIO.Write<Row> writeRows =
JdbcIO.<Row>write()
.withDataSourceConfiguration(getDataSourceConfiguration())
.withStatement(generateWriteStatement(input.getSchema()))
- .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter()));
+ .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter());
+ if (config.getBoolean("autosharding") != null && config.getBoolean("autosharding")) {
+ writeRows = writeRows.withAutoSharding();
+ }
+ return input.apply(writeRows);
}
};
}
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index 5c6c5ac6ffd..85b80fdea0e 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -124,6 +124,7 @@ Config = typing.NamedTuple(
('write_statement', typing.Optional[str]),
('fetch_size', typing.Optional[int]),
('output_parallelization', typing.Optional[bool]),
+ ('autosharding', typing.Optional[bool]),
],
)
@@ -175,6 +176,7 @@ class WriteToJdbc(ExternalTransform):
statement=None,
connection_properties=None,
connection_init_sqls=None,
+ autosharding=False,
expansion_service=None,
classpath=None,
):
@@ -191,6 +193,8 @@ class WriteToJdbc(ExternalTransform):
[propertyName=property;]*
:param connection_init_sqls: required only for MySql and MariaDB.
passed as list of strings
+ :param autosharding: enable automatic re-sharding of bundles to scale the
+ number of shards with the number of workers.
:param expansion_service: The address (host:port) of the ExpansionService.
:param classpath: A list of JARs or Java packages to include in the
classpath for the expansion service. This option is
@@ -221,6 +225,7 @@ class WriteToJdbc(ExternalTransform):
read_query=None,
fetch_size=None,
output_parallelization=None,
+ autosharding=autosharding,
))),
),
expansion_service or default_io_expansion_service(classpath),
@@ -318,6 +323,7 @@ class ReadFromJdbc(ExternalTransform):
read_query=query,
fetch_size=fetch_size,
output_parallelization=output_parallelization,
+ autosharding=None,
))),
),
expansion_service or default_io_expansion_service(classpath),