This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bf5260bf Add per-batch metrics to JdbcIO.write (#15429)
bf5260bf is described below

commit bf5260bf4f9aece143e17445848c03d1128c5c09
Author: Pablo <[email protected]>
AuthorDate: Tue Aug 31 19:59:41 2021 -0700

    Add per-batch metrics to JdbcIO.write (#15429)
    
    * Add batch distribution metric to JdbcIO.write
    
    * add ms per request
    
    * fixup
    
    * fixup
---
 .../jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e5ea008..b3c10df 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -41,6 +41,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -51,6 +52,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.io.jdbc.JdbcUtil.PartitioningFn;
 import org.apache.beam.sdk.io.jdbc.SchemaUtil.FieldWithIndex;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -1615,6 +1618,10 @@ public class JdbcIO {
 
     private static class WriteFn<T> extends DoFn<T, Void> {
 
+      private static final Distribution RECORDS_PER_BATCH =
+          Metrics.distribution(WriteFn.class, "records_per_jdbc_batch");
+      private static final Distribution MS_PER_BATCH =
+          Metrics.distribution(WriteFn.class, "milliseconds_per_batch");
       private final WriteVoid<T> spec;
       private DataSource dataSource;
       private Connection connection;
@@ -1694,6 +1701,7 @@ public class JdbcIO {
         if (records.isEmpty()) {
           return;
         }
+        Long startTimeNs = System.nanoTime();
         // Only acquire the connection if there is something to write.
         if (connection == null) {
           connection = dataSource.getConnection();
@@ -1714,6 +1722,8 @@ public class JdbcIO {
               preparedStatement.executeBatch();
               // commit the changes
               connection.commit();
+              RECORDS_PER_BATCH.update(records.size());
+              
MS_PER_BATCH.update(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startTimeNs));
               break;
             } catch (SQLException exception) {
               if (!spec.getRetryStrategy().apply(exception)) {

Reply via email to