daria-malkova commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r661405448



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime 
dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> 
{
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;
+      for (int i = lowerBound; i < upperBound - stride; i += stride) {
+        String range = String.format("%s,%s", i, i + stride);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
+        int indexFrom = (numPartitions - 1) * stride;
+        int indexTo = upperBound + 1;
+        String range = String.format("%s,%s", indexFrom, indexTo);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+    }
+  }
+
+  /**
+   * Select maximal and minimal value from a table by partitioning column.
+   *
+   * @return pair of integers corresponds to the upper and lower bounds.
+   */
+  static Integer[] getBounds(
+      PBegin input,
+      String table,
+      SerializableFunction<Void, DataSource> providerFunctionFn,
+      String partitionColumn) {
+    final Integer[] bounds = {0, 0};
+    input
+        .apply(
+            String.format("Read min and max value by %s", partitionColumn),
+            JdbcIO.<String>read()
+                .withDataSourceProviderFn(providerFunctionFn)
+                .withQuery(
+                    String.format("select min(%1$s), max(%1$s) from %2$s", 
partitionColumn, table))
+                .withRowMapper(
+                    (JdbcIO.RowMapper<String>)
+                        resultSet ->
+                            String.join(
+                                ",", Arrays.asList(resultSet.getString(1), 
resultSet.getString(2))))
+                .withOutputParallelization(false)
+                .withCoder(StringUtf8Coder.of()))
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<String> elements = 
Splitter.on(',').splitToList(context.element());
+                    bounds[0] = 
Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+                    bounds[1] = 
Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+                    context.output(context.element());
+                  }
+                }));
+    return bounds;

Review comment:
       I've checked on Dataflow - everything works well; the min and max values were defined.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to