This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f24cedf4edb Support JdbcIO autosharding in Python (#16921)
f24cedf4edb is described below

commit f24cedf4edb1312f2d07df00d0f29569dfcb4b39
Author: Pablo Estrada <[email protected]>
AuthorDate: Wed Jun 1 18:43:44 2022 -0400

    Support JdbcIO autosharding in Python (#16921)
    
    * Support JdbcIO autosharding in Python
    
    * moving autosharding to proper spot
    
    * fix spotless
    
    * fix encoding issue
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java   | 9 +++++++--
 sdks/python/apache_beam/io/jdbc.py                               | 6 ++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index dac1e78548d..bd0c665be3c 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -68,6 +68,7 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
         .addNullableField("writeStatement", FieldType.STRING)
         .addNullableField("fetchSize", FieldType.INT16)
         .addNullableField("outputParallelization", FieldType.BOOLEAN)
+        .addNullableField("autosharding", FieldType.BOOLEAN)
         .build();
   }
 
@@ -139,11 +140,15 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
       return new PTransform<PCollection<Row>, PDone>() {
         @Override
         public PDone expand(PCollection<Row> input) {
-          return input.apply(
+          JdbcIO.Write<Row> writeRows =
               JdbcIO.<Row>write()
                   .withDataSourceConfiguration(getDataSourceConfiguration())
                   .withStatement(generateWriteStatement(input.getSchema()))
-                  .withPreparedStatementSetter(new 
JdbcUtil.BeamRowPreparedStatementSetter()));
+                  .withPreparedStatementSetter(new 
JdbcUtil.BeamRowPreparedStatementSetter());
+          if (config.getBoolean("autosharding") != null && 
config.getBoolean("autosharding")) {
+            writeRows = writeRows.withAutoSharding();
+          }
+          return input.apply(writeRows);
         }
       };
     }
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index 5c6c5ac6ffd..85b80fdea0e 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -124,6 +124,7 @@ Config = typing.NamedTuple(
         ('write_statement', typing.Optional[str]),
         ('fetch_size', typing.Optional[int]),
         ('output_parallelization', typing.Optional[bool]),
+        ('autosharding', typing.Optional[bool]),
     ],
 )
 
@@ -175,6 +176,7 @@ class WriteToJdbc(ExternalTransform):
       statement=None,
       connection_properties=None,
       connection_init_sqls=None,
+      autosharding=False,
       expansion_service=None,
       classpath=None,
   ):
@@ -191,6 +193,8 @@ class WriteToJdbc(ExternalTransform):
                                   [propertyName=property;]*
     :param connection_init_sqls: required only for MySql and MariaDB.
                                  passed as list of strings
+    :param autosharding: enable automatic re-sharding of bundles to scale the
+                         number of shards with the number of workers.
     :param expansion_service: The address (host:port) of the ExpansionService.
     :param classpath: A list of JARs or Java packages to include in the
                       classpath for the expansion service. This option is
@@ -221,6 +225,7 @@ class WriteToJdbc(ExternalTransform):
                             read_query=None,
                             fetch_size=None,
                             output_parallelization=None,
+                            autosharding=autosharding,
                         ))),
         ),
         expansion_service or default_io_expansion_service(classpath),
@@ -318,6 +323,7 @@ class ReadFromJdbc(ExternalTransform):
                             read_query=query,
                             fetch_size=fetch_size,
                             output_parallelization=output_parallelization,
+                            autosharding=None,
                         ))),
         ),
         expansion_service or default_io_expansion_service(classpath),

Reply via email to