mosche commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r798585058



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1128,157 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    /** The name of a column of numeric type that will be used for 
partitioning. */

Review comment:
       Copy-paste? This Javadoc repeats the one on `withPartitionColumn` and looks wrong here.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  * );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be 
done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic 
partitioning is
+ * supported for a few data types: {@link Long}, {@link 
org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.

Review comment:
       ```suggestion
    * enable this, use {@link JdbcIO#readWithPartitions(TypeDescriptor)}.
   ```

##########
File path: 
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOAutoPartitioningIT.java
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Objects;
+import java.util.Random;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+/** A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on test containers. */
+@RunWith(Parameterized.class)
+public class JdbcIOAutoPartitioningIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JdbcIOAutoPartitioningIT.class);
+
+  public static final Integer NUM_ROWS = 1_000;
+  public static final String TABLE_NAME = "baseTable";
+
+  @ClassRule public static TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<String> params() {

Review comment:
       Awesome 🎉 

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -334,30 +342,188 @@ private static Calendar 
withTimestampAndTimezone(DateTime dateTime) {
   }
 
   /** Create partitions on a table. */
-  static class PartitioningFn extends DoFn<KV<Integer, KV<Long, Long>>, 
KV<String, Long>> {
+  static class PartitioningFn<T> extends DoFn<KV<Integer, KV<T, T>>, KV<T, T>> 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitioningFn.class);
+    final TypeDescriptor<T> partitioningColumnType;
+
+    PartitioningFn(TypeDescriptor<T> partitioningColumnType) {
+      this.partitioningColumnType = partitioningColumnType;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) {
-      Integer numPartitions = c.element().getKey();
-      Long lowerBound = c.element().getValue().getKey();
-      Long upperBound = c.element().getValue().getValue();
-      if (lowerBound > upperBound) {
-        throw new RuntimeException(
-            String.format(
-                "Lower bound [%s] is higher than upper bound [%s]", 
lowerBound, upperBound));
-      }
-      long stride = (upperBound - lowerBound) / numPartitions + 1;
-      for (long i = lowerBound; i < upperBound - stride; i += stride) {
-        String range = String.format("%s,%s", i, i + stride);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
-      }
-      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
-        long indexFrom = (numPartitions - 1) * stride;
-        long indexTo = upperBound + 1;
-        String range = String.format("%s,%s", indexFrom, indexTo);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
+      T lowerBound = c.element().getValue().getKey();
+      T upperBound = c.element().getValue().getValue();
+      JdbcReadWithPartitionsHelper<T> helper =
+          (JdbcReadWithPartitionsHelper<T>) 
PRESET_HELPERS.get(partitioningColumnType.getRawType());
+      List<KV<T, T>> ranges =
+          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
+      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
+      for (KV<T, T> e : ranges) {
+        c.output(e);
       }
     }
   }
+
+  public static final Map<Class<?>, JdbcReadWithPartitionsHelper<?>> 
PRESET_HELPERS =
+      ImmutableMap.of(
+          String.class,

Review comment:
       Interesting to think about SDFs in that context: being able to claim 
work requires a stable order for sure, so the index really becomes a must. 
But you could gather stats while querying and use them to repartition 
dynamically. That would be great indeed! :)

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1128,157 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    /** The name of a column of numeric type that will be used for 
partitioning. */
+    public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
+      return toBuilder().setUseBeamSchema(true).build();
+    }
+
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subqery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(

Review comment:
       I think this check should be kept and updated to `(getUpperBound() != 
null) == (getLowerBound() != null)` to make sure either both or no bounds are 
set ...? Otherwise `params` might be set incorrectly

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,69 +1045,80 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends 
PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> 
getDataSourceProviderFn();
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();
 
     abstract @Nullable String getPartitionColumn();
 
-    abstract @Nullable Long getLowerBound();
+    abstract @Nullable Boolean getUseBeamSchema();

Review comment:
       Any particular reason to have this Nullable / Boolean instead of using a 
primitive boolean?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  * );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be 
done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic 
partitioning is
+ * supported for a few data types: {@link Long}, {@link 
org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.
+ *
+ * <p>The partitioning scheme depends on these parameters, which can be 
user-provided, or
+ * automatically inferred by Beam (for the supported types):
+ *
+ * <ul>
+ *   <li>Upper bound
+ *   <li>Lower bound
+ *   <li>Number of partitions - when auto-inferred, the number of partitions 
defaults to the square
+ *       root of the number of rows divided by 5 (i.e.: {@code 
Math.floor(Math.sqrt(numRows) / 5)}).
+ * </ul>
+ *
+ * <p>To trigger auto-inference of these parameters, the user just needs to 
not provide them. To
+ * infer them automatically, Beam runs either of these statements:
+ *
+ * <ul>
+ *   <li>{@code SELECT min(column), max(column), COUNT(*) from table} when 
none of the parameters is
+ *       passed to the transform.
+ *   <li>{@code SELECT min(column), max(column) from table} when only number 
of partitions is
+ *       provided, but not upper or lower bounds.
+ * </ul>
+ *
+ * <p><b>Should I use this transform?</b> Consider using this transform in the 
following situations:
+ *
+ * <ul>
+ *   <li>The partitioning column is indexed. This will help speed up the range 
queries

Review comment:
       Is it worth mentioning that auto partitioning only works well if values 
of the partition column are more or less uniformly distributed?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -334,30 +342,188 @@ private static Calendar 
withTimestampAndTimezone(DateTime dateTime) {
   }
 
   /** Create partitions on a table. */
-  static class PartitioningFn extends DoFn<KV<Integer, KV<Long, Long>>, 
KV<String, Long>> {
+  static class PartitioningFn<T> extends DoFn<KV<Integer, KV<T, T>>, KV<T, T>> 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitioningFn.class);
+    final TypeDescriptor<T> partitioningColumnType;
+
+    PartitioningFn(TypeDescriptor<T> partitioningColumnType) {
+      this.partitioningColumnType = partitioningColumnType;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) {
-      Integer numPartitions = c.element().getKey();
-      Long lowerBound = c.element().getValue().getKey();
-      Long upperBound = c.element().getValue().getValue();
-      if (lowerBound > upperBound) {
-        throw new RuntimeException(
-            String.format(
-                "Lower bound [%s] is higher than upper bound [%s]", 
lowerBound, upperBound));
-      }
-      long stride = (upperBound - lowerBound) / numPartitions + 1;
-      for (long i = lowerBound; i < upperBound - stride; i += stride) {
-        String range = String.format("%s,%s", i, i + stride);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
-      }
-      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
-        long indexFrom = (numPartitions - 1) * stride;
-        long indexTo = upperBound + 1;
-        String range = String.format("%s,%s", indexFrom, indexTo);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
+      T lowerBound = c.element().getValue().getKey();
+      T upperBound = c.element().getValue().getValue();
+      JdbcReadWithPartitionsHelper<T> helper =
+          (JdbcReadWithPartitionsHelper<T>) 
PRESET_HELPERS.get(partitioningColumnType.getRawType());
+      List<KV<T, T>> ranges =
+          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
+      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
+      for (KV<T, T> e : ranges) {
+        c.output(e);
       }
     }
   }
+
+  public static final Map<Class<?>, JdbcReadWithPartitionsHelper<?>> 
PRESET_HELPERS =
+      ImmutableMap.of(
+          String.class,

Review comment:
       Yes, without querying significant stats from the DB it's going to be 
hard to do this well ...
   I'm just wondering if users aren't better off partitioning manually with 
`readAll`, using their domain knowledge, rather than us offering something 
automatic we already know isn't going to perform well. 

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -333,31 +343,206 @@ private static Calendar 
withTimestampAndTimezone(DateTime dateTime) {
     return calendar;
   }
 
+  /**
+   * A helper for {@link ReadWithPartitions} that handles range calculations.
+   *
+   * @param <PartitionT>
+   */
+  interface JdbcReadWithPartitionsHelper<PartitionT>
+      extends PreparedStatementSetter<KV<PartitionT, PartitionT>>,
+          RowMapper<KV<Long, KV<PartitionT, PartitionT>>> {
+    Iterable<KV<PartitionT, PartitionT>> calculateRanges(
+        PartitionT lowerBound, PartitionT upperBound, Long partitions);
+
+    @Override
+    void setParameters(KV<PartitionT, PartitionT> element, PreparedStatement 
preparedStatement);
+
+    @Override
+    KV<Long, KV<PartitionT, PartitionT>> mapRow(ResultSet resultSet) throws 
Exception;
+  }
+
   /** Create partitions on a table. */
-  static class PartitioningFn extends DoFn<KV<Integer, KV<Long, Long>>, 
KV<String, Long>> {
+  static class PartitioningFn<T> extends DoFn<KV<Long, KV<T, T>>, KV<T, T>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitioningFn.class);
+    final TypeDescriptor<T> partitioningColumnType;
+
+    PartitioningFn(TypeDescriptor<T> partitioningColumnType) {
+      this.partitioningColumnType = partitioningColumnType;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) {
-      Integer numPartitions = c.element().getKey();
-      Long lowerBound = c.element().getValue().getKey();
-      Long upperBound = c.element().getValue().getValue();
-      if (lowerBound > upperBound) {
-        throw new RuntimeException(
-            String.format(
-                "Lower bound [%s] is higher than upper bound [%s]", 
lowerBound, upperBound));
-      }
-      long stride = (upperBound - lowerBound) / numPartitions + 1;
-      for (long i = lowerBound; i < upperBound - stride; i += stride) {
-        String range = String.format("%s,%s", i, i + stride);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
-      }
-      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
-        long indexFrom = (numPartitions - 1) * stride;
-        long indexTo = upperBound + 1;
-        String range = String.format("%s,%s", indexFrom, indexTo);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
+      T lowerBound = c.element().getValue().getKey();
+      T upperBound = c.element().getValue().getValue();
+      JdbcReadWithPartitionsHelper<T> helper =
+          (JdbcReadWithPartitionsHelper<T>) 
PRESET_HELPERS.get(partitioningColumnType.getRawType());
+      List<KV<T, T>> ranges =
+          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
+      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
+      for (KV<T, T> e : ranges) {
+        c.output(e);
       }
     }
   }
+
+  public static final Map<Class<?>, JdbcReadWithPartitionsHelper<?>> 
PRESET_HELPERS =

Review comment:
       To me it looks like unit tests have been largely abandoned for this IO; 
in particular, this util class has almost no (direct) test coverage :( I know the 
partitioning fn is tested further down, but I believe it would be beneficial to 
have some decent tests for the partitioning logic / range calculations 
themselves, without having to run a pipeline.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to