GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192224177
--- Diff:
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
---
@@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
if (batchCount >= batchInterval) {
// execute batch
+ batchLimitReachedMeter.markEvent();
flush();
}
}
void flush() {
try {
+ flushMeter.markEvent();
+ flushBatchCountHisto.update(batchCount);
+ long before = System.currentTimeMillis();
upload.executeBatch();
+ flushDurationMsHisto.update(System.currentTimeMillis()
- before);
--- End diff --
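This may result in a negative duration: System.currentTimeMillis() follows the wall clock, which can be adjusted backwards (e.g. by an NTP correction) between the two calls. System.nanoTime() is intended for measuring elapsed time and is not affected by wall-clock changes, so it should be used here instead.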
---