This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new da4e1bf2832 KAFKA-14659 source-record-write-[rate|total] metrics 
should exclude filtered records (#13193)
da4e1bf2832 is described below

commit da4e1bf2832cfd1fbbed8ae200e24e8aad03a798
Author: Hector Geraldino <[email protected]>
AuthorDate: Tue Feb 28 09:40:18 2023 -0500

    KAFKA-14659 source-record-write-[rate|total] metrics should exclude 
filtered records (#13193)
    
    Reviewers: Christo Lolov <[email protected]>, Chris Egerton 
<[email protected]>
---
 .../kafka/connect/runtime/AbstractWorkerSourceTask.java     |  9 ++++++---
 .../kafka/connect/runtime/ConnectMetricsRegistry.java       | 13 ++++++-------
 .../kafka/connect/runtime/AbstractWorkerSourceTaskTest.java | 12 ++++++------
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index fb3c04be6cf..2021808286c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -578,6 +578,8 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
         private final int batchSize;
         private boolean completed = false;
         private int counter;
+        private int skipped; // Keeps track of filtered records
+
         public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup 
metricsGroup) {
             assert batchSize > 0;
             assert metricsGroup != null;
@@ -586,6 +588,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
             this.metricsGroup = metricsGroup;
         }
         public void skipRecord() {
+            skipped += 1;
             if (counter > 0 && --counter == 0) {
                 finishedAllWrites();
             }
@@ -600,7 +603,7 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
         }
         private void finishedAllWrites() {
             if (!completed) {
-                metricsGroup.recordWrite(batchSize - counter);
+                metricsGroup.recordWrite(batchSize - counter, skipped);
                 completed = true;
             }
         }
@@ -652,8 +655,8 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
             sourceRecordActiveCount.record(activeRecordCount);
         }
 
-        void recordWrite(int recordCount) {
-            sourceRecordWrite.record(recordCount);
+        void recordWrite(int recordCount, int skippedCount) {
+            sourceRecordWrite.record(recordCount - skippedCount);
             activeRecordCount -= recordCount;
             activeRecordCount = Math.max(0, activeRecordCount);
             sourceRecordActiveCount.record(activeRecordCount);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index d8579d44fc6..507ebc405c8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -179,15 +179,14 @@ public class ConnectMetricsRegistry {
                                                "belonging to the named source 
connector in this worker.",
                                                sourceTaskTags);
         sourceRecordWriteRate = createTemplate("source-record-write-rate", 
SOURCE_TASK_GROUP_NAME,
-                                               "The average per-second number 
of records output from the transformations and written" +
-                                               " to Kafka for this task 
belonging to the named source connector in this worker. This" +
-                                               " is after transformations are 
applied and excludes any records filtered out by the " +
-                                               "transformations.",
+                                               "The average per-second number 
of records written to Kafka for this task belonging to the " +
+                                                "named source connector in 
this worker, since the task was last restarted. This is after " +
+                                                "transformations are applied, 
and excludes any records filtered out by the transformations.",
                                                sourceTaskTags);
         sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
-                                                "The number of records output 
from the transformations and written to Kafka for this" +
-                                                " task belonging to the named 
source connector in this worker, since the task was " +
-                                                "last restarted.",
+                                                "The number of records output 
written to Kafka for this task belonging to the " +
+                                                "named source connector in 
this worker, since the task was last restarted. This is after " +
+                                                "transformations are applied, 
and excludes any records filtered out by the transformations.",
                                                 sourceTaskTags);
         sourceRecordPollBatchTimeMax = 
createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
                                                       "The maximum time in 
milliseconds taken by this task to poll for a batch of " +
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index f2f63264e36..9212d8df695 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -191,18 +191,18 @@ public class AbstractWorkerSourceTaskTest {
         AbstractWorkerSourceTask.SourceTaskMetricsGroup group1 = new 
AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);
         for (int i = 0; i != 10; ++i) {
             group.recordPoll(100, 1000 + i * 100);
-            group.recordWrite(10);
+            group.recordWrite(10, 2);
         }
         for (int i = 0; i != 20; ++i) {
             group1.recordPoll(100, 1000 + i * 100);
-            group1.recordWrite(10);
+            group1.recordWrite(10, 4);
         }
         assertEquals(1900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-max-time-ms"), 0.001d);
         assertEquals(1450.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
         assertEquals(33.333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-rate"), 0.001d);
         assertEquals(1000, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-total"), 0.001d);
-        assertEquals(3.3333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-rate"), 0.001d);
-        assertEquals(100, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(2.666, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(80, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-total"), 0.001d);
         assertEquals(900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-active-count"), 0.001d);
 
         // Close the group
@@ -226,8 +226,8 @@ public class AbstractWorkerSourceTaskTest {
         assertEquals(1950.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
         assertEquals(66.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-rate"), 0.001d);
         assertEquals(2000, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-total"), 0.001d);
-        assertEquals(6.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-rate"), 0.001d);
-        assertEquals(200, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(4.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(120, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-total"), 0.001d);
         assertEquals(1800.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-active-count"), 0.001d);
     }
 

Reply via email to