This is an automated email from the ASF dual-hosted git repository.
svetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 17054575ae9 Feature: Yaml JdbcIO partitioning for ReadRows (#34417)
17054575ae9 is described below
commit 17054575ae9ed5fd6a8fd9c4a80d79fe17672f1d
Author: Shubham Jaiswal <[email protected]>
AuthorDate: Mon Mar 31 23:55:32 2025 +0530
Feature: Yaml JdbcIO partitioning for ReadRows (#34417)
* extend JdbcReadSchemaTransformProvider to take readRowPartitions
* adding UT, comments and partition check boolean
* adding changes to yaml mapping
* updating standard_io mapping
* using validate() and removing playground changes
---
.../io/jdbc/JdbcReadSchemaTransformProvider.java | 65 ++++++++++++++++++---
.../jdbc/JdbcReadSchemaTransformProviderTest.java | 67 ++++++++++++++++++++++
sdks/python/apache_beam/yaml/standard_io.yaml | 2 +
3 files changed, 127 insertions(+), 7 deletions(-)
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index b4765f0392c..b136b3e3e05 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -213,21 +214,52 @@ public class JdbcReadSchemaTransformProvider
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- String query = config.getReadQuery();
- if (query == null) {
- query = String.format("SELECT * FROM %s", config.getLocation());
+ config.validate();
+ // If we define a partition column, we follow a different route.
+ @Nullable String partitionColumn = config.getPartitionColumn();
+ @Nullable String location = config.getLocation();
+ if (partitionColumn != null) {
+ JdbcIO.ReadWithPartitions<Row, ?> readRowsWithParitions =
+ JdbcIO.<Row>readWithPartitions()
+ .withDataSourceConfiguration(dataSourceConfiguration())
+ .withTable(location)
+ .withPartitionColumn(partitionColumn)
+ .withRowOutput();
+
+ @Nullable Integer partitions = config.getNumPartitions();
+ if (partitions != null) {
+ readRowsWithParitions =
readRowsWithParitions.withNumPartitions(partitions);
+ }
+
+ @Nullable Integer fetchSize = config.getFetchSize();
+ if (fetchSize != null && fetchSize > 0) {
+ readRowsWithParitions =
readRowsWithParitions.withFetchSize(fetchSize);
+ }
+
+ @Nullable Boolean disableAutoCommit = config.getDisableAutoCommit();
+ if (disableAutoCommit != null) {
+ readRowsWithParitions =
readRowsWithParitions.withDisableAutoCommit(disableAutoCommit);
+ }
+ return PCollectionRowTuple.of("output",
input.getPipeline().apply(readRowsWithParitions));
+ }
+ @Nullable String readQuery = config.getReadQuery();
+ if (readQuery == null) {
+ readQuery = String.format("SELECT * FROM %s", location);
}
JdbcIO.ReadRows readRows =
-
JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query);
- Integer fetchSize = config.getFetchSize();
+ JdbcIO.readRows()
+ .withDataSourceConfiguration(dataSourceConfiguration())
+ .withQuery(readQuery);
+
+ @Nullable Integer fetchSize = config.getFetchSize();
if (fetchSize != null && fetchSize > 0) {
readRows = readRows.withFetchSize(fetchSize);
}
- Boolean outputParallelization = config.getOutputParallelization();
+ @Nullable Boolean outputParallelization =
config.getOutputParallelization();
if (outputParallelization != null) {
readRows = readRows.withOutputParallelization(outputParallelization);
}
- Boolean disableAutoCommit = config.getDisableAutoCommit();
+ @Nullable Boolean disableAutoCommit = config.getDisableAutoCommit();
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}
@@ -294,6 +326,14 @@ public class JdbcReadSchemaTransformProvider
@Nullable
public abstract String getLocation();
+ @SchemaFieldDescription("Name of a column of numeric type that will be
used for partitioning.")
+ @Nullable
+ public abstract String getPartitionColumn();
+
+ @SchemaFieldDescription("The number of partitions")
+ @Nullable
+ public abstract Integer getNumPartitions();
+
@SchemaFieldDescription(
"Whether to reshuffle the resulting PCollection so results are
distributed to all workers.")
@Nullable
@@ -340,13 +380,20 @@ public class JdbcReadSchemaTransformProvider
boolean readQueryPresent = (getReadQuery() != null &&
!"".equals(getReadQuery()));
boolean locationPresent = (getLocation() != null &&
!"".equals(getLocation()));
+ boolean partitionColumnPresent =
+ (getPartitionColumn() != null && !"".equals(getPartitionColumn()));
+ // If you specify a readQuery, it is to be used instead of a table.
if (readQueryPresent && locationPresent) {
throw new IllegalArgumentException("Query and Table are mutually
exclusive configurations");
}
if (!readQueryPresent && !locationPresent) {
throw new IllegalArgumentException("Either Query or Table must be
specified.");
}
+ // Reading with partitions only supports table argument.
+ if (partitionColumnPresent && !locationPresent) {
+ throw new IllegalArgumentException("Table must be specified to read
with partitions.");
+ }
}
public static Builder builder() {
@@ -368,6 +415,10 @@ public class JdbcReadSchemaTransformProvider
public abstract Builder setLocation(String value);
+ public abstract Builder setPartitionColumn(String value);
+
+ public abstract Builder setNumPartitions(Integer value);
+
public abstract Builder setReadQuery(String value);
public abstract Builder setConnectionProperties(String value);
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index ca7690ac9a0..0837ea686dd 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -122,6 +122,17 @@ public class JdbcReadSchemaTransformProviderTest {
.build()
.validate();
});
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setDriverClassName("ClassName")
+ .setJdbcUrl("JdbcUrl")
+ .setReadQuery("Query")
+ .setPartitionColumn("Id")
+ .build()
+ .validate();
+ });
}
@Test
@@ -188,6 +199,62 @@ public class JdbcReadSchemaTransformProviderTest {
pipeline.run();
}
+ @Test
+ public void testReadWithPartitions() {
+ JdbcReadSchemaTransformProvider provider = null;
+ for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
+ if (p instanceof JdbcReadSchemaTransformProvider) {
+ provider = (JdbcReadSchemaTransformProvider) p;
+ break;
+ }
+ }
+ assertNotNull(provider);
+
+ PCollection<Row> output =
+ PCollectionRowTuple.empty(pipeline)
+ .apply(
+ provider.from(
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+ .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+ .setLocation(READ_TABLE_NAME)
+ .setPartitionColumn("Id")
+ .setNumPartitions(6)
+ .build()))
+ .get("output");
+ Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+ PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadWithPartitionsWithJdbcTypeSpecified() {
+ JdbcReadSchemaTransformProvider provider = null;
+ for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
+ if (p instanceof JdbcReadSchemaTransformProvider) {
+ provider = (JdbcReadSchemaTransformProvider) p;
+ break;
+ }
+ }
+ assertNotNull(provider);
+
+ PCollection<Row> output =
+ PCollectionRowTuple.empty(pipeline)
+ .apply(
+ provider.from(
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+ .setJdbcType("derby")
+ .setLocation(READ_TABLE_NAME)
+ .setPartitionColumn("Id")
+ .setNumPartitions(6)
+ .build()))
+ .get("output");
+ Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+ PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
/** Create test data that is consistent with that generated by TestRow. */
private static void addInitialData(DataSource dataSource, String tableName)
throws SQLException {
try (Connection connection = dataSource.getConnection()) {
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml
b/sdks/python/apache_beam/yaml/standard_io.yaml
index b2544e77355..271b4add429 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -218,6 +218,8 @@
password: 'password'
query: 'read_query'
table: 'location'
+ partition_column : 'partition_column'
+ partitions: 'partitions'
type: 'jdbc_type'
username: 'username'
'WriteToJdbc':