This is an automated email from the ASF dual-hosted git repository.

svetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 17054575ae9 Feature: Yaml JdbcIO partitioning for ReadRows (#34417)
17054575ae9 is described below

commit 17054575ae9ed5fd6a8fd9c4a80d79fe17672f1d
Author: Shubham Jaiswal <[email protected]>
AuthorDate: Mon Mar 31 23:55:32 2025 +0530

    Feature: Yaml JdbcIO partitioning for ReadRows (#34417)
    
    * extend JdbcReadSchemaTransformProvider to take readRowPartitions
    
    * adding UT, comments and partition check boolean
    
    * adding changes to yaml mapping
    
    * updating standard_io mapping
    
    * using validate() and removing playground changes
---
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   | 65 ++++++++++++++++++---
 .../jdbc/JdbcReadSchemaTransformProviderTest.java  | 67 ++++++++++++++++++++++
 sdks/python/apache_beam/yaml/standard_io.yaml      |  2 +
 3 files changed, 127 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index b4765f0392c..b136b3e3e05 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -213,21 +214,52 @@ public class JdbcReadSchemaTransformProvider
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      String query = config.getReadQuery();
-      if (query == null) {
-        query = String.format("SELECT * FROM %s", config.getLocation());
+      config.validate();
+      // If we define a partition column, we follow a different route.
+      @Nullable String partitionColumn = config.getPartitionColumn();
+      @Nullable String location = config.getLocation();
+      if (partitionColumn != null) {
+        JdbcIO.ReadWithPartitions<Row, ?> readRowsWithParitions =
+            JdbcIO.<Row>readWithPartitions()
+                .withDataSourceConfiguration(dataSourceConfiguration())
+                .withTable(location)
+                .withPartitionColumn(partitionColumn)
+                .withRowOutput();
+
+        @Nullable Integer partitions = config.getNumPartitions();
+        if (partitions != null) {
+          readRowsWithParitions = 
readRowsWithParitions.withNumPartitions(partitions);
+        }
+
+        @Nullable Integer fetchSize = config.getFetchSize();
+        if (fetchSize != null && fetchSize > 0) {
+          readRowsWithParitions = 
readRowsWithParitions.withFetchSize(fetchSize);
+        }
+
+        @Nullable Boolean disableAutoCommit = config.getDisableAutoCommit();
+        if (disableAutoCommit != null) {
+          readRowsWithParitions = 
readRowsWithParitions.withDisableAutoCommit(disableAutoCommit);
+        }
+        return PCollectionRowTuple.of("output", 
input.getPipeline().apply(readRowsWithParitions));
+      }
+      @Nullable String readQuery = config.getReadQuery();
+      if (readQuery == null) {
+        readQuery = String.format("SELECT * FROM %s", location);
       }
       JdbcIO.ReadRows readRows =
-          
JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query);
-      Integer fetchSize = config.getFetchSize();
+          JdbcIO.readRows()
+              .withDataSourceConfiguration(dataSourceConfiguration())
+              .withQuery(readQuery);
+
+      @Nullable Integer fetchSize = config.getFetchSize();
       if (fetchSize != null && fetchSize > 0) {
         readRows = readRows.withFetchSize(fetchSize);
       }
-      Boolean outputParallelization = config.getOutputParallelization();
+      @Nullable Boolean outputParallelization = 
config.getOutputParallelization();
       if (outputParallelization != null) {
         readRows = readRows.withOutputParallelization(outputParallelization);
       }
-      Boolean disableAutoCommit = config.getDisableAutoCommit();
+      @Nullable Boolean disableAutoCommit = config.getDisableAutoCommit();
       if (disableAutoCommit != null) {
         readRows = readRows.withDisableAutoCommit(disableAutoCommit);
       }
@@ -294,6 +326,14 @@ public class JdbcReadSchemaTransformProvider
     @Nullable
     public abstract String getLocation();
 
+    @SchemaFieldDescription("Name of a column of numeric type that will be 
used for partitioning.")
+    @Nullable
+    public abstract String getPartitionColumn();
+
+    @SchemaFieldDescription("The number of partitions")
+    @Nullable
+    public abstract Integer getNumPartitions();
+
     @SchemaFieldDescription(
         "Whether to reshuffle the resulting PCollection so results are 
distributed to all workers.")
     @Nullable
@@ -340,13 +380,20 @@ public class JdbcReadSchemaTransformProvider
 
       boolean readQueryPresent = (getReadQuery() != null && 
!"".equals(getReadQuery()));
       boolean locationPresent = (getLocation() != null && 
!"".equals(getLocation()));
+      boolean partitionColumnPresent =
+          (getPartitionColumn() != null && !"".equals(getPartitionColumn()));
 
+      // If you specify a readQuery, it is to be used instead of a table.
       if (readQueryPresent && locationPresent) {
         throw new IllegalArgumentException("Query and Table are mutually 
exclusive configurations");
       }
       if (!readQueryPresent && !locationPresent) {
         throw new IllegalArgumentException("Either Query or Table must be 
specified.");
       }
+      // Reading with partitions only supports table argument.
+      if (partitionColumnPresent && !locationPresent) {
+        throw new IllegalArgumentException("Table must be specified to read 
with partitions.");
+      }
     }
 
     public static Builder builder() {
@@ -368,6 +415,10 @@ public class JdbcReadSchemaTransformProvider
 
       public abstract Builder setLocation(String value);
 
+      public abstract Builder setPartitionColumn(String value);
+
+      public abstract Builder setNumPartitions(Integer value);
+
       public abstract Builder setReadQuery(String value);
 
       public abstract Builder setConnectionProperties(String value);
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index ca7690ac9a0..0837ea686dd 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -122,6 +122,17 @@ public class JdbcReadSchemaTransformProviderTest {
               .build()
               .validate();
         });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .setReadQuery("Query")
+              .setPartitionColumn("Id")
+              .build()
+              .validate();
+        });
   }
 
   @Test
@@ -188,6 +199,62 @@ public class JdbcReadSchemaTransformProviderTest {
     pipeline.run();
   }
 
+  @Test
+  public void testReadWithPartitions() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcReadSchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                provider.from(
+                    
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                        
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+                        .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                        .setLocation(READ_TABLE_NAME)
+                        .setPartitionColumn("Id")
+                        .setNumPartitions(6)
+                        .build()))
+            .get("output");
+    Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadWithPartitionsWithJdbcTypeSpecified() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcReadSchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                provider.from(
+                    
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                        .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                        .setJdbcType("derby")
+                        .setLocation(READ_TABLE_NAME)
+                        .setPartitionColumn("Id")
+                        .setNumPartitions(6)
+                        .build()))
+            .get("output");
+    Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
   /** Create test data that is consistent with that generated by TestRow. */
   private static void addInitialData(DataSource dataSource, String tableName) 
throws SQLException {
     try (Connection connection = dataSource.getConnection()) {
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml 
b/sdks/python/apache_beam/yaml/standard_io.yaml
index b2544e77355..271b4add429 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -218,6 +218,8 @@
         password: 'password'
         query: 'read_query'
         table: 'location'
+        partition_column : 'partition_column'
+        partitions: 'partitions'
         type: 'jdbc_type'
         username: 'username'
       'WriteToJdbc':

Reply via email to