This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4aec117eb4693f978f2d95580d72a12d14638e85 Author: Jing Ge <[email protected]> AuthorDate: Tue Mar 15 16:16:30 2022 +0100 [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly. (cherry picked from commit 4e0c24a82e9b0fd35ca23610ba396a932b0f41b8) --- .../flink/connector/base/sink/writer/AsyncSinkWriter.java | 12 ++++++------ .../connector/base/sink/writer/TestSinkInitContext.java | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index f45a3a5..090504a 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable private final SinkWriterMetricGroup metrics; /* Counter for number of bytes this sink has attempted to send to the destination. */ - private final Counter numBytesOutCounter; + private final Counter numBytesSendCounter; /* Counter for number of records this sink has attempted to send to the destination. */ - private final Counter numRecordsOutCounter; + private final Counter numRecordsSendCounter; /** * Rate limiting strategy {@code inflightMessages} at any given time, {@code @@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable this.metrics = context.metricGroup(); this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp); - this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter(); - this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter(); + this.numBytesSendCounter = this.metrics.getNumBytesSendCounter(); + this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter(); this.fatalExceptionCons = exception -> @@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable batchSizeBytes += requestEntrySize; } - numRecordsOutCounter.inc(batch.size()); - numBytesOutCounter.inc(batchSizeBytes); + numRecordsSendCounter.inc(batch.size()); + numBytesSendCounter.inc(batchSizeBytes); return batch; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java index b146190..a7e4979 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java @@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext { } public Counter getNumRecordsOutCounter() { - return metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + return metricGroup.getNumRecordsSendCounter(); } public Counter getNumBytesOutCounter() { - return metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + return metricGroup.getNumBytesSendCounter(); } }
