pabloem commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r798993247



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder builder) {
      * strides for generated WHERE clause expressions used to split the column withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a user-defined subquery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) {
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for 
JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) {
+        // Not all partition types are comparable. For example, LocalDateTime, which is a valid
+        // partitioning type, is not Comparable, so we can't enforce this for all sorts of
+        // partitioning.
+        checkArgument(
+            ((Comparable<PartitionColumnT>) getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+            "The lower bound of the partitioning column is larger than or equal to the upper bound");
+      }
       checkArgument(
-          getLowerBound() < getUpperBound(),
-          "The lower bound of partitioning column is larger or equal than the upper bound");
-      checkArgument(
-          getUpperBound() - getLowerBound() >= getNumPartitions(),
-          "The specified number of partitions is more than the difference between upper bound and lower bound");
+          JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+          "readWithPartitions only supports the following types: %s",
+          JdbcUtil.PRESET_HELPERS.keySet());
+
+      PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+      if (getLowerBound() == null && getUpperBound() == null) {
+        String query =
+            String.format(
+                "SELECT min(%s), max(%s) FROM %s",
+                getPartitionColumn(), getPartitionColumn(), getTable());
+        if (getNumPartitions() == null) {
+          query =
+              String.format(
+                  "SELECT min(%s), max(%s), count(*) FROM %s",
+                  getPartitionColumn(), getPartitionColumn(), getTable());
+        }
+        params =
+            input
+                .apply(
+                    JdbcIO.<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>>read()
+                        .withQuery(query)
+                        .withDataSourceProviderFn(getDataSourceProviderFn())
+                        .withRowMapper(
+                            (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+                                JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>>,
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>>>() {
+                          @Override
+                          public KV<Integer, KV<PartitionColumnT, PartitionColumnT>> apply(
+                              KV<Integer, KV<PartitionColumnT, PartitionColumnT>> input) {
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>> result;
+                            if (getNumPartitions() == null) {
+                              // In this case, we use the table row count to infer a number of
+                              // partitions.
+                              // We take the square root of the number of rows, and divide it by 5
+                              // to keep a relatively low number of partitions, given that an RDBMS
+                              // cannot usually accept a very large number of connections.
+                              Integer numPartitions =
+                                  Long.valueOf(
+                                          Math.round(Math.floor(Math.sqrt(input.getKey()) / 5)))
+                                      .intValue();
+                              if (numPartitions == 0) {
+                                numPartitions = 1;
+                              }
+                              result = KV.of(numPartitions, input.getValue());
+                            } else {
+                              result = KV.of(getNumPartitions(), input.getValue());
+                            }
+                            LOG.info(
+                                "Inferred min: {} - max: {} - numPartitions: {}",
+                                result.getValue().getKey(),
+                                result.getValue().getValue(),
+                                result.getKey());
+                            return result;
+                          }
+                        }));
+      } else {
+        params =
+            input.apply(
+                Create.of(KV.of(getNumPartitions(), KV.of(getLowerBound(), getUpperBound()))));
+      }
 
-      PCollection<KV<Integer, KV<Long, Long>>> params =
-          input.apply(
-              Create.of(
-                  Collections.singletonList(
-                      KV.of(getNumPartitions(), KV.of(getLowerBound(), getUpperBound())))));
-      PCollection<KV<String, Iterable<Long>>> ranges =
+      PCollection<KV<PartitionColumnT, PartitionColumnT>> ranges =
           params
-              .apply("Partitioning", ParDo.of(new PartitioningFn()))
-              .apply("Group partitions", GroupByKey.create());
+              .apply("Partitioning", ParDo.of(new PartitioningFn<>(getPartitionColumnType())))
+              .apply("Reshuffle partitions", Reshuffle.viaRandomKey());
 
-      JdbcIO.ReadAll<KV<String, Iterable<Long>>, T> readAll =
-          JdbcIO.<KV<String, Iterable<Long>>, T>readAll()
+      JdbcIO.ReadAll<KV<PartitionColumnT, PartitionColumnT>, T> readAll =
+          JdbcIO.<KV<PartitionColumnT, PartitionColumnT>, T>readAll()
               .withDataSourceProviderFn(getDataSourceProviderFn())
               .withQuery(
                   String.format(
                       "select * from %1$s where %2$s >= ? and %2$s < ?",
                       getTable(), getPartitionColumn()))
               .withRowMapper(getRowMapper())
               .withParameterSetter(
-                  (PreparedStatementSetter<KV<String, Iterable<Long>>>)
-                      (element, preparedStatement) -> {
-                        String[] range = element.getKey().split(",", -1);
-                        preparedStatement.setLong(1, Long.parseLong(range[0]));
-                        preparedStatement.setLong(2, Long.parseLong(range[1]));
-                      })
+                  // This cast is unchecked, thus this is a small type-checking risk. We just need

Review comment:
       done!
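
For reference, the partition-count inference in the hunk above reduces to a small rule: floor(sqrt(rowCount) / 5), clamped to a minimum of 1. A minimal standalone sketch of just that rule (the class and helper names are illustrative, not part of the PR):

    // Sketch of the inference rule used in the diff above; not the PR's code.
    public class PartitionCountSketch {
      static int inferNumPartitions(long rowCount) {
        int numPartitions = (int) Math.floor(Math.sqrt(rowCount) / 5);
        // An RDBMS can only accept so many connections, so the count is kept
        // low, but never drops below one partition.
        return Math.max(numPartitions, 1);
      }

      public static void main(String[] args) {
        System.out.println(inferNumPartitions(1_000_000L)); // sqrt = 1000, /5 -> 200
        System.out.println(inferNumPartitions(100L));       // sqrt = 10, /5 -> 2
        System.out.println(inferNumPartitions(10L));        // floors to 0, clamped to 1
      }
    }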

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1128,157 @@ public void populateDisplayData(DisplayData.Builder builder) {
      * strides for generated WHERE clause expressions used to split the column withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    /** The name of a column of numeric type that will be used for partitioning. */

Review comment:
       fixed

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,69 +1045,80 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {
 
    abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();
 
     abstract @Nullable String getPartitionColumn();
 
-    abstract @Nullable Long getLowerBound();
+    abstract @Nullable Boolean getUseBeamSchema();

Review comment:
       fixed
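
As a side note, the getters above follow the usual AutoValue pattern for optional properties on a generic transform. A minimal sketch of that shape (names are illustrative, not the PR's code):

    // Sketch of an AutoValue class with two type parameters and optional
    // (@Nullable) properties, mirroring the getters in the hunk above.
    import com.google.auto.value.AutoValue;
    import org.checkerframework.checker.nullness.qual.Nullable;

    @AutoValue
    abstract class ExampleTransform<T, PartitionColumnT> {
      abstract @Nullable Integer getNumPartitions();

      abstract @Nullable String getPartitionColumn();

      @AutoValue.Builder
      abstract static class Builder<T, PartitionColumnT> {
        abstract Builder<T, PartitionColumnT> setNumPartitions(Integer numPartitions);

        abstract Builder<T, PartitionColumnT> setPartitionColumn(String partitionColumn);

        abstract ExampleTransform<T, PartitionColumnT> build();
      }
    }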

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -325,17 +325,24 @@ public static ReadRows readRows() {
    *
    * @param <T> Type of the data to be read.
    */
-  public static <T> ReadWithPartitions<T> readWithPartitions() {
-    return new AutoValue_JdbcIO_ReadWithPartitions.Builder<T>()
+  public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> readWithPartitions(

Review comment:
       thanks!
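
For what it's worth, a caller-side sketch of the generified factory. The new parameter is cut off in the hunk above, so this assumes it takes a TypeDescriptor for the partition column; the table, column, bounds, dataSourceConfig, and rowMapper names are all placeholders:

    // Sketch only: assumes the truncated signature accepts a TypeDescriptor
    // for PartitionColumnT; identifiers here are placeholders.
    PCollection<Row> rows =
        pipeline.apply(
            JdbcIO.<Row, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfig)
                .withTable("my_table")
                .withPartitionColumn("id")
                .withLowerBound(0L)
                .withUpperBound(100_000L)
                .withNumPartitions(10)
                .withRowMapper(rowMapper));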

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  * );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic partitioning is
+ * supported for a few data types: {@link Long}, {@link org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.
+ *
+ * <p>The partitioning scheme depends on these parameters, which can be user-provided, or
+ * automatically inferred by Beam (for the supported types):
+ *
+ * <ul>
+ *   <li>Upper bound
+ *   <li>Lower bound
+ *   <li>Number of partitions - when auto-inferred, the number of partitions defaults to the square
+ *       root of the number of rows divided by 5 (i.e.: {@code Math.floor(Math.sqrt(numRows) / 5)}).
+ * </ul>
+ *
+ * <p>To trigger auto-inference of these parameters, the user simply omits them. To
+ * infer them automatically, Beam runs either of these statements:
+ *
+ * <ul>
+ *   <li>{@code SELECT min(column), max(column), COUNT(*) from table} when none of the parameters is
+ *       passed to the transform.
+ *   <li>{@code SELECT min(column), max(column) from table} when only the number of partitions is
+ *       provided, but not the upper or lower bounds.
+ * </ul>
+ *
+ * <p><b>Should I use this transform?</b> Consider using this transform in the following situations:
+ *
+ * <ul>
+ *   <li>The partitioning column is indexed. This will help speed up the range queries

Review comment:
       done!
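
To make the auto-inference path concrete, here is a sketch where both the bounds and the partition count are omitted, so the min/max/COUNT(*) statement described above would be issued. Identifiers are placeholders, and the TypeDescriptor argument is an assumption based on the truncated factory signature earlier:

    // Sketch: no bounds and no numPartitions, so min, max, and COUNT(*)
    // are inferred from the table itself.
    PCollection<Row> rows =
        pipeline.apply(
            JdbcIO.<Row, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfig)
                .withTable("my_table")
                .withPartitionColumn("id") // ideally an indexed column, per the note above
                .withRowMapper(rowMapper));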

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  * );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic partitioning is
+ * supported for a few data types: {@link Long}, {@link org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.

Review comment:
       I've included this change in my new commit.



