mosche commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r799520225



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subqery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for 
JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) 
{
+        // Not all partition types are comparable. For example, LocalDateTime, 
which is a valid
+        // partitioning type, is not Comparable, so we can't enforce this for 
all sorts of
+        // partitioning.
+        checkArgument(
+            ((Comparable<PartitionColumnT>) 
getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+            "The lower bound of partitioning column is larger or equal than 
the upper bound");
+      }
       checkArgument(
-          getLowerBound() < getUpperBound(),
-          "The lower bound of partitioning column is larger or equal than the 
upper bound");
-      checkArgument(
-          getUpperBound() - getLowerBound() >= getNumPartitions(),
-          "The specified number of partitions is more than the difference 
between upper bound and lower bound");
+          
JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+          "readWithPartitions only supports the following types: %s",
+          JdbcUtil.PRESET_HELPERS.keySet());
+
+      PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+      if (getLowerBound() == null && getUpperBound() == null) {
+        String query =
+            String.format(
+                "SELECT min(%s), max(%s) FROM %s",
+                getPartitionColumn(), getPartitionColumn(), getTable());
+        if (getNumPartitions() == null) {
+          query =
+              String.format(
+                  "SELECT min(%s), max(%s), count(*) FROM %s",
+                  getPartitionColumn(), getPartitionColumn(), getTable());
+        }
+        params =
+            input
+                .apply(
+                    JdbcIO.<KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>read()
+                        .withQuery(query)
+                        .withDataSourceProviderFn(getDataSourceProviderFn())
+                        .withRowMapper(
+                            (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+                                
JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>,
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>() {
+                          @Override
+                          public KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> apply(
+                              KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> input) {
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> result;
+                            if (getNumPartitions() == null) {
+                              // In this case, we use the table row count to 
infer a number of
+                              // partitions.
+                              // We take the square root of the number of 
rows, and divide it by 5
+                              // to keep a relatively low number of 
partitions, given that an RDBMS
+                              // cannot usually accept a very large number of 
connections.
+                              Integer numPartitions =

Review comment:
       I think that makes sense; e.g., `sqrt(x)/10` looks good. Alternatively, we could 
just subtract some constant value.
   
![partitions](https://user-images.githubusercontent.com/1401430/152547255-76636290-e083-48a4-a30a-09a42e7e6e4f.png)
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to