pabloem commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r799014490



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1128,157 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    /** Enables use of a Beam schema for this read (sets useBeamSchema to true). */
+    public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
+      return toBuilder().setUseBeamSchema(true).build();
+    }
+
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subquery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to