svetakvsundhar commented on code in PR #34393:
URL: https://github.com/apache/beam/pull/34393#discussion_r2009269295


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -889,6 +927,305 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #readRowsWithPartitions}. */
+  @AutoValue
+  public abstract static class ReadRowsWithPartitions<PartitionColumnT>
+      extends PTransform<PBegin, PCollection<Row>> {
+
+    @Pure
+    abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Pure
+    abstract int getFetchSize();
+
+    @Pure
+    abstract boolean getDisableAutoCommit();
+
+    @Pure
+    abstract @Nullable Schema getSchema();
+
+    @Pure
+    abstract @Nullable Integer getNumPartitions();
+
+    @Pure
+    abstract @Nullable String getPartitionColumn();
+
+    @Pure
+    abstract @Nullable PartitionColumnT getLowerBound();
+
+    @Pure
+    abstract @Nullable PartitionColumnT getUpperBound();
+
+    @Pure
+    abstract @Nullable String getTable();
+
+    @Pure
+    abstract @Nullable TypeDescriptor<PartitionColumnT> getPartitionColumnType();
+
+    @Pure
+    abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper();
+
+    @Pure
+    abstract boolean getUseBeamSchema();
+
+    @Pure
+    abstract Builder<PartitionColumnT> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<PartitionColumnT> {
+
+      abstract Builder<PartitionColumnT> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<PartitionColumnT> setFetchSize(int fetchSize);
+
+      abstract Builder<PartitionColumnT> setNumPartitions(int numPartitions);
+
+      abstract Builder<PartitionColumnT> setPartitionColumn(String partitionColumn);
+
+      abstract Builder<PartitionColumnT> setLowerBound(PartitionColumnT lowerBound);
+
+      abstract Builder<PartitionColumnT> setUpperBound(PartitionColumnT upperBound);
+
+      abstract Builder<PartitionColumnT> setUseBeamSchema(boolean useBeamSchema);
+
+      abstract Builder<PartitionColumnT> setTable(String tableName);
+
+      abstract Builder<PartitionColumnT> setPartitionColumnType(
+          TypeDescriptor<PartitionColumnT> partitionColumnType);
+
+      abstract Builder<PartitionColumnT> setPartitionsHelper(
+          JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);
+
+      abstract Builder<PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit);
+
+      abstract Builder<PartitionColumnT> setSchema(@Nullable Schema schema);
+
+      abstract ReadRowsWithPartitions<PartitionColumnT> build();
+    }
+
+    public ReadRowsWithPartitions<PartitionColumnT> withDataSourceConfiguration(
+        final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public ReadRowsWithPartitions<PartitionColumnT> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * The number of partitions. This, along with withLowerBound and withUpperBound, forms
+     * partition strides for generated WHERE clause expressions used to split the column
+     * withPartitionColumn evenly. The value must be at least 1.
+     */
+    public ReadRowsWithPartitions<PartitionColumnT> withNumPartitions(int numPartitions) {
+      checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
+      return toBuilder().setNumPartitions(numPartitions).build();
+    }
+
+    /** The name of a column of numeric type that will be used for partitioning. */
+    public ReadRowsWithPartitions<PartitionColumnT> withPartitionColumn(String partitionColumn) {
+      checkNotNull(partitionColumn, "partitionColumn can not be null");
+      return toBuilder().setPartitionColumn(partitionColumn).build();
+    }
+
+    /** The number of rows to fetch from the database in the same {@link ResultSet} round-trip. */
+    public ReadRowsWithPartitions<PartitionColumnT> withFetchSize(int fetchSize) {
+      checkArgument(fetchSize > 0, "fetchSize can not be less than 1");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
+    /**
+     * Whether to disable auto commit on read. Defaults to true if not provided. The need for this
+     * config varies depending on the database platform. Informix requires this to be set to false
+     * while Postgres requires this to be set to true.
+     */
+    public ReadRowsWithPartitions<PartitionColumnT> withDisableAutoCommit(
+        boolean disableAutoCommit) {
+      return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+    }
+
+    /** Data output type is {@link Row}, and schema is auto-inferred from the database. */
+    public ReadRowsWithPartitions<PartitionColumnT> withRowOutput() {
+      return toBuilder().setUseBeamSchema(true).build();
+    }
+
+    public ReadRowsWithPartitions<PartitionColumnT> withLowerBound(PartitionColumnT lowerBound) {
+      return toBuilder().setLowerBound(lowerBound).build();
+    }
+
+    public ReadRowsWithPartitions<PartitionColumnT> withUpperBound(PartitionColumnT upperBound) {
+      return toBuilder().setUpperBound(upperBound).build();
+    }
+
+    /** Name of the table in the external database. Can be used to pass a user-defined subquery. */
+    public ReadRowsWithPartitions<PartitionColumnT> withTable(String tableName) {
+      checkNotNull(tableName, "table can not be null");
+      return toBuilder().setTable(tableName).build();
+    }
+
+    public ReadRowsWithPartitions<PartitionColumnT> withSchema(Schema schema) {
+      return toBuilder().setSchema(schema).build();
+    }
+
+    private static final int EQUAL = 0;

Review Comment:
   Can we add a comment as to what this variable is for?
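   For example, if EQUAL is meant as the expected `Comparable#compareTo` result when the lower and upper bounds are equal (my assumption, not verified against the rest of the PR), something along these lines:
   
   ```java
   // Hypothetical sketch of the requested comment; assumes EQUAL is compared
   // against Comparable#compareTo results when validating the partition bounds.
   /** Value returned by {@link Comparable#compareTo} when the two operands are equal. */
   private static final int EQUAL = 0;
   ```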



##########
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java:
##########
@@ -515,6 +515,110 @@ public void testIncompatibleSchemaThrowsError() {
         assertThrows(PipelineExecutionException.class, () -> pipeline.run().waitUntilFinish());
   }
 
+  @Test
+  public void testReadRowsPartitions() {
+    PCollection<Row> rows =
+        pipeline.apply(
+            JdbcIO.readRowsWithPartitions()
+                .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+                .withTable(READ_TABLE_NAME)
+                .withNumPartitions(1)
+                .withPartitionColumn("id")
+                .withLowerBound(0L)
+                .withUpperBound(1000L));
+    PAssert.thatSingleton(rows.apply("Count All", 
Count.globally())).isEqualTo(1000L);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadRowsPartitionsWithExplicitSchema() {
+    Schema customSchema =
+        Schema.of(
+            Schema.Field.of("CUSTOMER_NAME", 
Schema.FieldType.STRING).withNullable(true),
+            Schema.Field.of("CUSTOMER_ID", 
Schema.FieldType.INT32).withNullable(true));
+    PCollection<Row> rows =
+        pipeline.apply(
+            JdbcIO.readRowsWithPartitions()
+                .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+                .withTable(String.format("(select name,id from %s) as subq", 
READ_TABLE_NAME))
+                .withNumPartitions(5)
+                .withPartitionColumn("id")
+                .withLowerBound(0L)
+                .withUpperBound(1000L)
+                .withRowOutput()
+                .withSchema(customSchema));
+    assertEquals(customSchema, rows.getSchema());
+    PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadRowsPartitionsBySubquery() {
+    PCollection<Row> rows =
+        pipeline.apply(
+            JdbcIO.readRowsWithPartitions()
+                .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+                .withTable(String.format("(select * from %s) as subq", 
READ_TABLE_NAME))
+                .withNumPartitions(10)
+                .withPartitionColumn("id")
+                .withLowerBound(0L)
+                .withUpperBound(1000L));
+    PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L);
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadRowsPartitionsIfNumPartitionsIsZero() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("numPartitions can not be less than 1");
+    pipeline.apply(
+        JdbcIO.readRowsWithPartitions()
+            .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+            .withTable(READ_TABLE_NAME)
+            .withNumPartitions(0)
+            .withPartitionColumn("id")
+            .withLowerBound(0L)
+            .withUpperBound(1000L));
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadRowsPartitionsLowerBoundIsMoreThanUpperBound() {

Review Comment:
   Can we also add tests covering a null partition column and a null table name? For example:
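   A rough sketch of what those tests could look like, reusing the checkNotNull messages from the JdbcIO.java diff above (test names are placeholders):
   
   ```java
   @Test
   public void testReadRowsPartitionsWithNullPartitionColumn() {
     // withPartitionColumn uses checkNotNull, so a NullPointerException is expected.
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("partitionColumn can not be null");
     pipeline.apply(
         JdbcIO.readRowsWithPartitions()
             .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
             .withTable(READ_TABLE_NAME)
             .withNumPartitions(1)
             .withPartitionColumn(null)
             .withLowerBound(0L)
             .withUpperBound(1000L));
     pipeline.run();
   }
   
   @Test
   public void testReadRowsPartitionsWithNullTable() {
     // withTable also uses checkNotNull, with the "table can not be null" message.
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("table can not be null");
     pipeline.apply(
         JdbcIO.readRowsWithPartitions()
             .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
             .withTable(null)
             .withNumPartitions(1)
             .withPartitionColumn("id")
             .withLowerBound(0L)
             .withUpperBound(1000L));
     pipeline.run();
   }
   ```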


