mosche commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r797695823



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,59 +1015,84 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends 
PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> 
getDataSourceProviderFn();
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();
 
     abstract @Nullable String getPartitionColumn();
 
-    abstract @Nullable Long getLowerBound();
+    abstract @javax.annotation.Nullable @Nullable PartitionColumnT 
getLowerBound();

Review comment:
       Wondering: why is the javax `@Nullable` annotation added here in addition to the existing `@Nullable`?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subqery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for 
JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) 
{

Review comment:
       Validation currently doesn't check that `(getLowerBound() != null) == 
(getUpperBound() != null)`. If only one of the two bounds is set, this may break 
the `if` condition further down, especially when the bound type does not 
implement `Comparable`.
   IMHO, this is an example where something like `withBounds(lower, upper)` 
not only provides much more guidance, but also makes validation a lot easier.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subqery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for 
JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) 
{
+        // Not all partition types are comparable. For example, LocalDateTime, 
which is a valid
+        // partitioning type, is not Comparable, so we can't enforce this for 
all sorts of
+        // partitioning.
+        checkArgument(
+            ((Comparable<PartitionColumnT>) 
getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+            "The lower bound of partitioning column is larger or equal than 
the upper bound");
+      }
       checkArgument(
-          getLowerBound() < getUpperBound(),
-          "The lower bound of partitioning column is larger or equal than the 
upper bound");
-      checkArgument(
-          getUpperBound() - getLowerBound() >= getNumPartitions(),
-          "The specified number of partitions is more than the difference 
between upper bound and lower bound");
+          
JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+          "readWithPartitions only supports the following types: %s",
+          JdbcUtil.PRESET_HELPERS.keySet());
+
+      PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+      if (getLowerBound() == null && getUpperBound() == null) {
+        String query =
+            String.format(
+                "SELECT min(%s), max(%s) FROM %s",
+                getPartitionColumn(), getPartitionColumn(), getTable());
+        if (getNumPartitions() == null) {
+          query =
+              String.format(
+                  "SELECT min(%s), max(%s), count(*) FROM %s",
+                  getPartitionColumn(), getPartitionColumn(), getTable());
+        }
+        params =
+            input
+                .apply(
+                    JdbcIO.<KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>read()
+                        .withQuery(query)
+                        .withDataSourceProviderFn(getDataSourceProviderFn())
+                        .withRowMapper(
+                            (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+                                
JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>,
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>() {
+                          @Override
+                          public KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> apply(
+                              KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> input) {
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> result;
+                            if (getNumPartitions() == null) {
+                              // In this case, we use the table row count to 
infer a number of
+                              // partitions.
+                              // We take the square root of the number of 
rows, and divide it by 5
+                              // to keep a relatively low number of 
partitions, given that an RDBMS
+                              // cannot usually accept a very large number of 
connections.
+                              Integer numPartitions =

Review comment:
       I wonder whether this is too many: even for smallish counts this still 
creates a fairly large number of partitions (e.g. 10,000 rows yield 
floor(sqrt(10000) / 5) = 20 partitions).

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -334,30 +342,188 @@ private static Calendar 
withTimestampAndTimezone(DateTime dateTime) {
   }
 
   /** Create partitions on a table. */
-  static class PartitioningFn extends DoFn<KV<Integer, KV<Long, Long>>, 
KV<String, Long>> {
+  static class PartitioningFn<T> extends DoFn<KV<Integer, KV<T, T>>, KV<T, T>> 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitioningFn.class);
+    final TypeDescriptor<T> partitioningColumnType;
+
+    PartitioningFn(TypeDescriptor<T> partitioningColumnType) {
+      this.partitioningColumnType = partitioningColumnType;
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) {
-      Integer numPartitions = c.element().getKey();
-      Long lowerBound = c.element().getValue().getKey();
-      Long upperBound = c.element().getValue().getValue();
-      if (lowerBound > upperBound) {
-        throw new RuntimeException(
-            String.format(
-                "Lower bound [%s] is higher than upper bound [%s]", 
lowerBound, upperBound));
-      }
-      long stride = (upperBound - lowerBound) / numPartitions + 1;
-      for (long i = lowerBound; i < upperBound - stride; i += stride) {
-        String range = String.format("%s,%s", i, i + stride);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
-      }
-      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
-        long indexFrom = (numPartitions - 1) * stride;
-        long indexTo = upperBound + 1;
-        String range = String.format("%s,%s", indexFrom, indexTo);
-        KV<String, Long> kvRange = KV.of(range, 1L);
-        c.output(kvRange);
+      T lowerBound = c.element().getValue().getKey();
+      T upperBound = c.element().getValue().getValue();
+      JdbcReadWithPartitionsHelper<T> helper =
+          (JdbcReadWithPartitionsHelper<T>) 
PRESET_HELPERS.get(partitioningColumnType.getRawType());
+      List<KV<T, T>> ranges =
+          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
+      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
+      for (KV<T, T> e : ranges) {
+        c.output(e);
       }
     }
   }
+
+  public static final Map<Class<?>, JdbcReadWithPartitionsHelper<?>> 
PRESET_HELPERS =
+      ImmutableMap.of(
+          String.class,

Review comment:
       To be honest, I'm not sure how to feel about offering equal-width 
binning for strings. From past experience, I fear this might trick users into a 
seemingly simple solution that could quickly turn into a bad surprise. I've 
rarely seen string columns that are uniformly distributed. Too often there's 
high skew around certain characters, not to mention categorical values... What 
do you think? 

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * strides for generated WHERE clause expressions used to split the column 
withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int 
numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }
 
     /** The name of a column of numeric type that will be used for 
partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String 
partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }
 
-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> 
withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }
 
     /** Name of the table in the external database. Can be used to pass a 
user-defined subqery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) 
{
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }
 
+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for 
JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) 
{
+        // Not all partition types are comparable. For example, LocalDateTime, 
which is a valid
+        // partitioning type, is not Comparable, so we can't enforce this for 
all sorts of
+        // partitioning.
+        checkArgument(
+            ((Comparable<PartitionColumnT>) 
getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+            "The lower bound of partitioning column is larger or equal than 
the upper bound");
+      }
       checkArgument(
-          getLowerBound() < getUpperBound(),
-          "The lower bound of partitioning column is larger or equal than the 
upper bound");
-      checkArgument(
-          getUpperBound() - getLowerBound() >= getNumPartitions(),
-          "The specified number of partitions is more than the difference 
between upper bound and lower bound");
+          
JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+          "readWithPartitions only supports the following types: %s",
+          JdbcUtil.PRESET_HELPERS.keySet());
+
+      PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+      if (getLowerBound() == null && getUpperBound() == null) {
+        String query =
+            String.format(
+                "SELECT min(%s), max(%s) FROM %s",
+                getPartitionColumn(), getPartitionColumn(), getTable());
+        if (getNumPartitions() == null) {
+          query =
+              String.format(
+                  "SELECT min(%s), max(%s), count(*) FROM %s",
+                  getPartitionColumn(), getPartitionColumn(), getTable());
+        }
+        params =
+            input
+                .apply(
+                    JdbcIO.<KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>read()
+                        .withQuery(query)
+                        .withDataSourceProviderFn(getDataSourceProviderFn())
+                        .withRowMapper(
+                            (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+                                
JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>,
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>>>() {
+                          @Override
+                          public KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> apply(
+                              KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> input) {
+                            KV<Integer, KV<PartitionColumnT, 
PartitionColumnT>> result;
+                            if (getNumPartitions() == null) {
+                              // In this case, we use the table row count to 
infer a number of
+                              // partitions.
+                              // We take the square root of the number of 
rows, and divide it by 5
+                              // to keep a relatively low number of 
partitions, given that an RDBMS
+                              // cannot usually accept a very large number of 
connections.
+                              Integer numPartitions =
+                                  Long.valueOf(
+                                          
Math.round(Math.floor(Math.sqrt(input.getKey()) / 5)))
+                                      .intValue();
+                              if (numPartitions == 0) {
+                                numPartitions = 1;
+                              }
+                              result = KV.of(numPartitions, input.getValue());
+                            } else {
+                              result = KV.of(getNumPartitions(), 
input.getValue());
+                            }
+                            LOG.info(
+                                "Inferred min: {} - max: {} - numPartitions: 
{}",
+                                result.getValue().getKey(),
+                                result.getValue().getValue(),
+                                result.getKey());
+                            return result;
+                          }
+                        }));
+      } else {
+        params =
+            input.apply(
+                Create.of(KV.of(getNumPartitions(), KV.of(getLowerBound(), 
getUpperBound()))));
+      }
 
-      PCollection<KV<Integer, KV<Long, Long>>> params =
-          input.apply(
-              Create.of(
-                  Collections.singletonList(
-                      KV.of(getNumPartitions(), KV.of(getLowerBound(), 
getUpperBound())))));
-      PCollection<KV<String, Iterable<Long>>> ranges =
+      PCollection<KV<PartitionColumnT, PartitionColumnT>> ranges =
           params
-              .apply("Partitioning", ParDo.of(new PartitioningFn()))
-              .apply("Group partitions", GroupByKey.create());
+              .apply("Partitioning", ParDo.of(new 
PartitioningFn<>(getPartitionColumnType())))
+              .apply("Reshuffle partitions", Reshuffle.viaRandomKey());
 
-      JdbcIO.ReadAll<KV<String, Iterable<Long>>, T> readAll =
-          JdbcIO.<KV<String, Iterable<Long>>, T>readAll()
+      JdbcIO.ReadAll<KV<PartitionColumnT, PartitionColumnT>, T> readAll =
+          JdbcIO.<KV<PartitionColumnT, PartitionColumnT>, T>readAll()
               .withDataSourceProviderFn(getDataSourceProviderFn())
               .withQuery(
                   String.format(
                       "select * from %1$s where %2$s >= ? and %2$s < ?",
                       getTable(), getPartitionColumn()))
               .withRowMapper(getRowMapper())
               .withParameterSetter(
-                  (PreparedStatementSetter<KV<String, Iterable<Long>>>)
-                      (element, preparedStatement) -> {
-                        String[] range = element.getKey().split(",", -1);
-                        preparedStatement.setLong(1, Long.parseLong(range[0]));
-                        preparedStatement.setLong(2, Long.parseLong(range[1]));
-                      })
+                  // This cast is unchecked, thus this is a small 
type-checking risk. We just need

Review comment:
       Seems fine 👍 How about exposing something like `<T> 
JdbcReadWithPartitionsHelper<T> getPartitionsHelper(TypeDescriptor<T> type)` in 
JdbcUtil rather than the preset map itself? With that the cast has to be done 
in one place only ...

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,59 +1015,84 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends 
PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> 
getDataSourceProviderFn();
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();
 
     abstract @Nullable String getPartitionColumn();
 
-    abstract @Nullable Long getLowerBound();
+    abstract @javax.annotation.Nullable @Nullable PartitionColumnT 
getLowerBound();
 
-    abstract @Nullable Long getUpperBound();
+    abstract @javax.annotation.Nullable @Nullable PartitionColumnT 
getUpperBound();
 
     abstract @Nullable String getTable();
 
-    abstract Builder<T> toBuilder();
+    abstract TypeDescriptor<PartitionColumnT> getPartitionColumnType();
+
+    abstract Builder<T, PartitionColumnT> toBuilder();
+
+    /**
+     * A helper for {@link ReadWithPartitions} that handles range calculations.
+     *
+     * @param <PartitionT>
+     */
+    public interface JdbcReadWithPartitionsHelper<PartitionT>

Review comment:
       How about keeping this interface private initially and moving it into 
JdbcUtil? It might not be polished / hardened enough to be exposed to users 
right away ... Also, to be on the safe side, how about treating the count as a 
long rather than an int?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -325,17 +325,24 @@ public static ReadRows readRows() {
    *
    * @param <T> Type of the data to be read.
    */
-  public static <T> ReadWithPartitions<T> readWithPartitions() {
-    return new AutoValue_JdbcIO_ReadWithPartitions.Builder<T>()
+  public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> 
readWithPartitions(

Review comment:
       Hmm, introducing `PartitionColumnT` is a breaking change. Though, 
thinking about how likely this is going to affect people and what's required to 
work around it when using AutoValue, this feels like the better alternative ;)

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,59 +1015,84 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends 
PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> 
getDataSourceProviderFn();
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();
 
     abstract @Nullable String getPartitionColumn();
 
-    abstract @Nullable Long getLowerBound();
+    abstract @javax.annotation.Nullable @Nullable PartitionColumnT 
getLowerBound();
 
-    abstract @Nullable Long getUpperBound();
+    abstract @javax.annotation.Nullable @Nullable PartitionColumnT 
getUpperBound();
 
     abstract @Nullable String getTable();
 
-    abstract Builder<T> toBuilder();
+    abstract TypeDescriptor<PartitionColumnT> getPartitionColumnType();
+
+    abstract Builder<T, PartitionColumnT> toBuilder();
+
+    /**
+     * A helper for {@link ReadWithPartitions} that handles range calculations.
+     *
+     * @param <PartitionT>
+     */
+    public interface JdbcReadWithPartitionsHelper<PartitionT>

Review comment:
       Would be nice to have a simple `Pair` / `Tuple` value similar to `KV` to 
avoid the semantic connotation of using `KV` in such cases and the confusion 
this might create ...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to