mosche commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r799520225
##########
File path:
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder
builder) {
* strides for generated WHERE clause expressions used to split the column
withPartitionColumn
* evenly. When the input is less than 1, the number is set to 1.
*/
- public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+ public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int
numPartitions) {
checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
return toBuilder().setNumPartitions(numPartitions).build();
}
/** The name of a column of numeric type that will be used for
partitioning. */
- public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+ public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String
partitionColumn) {
checkNotNull(partitionColumn, "partitionColumn can not be null");
return toBuilder().setPartitionColumn(partitionColumn).build();
}
- public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+ public ReadWithPartitions<T, PartitionColumnT>
withLowerBound(PartitionColumnT lowerBound) {
return toBuilder().setLowerBound(lowerBound).build();
}
- public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+ public ReadWithPartitions<T, PartitionColumnT>
withUpperBound(PartitionColumnT upperBound) {
return toBuilder().setUpperBound(upperBound).build();
}
/** Name of the table in the external database. Can be used to pass a
user-defined subqery. */
- public ReadWithPartitions<T> withTable(String tableName) {
+ public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName)
{
checkNotNull(tableName, "table can not be null");
return toBuilder().setTable(tableName).build();
}
+ private static final int EQUAL = 0;
+
@Override
public PCollection<T> expand(PBegin input) {
checkNotNull(getRowMapper(), "withRowMapper() is required");
checkNotNull(
getDataSourceProviderFn(),
"withDataSourceConfiguration() or withDataSourceProviderFn() is
required");
checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
- checkArgument(
- getUpperBound() != null && getLowerBound() != null,
- "Upper and lower bounds are mandatory parameters for
JdbcIO.readWithPartitions");
checkNotNull(getTable(), "withTable() is required");
+ if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>)
{
+ // Not all partition types are comparable. For example, LocalDateTime,
which is a valid
+ // partitioning type, is not Comparable, so we can't enforce this for
all sorts of
+ // partitioning.
+ checkArgument(
+ ((Comparable<PartitionColumnT>)
getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+ "The lower bound of partitioning column is larger or equal than
the upper bound");
+ }
checkArgument(
- getLowerBound() < getUpperBound(),
- "The lower bound of partitioning column is larger or equal than the
upper bound");
- checkArgument(
- getUpperBound() - getLowerBound() >= getNumPartitions(),
- "The specified number of partitions is more than the difference
between upper bound and lower bound");
+
JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+ "readWithPartitions only supports the following types: %s",
+ JdbcUtil.PRESET_HELPERS.keySet());
+
+ PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+ if (getLowerBound() == null && getUpperBound() == null) {
+ String query =
+ String.format(
+ "SELECT min(%s), max(%s) FROM %s",
+ getPartitionColumn(), getPartitionColumn(), getTable());
+ if (getNumPartitions() == null) {
+ query =
+ String.format(
+ "SELECT min(%s), max(%s), count(*) FROM %s",
+ getPartitionColumn(), getPartitionColumn(), getTable());
+ }
+ params =
+ input
+ .apply(
+ JdbcIO.<KV<Integer, KV<PartitionColumnT,
PartitionColumnT>>>read()
+ .withQuery(query)
+ .withDataSourceProviderFn(getDataSourceProviderFn())
+ .withRowMapper(
+ (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+
JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+ .apply(
+ MapElements.via(
+ new SimpleFunction<
+ KV<Integer, KV<PartitionColumnT,
PartitionColumnT>>,
+ KV<Integer, KV<PartitionColumnT,
PartitionColumnT>>>() {
+ @Override
+ public KV<Integer, KV<PartitionColumnT,
PartitionColumnT>> apply(
+ KV<Integer, KV<PartitionColumnT,
PartitionColumnT>> input) {
+ KV<Integer, KV<PartitionColumnT,
PartitionColumnT>> result;
+ if (getNumPartitions() == null) {
+ // In this case, we use the table row count to
infer a number of
+ // partitions.
+ // We take the square root of the number of
rows, and divide it by 5
+ // to keep a relatively low number of
partitions, given that an RDBMS
+ // cannot usually accept a very large number of
connections.
+ Integer numPartitions =
Review comment:
I think that makes sense; e.g. `sqrt(x)/10` looks good. Alternatively, we could
just subtract some constant value.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]