pabloem commented on a change in pull request #15848:
URL: https://github.com/apache/beam/pull/15848#discussion_r798993247
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1112,136 @@ public void populateDisplayData(DisplayData.Builder builder) {
      * strides for generated WHERE clause expressions used to split the column withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }

     /** The name of a column of numeric type that will be used for partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }

-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    public ReadWithPartitions<T, PartitionColumnT> withLowerBound(PartitionColumnT lowerBound) {
       return toBuilder().setLowerBound(lowerBound).build();
     }

-    public ReadWithPartitions<T> withUpperBound(Long upperBound) {
+    public ReadWithPartitions<T, PartitionColumnT> withUpperBound(PartitionColumnT upperBound) {
       return toBuilder().setUpperBound(upperBound).build();
     }

     /** Name of the table in the external database. Can be used to pass a user-defined subquery. */
-    public ReadWithPartitions<T> withTable(String tableName) {
+    public ReadWithPartitions<T, PartitionColumnT> withTable(String tableName) {
       checkNotNull(tableName, "table can not be null");
       return toBuilder().setTable(tableName).build();
     }

+    private static final int EQUAL = 0;
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getRowMapper(), "withRowMapper() is required");
       checkNotNull(
           getDataSourceProviderFn(),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
       checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
-      checkArgument(
-          getUpperBound() != null && getLowerBound() != null,
-          "Upper and lower bounds are mandatory parameters for JdbcIO.readWithPartitions");
       checkNotNull(getTable(), "withTable() is required");
+      if (getLowerBound() != null && getLowerBound() instanceof Comparable<?>) {
+        // Not all partition types are comparable. For example, LocalDateTime, which is a valid
+        // partitioning type, is not Comparable, so we can't enforce this for all sorts of
+        // partitioning.
+        checkArgument(
+            ((Comparable<PartitionColumnT>) getLowerBound()).compareTo(getUpperBound()) < EQUAL,
+            "The lower bound of the partitioning column must be less than the upper bound");
+      }
       checkArgument(
-          getLowerBound() < getUpperBound(),
-          "The lower bound of partitioning column is larger or equal than the upper bound");
-      checkArgument(
-          getUpperBound() - getLowerBound() >= getNumPartitions(),
-          "The specified number of partitions is more than the difference between upper bound and lower bound");
+          JdbcUtil.PRESET_HELPERS.containsKey(getPartitionColumnType().getRawType()),
+          "readWithPartitions only supports the following types: %s",
+          JdbcUtil.PRESET_HELPERS.keySet());
+
+      PCollection<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>> params;
+
+      if (getLowerBound() == null && getUpperBound() == null) {
+        String query =
+            String.format(
+                "SELECT min(%s), max(%s) FROM %s",
+                getPartitionColumn(), getPartitionColumn(), getTable());
+        if (getNumPartitions() == null) {
+          query =
+              String.format(
+                  "SELECT min(%s), max(%s), count(*) FROM %s",
+                  getPartitionColumn(), getPartitionColumn(), getTable());
+        }
+        params =
+            input
+                .apply(
+                    JdbcIO.<KV<Integer, KV<PartitionColumnT, PartitionColumnT>>>read()
+                        .withQuery(query)
+                        .withDataSourceProviderFn(getDataSourceProviderFn())
+                        .withRowMapper(
+                            (JdbcReadWithPartitionsHelper<PartitionColumnT>)
+                                JdbcUtil.PRESET_HELPERS.get(getPartitionColumnType().getRawType())))
+                .apply(
+                    MapElements.via(
+                        new SimpleFunction<
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>>,
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>>>() {
+                          @Override
+                          public KV<Integer, KV<PartitionColumnT, PartitionColumnT>> apply(
+                              KV<Integer, KV<PartitionColumnT, PartitionColumnT>> input) {
+                            KV<Integer, KV<PartitionColumnT, PartitionColumnT>> result;
+                            if (getNumPartitions() == null) {
+                              // In this case, we use the table row count to infer a number of
+                              // partitions. We take the square root of the number of rows, and
+                              // divide it by 5 to keep a relatively low number of partitions,
+                              // given that an RDBMS cannot usually accept a very large number of
+                              // connections.
+                              Integer numPartitions =
+                                  Long.valueOf(
+                                          Math.round(Math.floor(Math.sqrt(input.getKey()) / 5)))
+                                      .intValue();
+                              if (numPartitions == 0) {
+                                numPartitions = 1;
+                              }
+                              result = KV.of(numPartitions, input.getValue());
+                            } else {
+                              result = KV.of(getNumPartitions(), input.getValue());
+                            }
+                            LOG.info(
+                                "Inferred min: {} - max: {} - numPartitions: {}",
+                                result.getValue().getKey(),
+                                result.getValue().getValue(),
+                                result.getKey());
+                            return result;
+                          }
+                        }));
+      } else {
+        params =
+            input.apply(
+                Create.of(KV.of(getNumPartitions(), KV.of(getLowerBound(), getUpperBound()))));
+      }
-      PCollection<KV<Integer, KV<Long, Long>>> params =
-          input.apply(
-              Create.of(
-                  Collections.singletonList(
-                      KV.of(getNumPartitions(), KV.of(getLowerBound(), getUpperBound())))));

-      PCollection<KV<String, Iterable<Long>>> ranges =
+      PCollection<KV<PartitionColumnT, PartitionColumnT>> ranges =
           params
-              .apply("Partitioning", ParDo.of(new PartitioningFn()))
-              .apply("Group partitions", GroupByKey.create());
+              .apply("Partitioning", ParDo.of(new PartitioningFn<>(getPartitionColumnType())))
+              .apply("Reshuffle partitions", Reshuffle.viaRandomKey());

-      JdbcIO.ReadAll<KV<String, Iterable<Long>>, T> readAll =
-          JdbcIO.<KV<String, Iterable<Long>>, T>readAll()
+      JdbcIO.ReadAll<KV<PartitionColumnT, PartitionColumnT>, T> readAll =
+          JdbcIO.<KV<PartitionColumnT, PartitionColumnT>, T>readAll()
               .withDataSourceProviderFn(getDataSourceProviderFn())
               .withQuery(
                   String.format(
                       "select * from %1$s where %2$s >= ? and %2$s < ?",
                       getTable(), getPartitionColumn()))
               .withRowMapper(getRowMapper())
               .withParameterSetter(
-                  (PreparedStatementSetter<KV<String, Iterable<Long>>>)
-                      (element, preparedStatement) -> {
-                        String[] range = element.getKey().split(",", -1);
-                        preparedStatement.setLong(1, Long.parseLong(range[0]));
-                        preparedStatement.setLong(2, Long.parseLong(range[1]));
-                      })
+                  // This cast is unchecked, thus this is a small type-checking risk. We just need
Review comment:
done!
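
For readers following the partition-count inference in the hunk above, here is a minimal standalone sketch of the heuristic (the method name inferNumPartitions is hypothetical, not part of the PR):

    // floor(sqrt(rowCount) / 5), clamped to a minimum of one partition, as in
    // the SimpleFunction above. E.g. 1,000,000 rows -> 200 partitions;
    // 10,000 rows -> 20; anything under 25 rows -> 1.
    static int inferNumPartitions(long rowCount) {
      int numPartitions = (int) Math.floor(Math.sqrt(rowCount) / 5);
      return Math.max(numPartitions, 1);
    }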
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1071,74 +1128,157 @@ public void populateDisplayData(DisplayData.Builder builder) {
      * strides for generated WHERE clause expressions used to split the column withPartitionColumn
      * evenly. When the input is less than 1, the number is set to 1.
      */
-    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+    public ReadWithPartitions<T, PartitionColumnT> withNumPartitions(int numPartitions) {
       checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
       return toBuilder().setNumPartitions(numPartitions).build();
     }

     /** The name of a column of numeric type that will be used for partitioning. */
-    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+    public ReadWithPartitions<T, PartitionColumnT> withPartitionColumn(String partitionColumn) {
       checkNotNull(partitionColumn, "partitionColumn can not be null");
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }

-    public ReadWithPartitions<T> withLowerBound(Long lowerBound) {
+    /** The name of a column of numeric type that will be used for partitioning. */
Review comment:
fixed
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -999,69 +1045,80 @@ public void populateDisplayData(DisplayData.Builder builder) {
   /** Implementation of {@link #readWithPartitions}. */
   @AutoValue
-  public abstract static class ReadWithPartitions<T> extends PTransform<PBegin, PCollection<T>> {
+  public abstract static class ReadWithPartitions<T, PartitionColumnT>
+      extends PTransform<PBegin, PCollection<T>> {

     abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();

     abstract @Nullable RowMapper<T> getRowMapper();

     abstract @Nullable Coder<T> getCoder();

-    abstract Integer getNumPartitions();
+    abstract @Nullable Integer getNumPartitions();

     abstract @Nullable String getPartitionColumn();

-    abstract @Nullable Long getLowerBound();
+    abstract @Nullable Boolean getUseBeamSchema();
Review comment:
fixed
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -325,17 +325,24 @@ public static ReadRows readRows() {
    *
    * @param <T> Type of the data to be read.
    */
-  public static <T> ReadWithPartitions<T> readWithPartitions() {
-    return new AutoValue_JdbcIO_ReadWithPartitions.Builder<T>()
+  public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> readWithPartitions(
Review comment:
thanks!
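
The new factory signature above is truncated in this hunk, but it appears to take the partition column's TypeDescriptor, consistent with the getPartitionColumnType() calls elsewhere in this PR. Under that assumption, a caller-side sketch might look like this (the table, column, and connection details are hypothetical):

    // Sketch: explicit bounds and partition count, assuming pipeline is an
    // existing Pipeline. RowMapper is a functional interface: ResultSet -> T.
    PCollection<KV<Integer, String>> rows =
        pipeline.apply(
            JdbcIO.<KV<Integer, String>, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://localhost/mydb"))
                .withTable("users")
                .withPartitionColumn("id")
                .withLowerBound(0L)
                .withUpperBound(1000L)
                .withNumPartitions(10)
                .withRowMapper(rs -> KV.of(rs.getInt("id"), rs.getString("name"))));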
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  *     );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic partitioning is
+ * supported for a few data types: {@link Long}, {@link org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.
+ *
+ * <p>The partitioning scheme depends on these parameters, which can be user-provided or
+ * automatically inferred by Beam (for the supported types):
+ *
+ * <ul>
+ *   <li>Upper bound
+ *   <li>Lower bound
+ *   <li>Number of partitions - when auto-inferred, the number of partitions defaults to the square
+ *       root of the number of rows divided by 5 (i.e.: {@code Math.floor(Math.sqrt(numRows) / 5)}).
+ * </ul>
+ *
+ * <p>To trigger auto-inference of these parameters, simply omit them. To infer them automatically,
+ * Beam runs either of these statements:
+ *
+ * <ul>
+ *   <li>{@code SELECT min(column), max(column), COUNT(*) from table} when none of the parameters
+ *       is passed to the transform.
+ *   <li>{@code SELECT min(column), max(column) from table} when only the number of partitions is
+ *       provided, but not the upper or lower bounds.
+ * </ul>
+ *
+ * <p><b>Should I use this transform?</b> Consider using this transform in the following
+ * situations:
+ *
+ * <ul>
+ *   <li>The partitioning column is indexed. This will help speed up the range queries
Review comment:
done!
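
To illustrate the auto-inference path the javadoc describes, here is a sketch that omits the bounds and the partition count (again assuming the TypeDescriptor-based factory; table and column names are hypothetical). With nothing provided, the transform would issue the min/max/COUNT(*) query above and derive the number of partitions from the row count:

    // Sketch: both bounds and numPartitions are left unset, so they are
    // inferred from SELECT min(id), max(id), count(*) FROM users.
    // dataSourceProviderFn is an existing SerializableFunction<Void, DataSource>.
    PCollection<String> names =
        pipeline.apply(
            JdbcIO.<String, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceProviderFn(dataSourceProviderFn)
                .withTable("users")
                .withPartitionColumn("id")
                .withRowMapper(rs -> rs.getString("name")));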
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -196,31 +197,60 @@
  *     );
  * }</code></pre>
  *
- * <p>3. To read all data from a table in parallel with partitioning can be done with {@link
- * ReadWithPartitions}:
+ * <h4>Parallel reading from a JDBC datasource</h4>
+ *
+ * <p>Beam supports partitioned reading of all data from a table. Automatic partitioning is
+ * supported for a few data types: {@link Long}, {@link org.joda.time.DateTime}, {@link String}. To
+ * enable this, use {@link ReadWithPartitions}.
Review comment:
I've included this change in my new commit.