This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:

     new 1ec0476839c KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (#13193)

1ec0476839c is described below

commit 1ec0476839c04d7d9fc6dbadab26364422ff79a7
Author: Hector Geraldino <hgerald...@gmail.com>
AuthorDate: Tue Feb 28 09:40:18 2023 -0500

    KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (#13193)

    Reviewers: Christo Lolov <christolo...@gmail.com>, Chris Egerton <chr...@aiven.io>
---
 .../kafka/connect/runtime/AbstractWorkerSourceTask.java     |  9 ++++++---
 .../kafka/connect/runtime/ConnectMetricsRegistry.java       | 13 ++++++-------
 .../kafka/connect/runtime/AbstractWorkerSourceTaskTest.java | 12 ++++++------
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d3ec60ac66d..51836a2d6a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -574,6 +574,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         private final int batchSize;
         private boolean completed = false;
         private int counter;
+        private int skipped; // Keeps track of filtered records
+
         public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
             assert batchSize > 0;
             assert metricsGroup != null;
@@ -582,6 +584,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
             this.metricsGroup = metricsGroup;
         }
         public void skipRecord() {
+            skipped += 1;
             if (counter > 0 && --counter == 0) {
                 finishedAllWrites();
             }
@@ -596,7 +599,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         }
         private void finishedAllWrites() {
             if (!completed) {
-                metricsGroup.recordWrite(batchSize - counter);
+                metricsGroup.recordWrite(batchSize - counter, skipped);
                 completed = true;
             }
         }
@@ -648,8 +651,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
             sourceRecordActiveCount.record(activeRecordCount);
         }
 
-        void recordWrite(int recordCount) {
-            sourceRecordWrite.record(recordCount);
+        void recordWrite(int recordCount, int skippedCount) {
+            sourceRecordWrite.record(recordCount - skippedCount);
             activeRecordCount -= recordCount;
             activeRecordCount = Math.max(0, activeRecordCount);
             sourceRecordActiveCount.record(activeRecordCount);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index d8579d44fc6..507ebc405c8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -179,15 +179,14 @@ public class ConnectMetricsRegistry {
                                                "belonging to the named source connector in this worker.",
                                                sourceTaskTags);
         sourceRecordWriteRate = createTemplate("source-record-write-rate", SOURCE_TASK_GROUP_NAME,
-                                               "The average per-second number of records output from the transformations and written" +
-                                               " to Kafka for this task belonging to the named source connector in this worker. This" +
-                                               " is after transformations are applied and excludes any records filtered out by the " +
-                                               "transformations.",
+                                               "The average per-second number of records written to Kafka for this task belonging to the " +
+                                               "named source connector in this worker, since the task was last restarted. This is after " +
+                                               "transformations are applied, and excludes any records filtered out by the transformations.",
                                                sourceTaskTags);
         sourceRecordWriteTotal = createTemplate("source-record-write-total", SOURCE_TASK_GROUP_NAME,
-                                               "The number of records output from the transformations and written to Kafka for this" +
-                                               " task belonging to the named source connector in this worker, since the task was " +
-                                               "last restarted.",
+                                               "The number of records output written to Kafka for this task belonging to the " +
+                                               "named source connector in this worker, since the task was last restarted. This is after " +
+                                               "transformations are applied, and excludes any records filtered out by the transformations.",
                                                sourceTaskTags);
         sourceRecordPollBatchTimeMax = createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
                                                "The maximum time in milliseconds taken by this task to poll for a batch of " +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index d0833dbffc7..f82981d9499 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -189,18 +189,18 @@ public class AbstractWorkerSourceTaskTest {
         AbstractWorkerSourceTask.SourceTaskMetricsGroup group1 = new AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);
         for (int i = 0; i != 10; ++i) {
             group.recordPoll(100, 1000 + i * 100);
-            group.recordWrite(10);
+            group.recordWrite(10, 2);
         }
         for (int i = 0; i != 20; ++i) {
             group1.recordPoll(100, 1000 + i * 100);
-            group1.recordWrite(10);
+            group1.recordWrite(10, 4);
         }
         assertEquals(1900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-max-time-ms"), 0.001d);
         assertEquals(1450.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
         assertEquals(33.333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-rate"), 0.001d);
         assertEquals(1000, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-poll-total"), 0.001d);
-        assertEquals(3.3333, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
-        assertEquals(100, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(2.666, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(80, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-write-total"), 0.001d);
         assertEquals(900.0, metrics.currentMetricValueAsDouble(group.metricGroup(), "source-record-active-count"), 0.001d);
 
         // Close the group
@@ -224,8 +224,8 @@ public class AbstractWorkerSourceTaskTest {
         assertEquals(1950.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "poll-batch-avg-time-ms"), 0.001d);
         assertEquals(66.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-rate"), 0.001d);
         assertEquals(2000, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-poll-total"), 0.001d);
-        assertEquals(6.667, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
-        assertEquals(200, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
+        assertEquals(4.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-rate"), 0.001d);
+        assertEquals(120, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-write-total"), 0.001d);
         assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
     }
 