This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bf5260bf Add per-batch metrics to JdbcIO.write (#15429)
bf5260bf is described below
commit bf5260bf4f9aece143e17445848c03d1128c5c09
Author: Pablo <[email protected]>
AuthorDate: Tue Aug 31 19:59:41 2021 -0700
Add per-batch metrics to JdbcIO.write (#15429)
* Add batch distribution metric to JdbcIO.write
* add ms per request
* fixup
* fixup
---
.../jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e5ea008..b3c10df 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -41,6 +41,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -51,6 +52,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.jdbc.JdbcUtil.PartitioningFn;
import org.apache.beam.sdk.io.jdbc.SchemaUtil.FieldWithIndex;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -1615,6 +1618,10 @@ public class JdbcIO {
private static class WriteFn<T> extends DoFn<T, Void> {
+ private static final Distribution RECORDS_PER_BATCH =
+ Metrics.distribution(WriteFn.class, "records_per_jdbc_batch");
+ private static final Distribution MS_PER_BATCH =
+ Metrics.distribution(WriteFn.class, "milliseconds_per_batch");
private final WriteVoid<T> spec;
private DataSource dataSource;
private Connection connection;
@@ -1694,6 +1701,7 @@ public class JdbcIO {
if (records.isEmpty()) {
return;
}
+ Long startTimeNs = System.nanoTime();
// Only acquire the connection if there is something to write.
if (connection == null) {
connection = dataSource.getConnection();
@@ -1714,6 +1722,8 @@ public class JdbcIO {
preparedStatement.executeBatch();
// commit the changes
connection.commit();
+ RECORDS_PER_BATCH.update(records.size());
+
MS_PER_BATCH.update(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs));
break;
} catch (SQLException exception) {
if (!spec.getRetryStrategy().apply(exception)) {