[ 
https://issues.apache.org/jira/browse/BEAM-13184?focusedWorklogId=688372&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-688372
 ]

ASF GitHub Bot logged work on BEAM-13184:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Dec/21 00:32
            Start Date: 01/Dec/21 00:32
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request 
#15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r759755819



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != 
IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {

Review comment:
       "&& getAutoSharding()" is redundant ?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != 
IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());

Review comment:
       Won't we run into OOMs if this list grows too much ?

##########
File path: 
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
##########
@@ -258,6 +263,40 @@ private PipelineResult runRead() {
     return pipelineRead.run();
   }
 
+  @Test
+  public void testWriteWithAutosharding() throws Exception {
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    try {
+      List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
+      TestStream.Builder<KV<Integer, String>> ts =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(Instant.now());
+      for (KV<Integer, String> elm : data) {
+        ts.addElements(elm);
+      }
+
+      PCollection<KV<Integer, String>> dataCollection =
+          pipelineWrite.apply(ts.advanceWatermarkToInfinity());
+      dataCollection.apply(
+          JdbcIO.<KV<Integer, String>>write()
+              .withDataSourceProviderFn(voidInput -> dataSource)
+              .withStatement(String.format("insert into %s values(?, ?) 
returning *", tableName))
+              .withAutoSharding()

Review comment:
       Are we actually able to test that auto-sharding worked somehow ?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       Should we consider moving this WithKeys.of() and the subsequent 
Values.create() into GroupIntoBatches to improve it's user interface (in a 
separate PR) ?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to 
write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       Yeah, that sounds like a separate feature. The purpose of this feature 
is for the runner to determine the appropriate amount of sharding automatically.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
         checkArgument(
             spec.getPreparedStatementSetter() != null, 
"withPreparedStatementSetter() is required");
       }
-      return input
+      PCollection<Iterable<T>> iterables;

Review comment:
       Can we share code between WriteWithResult and WriteVoid ? This 
introduces a significant amount of code duplication.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 688372)
    Time Spent: 2h 50m  (was: 2h 40m)

> Support autosharding for JdbcIO writers
> ---------------------------------------
>
>                 Key: BEAM-13184
>                 URL: https://issues.apache.org/jira/browse/BEAM-13184
>             Project: Beam
>          Issue Type: Task
>          Components: io-java-jdbc
>            Reporter: Pablo Estrada
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> This should improve efficiency for Jdbc writers on streaming pipelines



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to