svetakvsundhar commented on code in PR #34393: URL: https://github.com/apache/beam/pull/34393#discussion_r2009269295
########## sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java: ########## @@ -889,6 +927,305 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + /** Implementation of {@link #readRowsWithPartitions}. */ + @AutoValue + public abstract static class ReadRowsWithPartitions<PartitionColumnT> + extends PTransform<PBegin, PCollection<Row>> { + + @Pure + abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn(); + + @Pure + abstract int getFetchSize(); + + @Pure + abstract boolean getDisableAutoCommit(); + + @Pure + abstract @Nullable Schema getSchema(); + + @Pure + abstract @Nullable Integer getNumPartitions(); + + @Pure + abstract @Nullable String getPartitionColumn(); + + @Pure + abstract @Nullable PartitionColumnT getLowerBound(); + + @Pure + abstract @Nullable PartitionColumnT getUpperBound(); + + @Pure + abstract @Nullable String getTable(); + + @Pure + abstract @Nullable TypeDescriptor<PartitionColumnT> getPartitionColumnType(); + + @Pure + abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper(); + + @Pure + abstract boolean getUseBeamSchema(); + + @Pure + abstract Builder<PartitionColumnT> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<PartitionColumnT> { + + abstract Builder<PartitionColumnT> setDataSourceProviderFn( + SerializableFunction<Void, DataSource> dataSourceProviderFn); + + abstract Builder<PartitionColumnT> setFetchSize(int fetchSize); + + abstract Builder<PartitionColumnT> setNumPartitions(int numPartitions); + + abstract Builder<PartitionColumnT> setPartitionColumn(String partitionColumn); + + abstract Builder<PartitionColumnT> setLowerBound(PartitionColumnT lowerBound); + + abstract Builder<PartitionColumnT> setUpperBound(PartitionColumnT upperBound); + + abstract Builder<PartitionColumnT> setUseBeamSchema(boolean useBeamSchema); + + abstract Builder<PartitionColumnT> setTable(String tableName); + + abstract 
Builder<PartitionColumnT> setPartitionColumnType( + TypeDescriptor<PartitionColumnT> partitionColumnType); + + abstract Builder<PartitionColumnT> setPartitionsHelper( + JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper); + + abstract Builder<PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit); + + abstract Builder<PartitionColumnT> setSchema(@Nullable Schema schema); + + abstract ReadRowsWithPartitions<PartitionColumnT> build(); + } + + public ReadRowsWithPartitions<PartitionColumnT> withDataSourceConfiguration( + final DataSourceConfiguration config) { + return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config)); + } + + public ReadRowsWithPartitions<PartitionColumnT> withDataSourceProviderFn( + SerializableFunction<Void, DataSource> dataSourceProviderFn) { + return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build(); + } + + /** + * The number of partitions. This, along with withLowerBound and withUpperBound, form partitions + * strides for generated WHERE clause expressions used to split the column withPartitionColumn + * evenly. When the input is less than 1, the number is set to 1. + */ + public ReadRowsWithPartitions<PartitionColumnT> withNumPartitions(int numPartitions) { + checkArgument(numPartitions > 0, "numPartitions can not be less than 1"); + return toBuilder().setNumPartitions(numPartitions).build(); + } + + /** The name of a column of numeric type that will be used for partitioning. */ + public ReadRowsWithPartitions<PartitionColumnT> withPartitionColumn(String partitionColumn) { + checkNotNull(partitionColumn, "partitionColumn can not be null"); + return toBuilder().setPartitionColumn(partitionColumn).build(); + } + + /** The number of rows to fetch from the database in the same {@link ResultSet} round-trip. 
*/ + public ReadRowsWithPartitions<PartitionColumnT> withFetchSize(int fetchSize) { + checkArgument(fetchSize > 0, "fetchSize can not be less than 1"); + return toBuilder().setFetchSize(fetchSize).build(); + } + + /** + * Whether to disable auto commit on read. Defaults to true if not provided. The need for this + * config varies depending on the database platform. Informix requires this to be set to false + * while Postgres requires this to be set to true. + */ + public ReadRowsWithPartitions<PartitionColumnT> withDisableAutoCommit( + boolean disableAutoCommit) { + return toBuilder().setDisableAutoCommit(disableAutoCommit).build(); + } + + /** Data output type is {@link Row}, and schema is auto-inferred from the database. */ + public ReadRowsWithPartitions<PartitionColumnT> withRowOutput() { + return toBuilder().setUseBeamSchema(true).build(); + } + + public ReadRowsWithPartitions<PartitionColumnT> withLowerBound(PartitionColumnT lowerBound) { + return toBuilder().setLowerBound(lowerBound).build(); + } + + public ReadRowsWithPartitions<PartitionColumnT> withUpperBound(PartitionColumnT upperBound) { + return toBuilder().setUpperBound(upperBound).build(); + } + + /** Name of the table in the external database. Can be used to pass a user-defined subqery. */ + public ReadRowsWithPartitions<PartitionColumnT> withTable(String tableName) { + checkNotNull(tableName, "table can not be null"); + return toBuilder().setTable(tableName).build(); + } + + public ReadRowsWithPartitions<PartitionColumnT> withSchema(Schema schema) { + return toBuilder().setSchema(schema).build(); + } + + private static final int EQUAL = 0; Review Comment: Can we add a comment as to what this variable is for? 
########## sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java: ########## @@ -515,6 +515,110 @@ public void testIncompatibleSchemaThrowsError() { assertThrows(PipelineExecutionException.class, () -> pipeline.run().waitUntilFinish()); } + @Test + public void testReadRowsPartitions() { + PCollection<Row> rows = + pipeline.apply( + JdbcIO.readRowsWithPartitions() + .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION) + .withTable(READ_TABLE_NAME) + .withNumPartitions(1) + .withPartitionColumn("id") + .withLowerBound(0L) + .withUpperBound(1000L)); + PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L); + pipeline.run(); + } + + @Test + public void testReadRowsPartitionsWithExplicitSchema() { + Schema customSchema = + Schema.of( + Schema.Field.of("CUSTOMER_NAME", Schema.FieldType.STRING).withNullable(true), + Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32).withNullable(true)); + PCollection<Row> rows = + pipeline.apply( + JdbcIO.readRowsWithPartitions() + .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION) + .withTable(String.format("(select name,id from %s) as subq", READ_TABLE_NAME)) + .withNumPartitions(5) + .withPartitionColumn("id") + .withLowerBound(0L) + .withUpperBound(1000L) + .withRowOutput() + .withSchema(customSchema)); + assertEquals(customSchema, rows.getSchema()); + PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L); + pipeline.run(); + } + + @Test + public void testReadRowsPartitionsBySubqery() { + PCollection<Row> rows = + pipeline.apply( + JdbcIO.readRowsWithPartitions() + .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION) + .withTable(String.format("(select * from %s) as subq", READ_TABLE_NAME)) + .withNumPartitions(10) + .withPartitionColumn("id") + .withLowerBound(0L) + .withUpperBound(1000L)); + PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1000L); + pipeline.run(); + } + + @Test + public void 
testReadRowsPartitionsIfNumPartitionsIsZero() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("numPartitions can not be less than 1"); + pipeline.apply( + JdbcIO.readRowsWithPartitions() + .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION) + .withTable(READ_TABLE_NAME) + .withNumPartitions(0) + .withPartitionColumn("id") + .withLowerBound(0L) + .withUpperBound(1000L)); + pipeline.run(); + } + + @Test + public void testReadRowsPartitionsLowerBoundIsMoreThanUpperBound() { Review Comment: Can we also add tests that cover a null partition column and a null table name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org