This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 1ec0476839c KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (#13193)
1ec0476839c is described below
commit 1ec0476839c04d7d9fc6dbadab26364422ff79a7
Author: Hector Geraldino <[email protected]>
AuthorDate: Tue Feb 28 09:40:18 2023 -0500
KAFKA-14659 source-record-write-[rate|total] metrics should exclude filtered records (#13193)
Reviewers: Christo Lolov <[email protected]>, Chris Egerton <[email protected]>
---
.../kafka/connect/runtime/AbstractWorkerSourceTask.java | 9 ++++++---
.../kafka/connect/runtime/ConnectMetricsRegistry.java | 13 ++++++-------
.../kafka/connect/runtime/AbstractWorkerSourceTaskTest.java | 12 ++++++------
3 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index d3ec60ac66d..51836a2d6a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -574,6 +574,8 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
private final int batchSize;
private boolean completed = false;
private int counter;
+ private int skipped; // Keeps track of filtered records
+
public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup
metricsGroup) {
assert batchSize > 0;
assert metricsGroup != null;
@@ -582,6 +584,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
this.metricsGroup = metricsGroup;
}
public void skipRecord() {
+ skipped += 1;
if (counter > 0 && --counter == 0) {
finishedAllWrites();
}
@@ -596,7 +599,7 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
}
private void finishedAllWrites() {
if (!completed) {
- metricsGroup.recordWrite(batchSize - counter);
+ metricsGroup.recordWrite(batchSize - counter, skipped);
completed = true;
}
}
@@ -648,8 +651,8 @@ public abstract class AbstractWorkerSourceTask extends
WorkerTask {
sourceRecordActiveCount.record(activeRecordCount);
}
- void recordWrite(int recordCount) {
- sourceRecordWrite.record(recordCount);
+ void recordWrite(int recordCount, int skippedCount) {
+ sourceRecordWrite.record(recordCount - skippedCount);
activeRecordCount -= recordCount;
activeRecordCount = Math.max(0, activeRecordCount);
sourceRecordActiveCount.record(activeRecordCount);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index d8579d44fc6..507ebc405c8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -179,15 +179,14 @@ public class ConnectMetricsRegistry {
"belonging to the named source
connector in this worker.",
sourceTaskTags);
sourceRecordWriteRate = createTemplate("source-record-write-rate",
SOURCE_TASK_GROUP_NAME,
- "The average per-second number
of records output from the transformations and written" +
- " to Kafka for this task
belonging to the named source connector in this worker. This" +
- " is after transformations are
applied and excludes any records filtered out by the " +
- "transformations.",
+ "The average per-second number
of records written to Kafka for this task belonging to the " +
+ "named source connector in
this worker, since the task was last restarted. This is after " +
+ "transformations are applied,
and excludes any records filtered out by the transformations.",
sourceTaskTags);
sourceRecordWriteTotal = createTemplate("source-record-write-total",
SOURCE_TASK_GROUP_NAME,
- "The number of records output
from the transformations and written to Kafka for this" +
- " task belonging to the named
source connector in this worker, since the task was " +
- "last restarted.",
+ "The number of records output
written to Kafka for this task belonging to the " +
+ "named source connector in
this worker, since the task was last restarted. This is after " +
+ "transformations are applied,
and excludes any records filtered out by the transformations.",
sourceTaskTags);
sourceRecordPollBatchTimeMax =
createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
"The maximum time in
milliseconds taken by this task to poll for a batch of " +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index d0833dbffc7..f82981d9499 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -189,18 +189,18 @@ public class AbstractWorkerSourceTaskTest {
AbstractWorkerSourceTask.SourceTaskMetricsGroup group1 = new
AbstractWorkerSourceTask.SourceTaskMetricsGroup(taskId1, metrics);
for (int i = 0; i != 10; ++i) {
group.recordPoll(100, 1000 + i * 100);
- group.recordWrite(10);
+ group.recordWrite(10, 2);
}
for (int i = 0; i != 20; ++i) {
group1.recordPoll(100, 1000 + i * 100);
- group1.recordWrite(10);
+ group1.recordWrite(10, 4);
}
assertEquals(1900.0,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"poll-batch-max-time-ms"), 0.001d);
assertEquals(1450.0,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"poll-batch-avg-time-ms"), 0.001d);
assertEquals(33.333,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-poll-rate"), 0.001d);
assertEquals(1000,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-poll-total"), 0.001d);
- assertEquals(3.3333,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-write-rate"), 0.001d);
- assertEquals(100,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-write-total"), 0.001d);
+ assertEquals(2.666,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-write-rate"), 0.001d);
+ assertEquals(80,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-write-total"), 0.001d);
assertEquals(900.0,
metrics.currentMetricValueAsDouble(group.metricGroup(),
"source-record-active-count"), 0.001d);
// Close the group
@@ -224,8 +224,8 @@ public class AbstractWorkerSourceTaskTest {
assertEquals(1950.0,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"poll-batch-avg-time-ms"), 0.001d);
assertEquals(66.667,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-poll-rate"), 0.001d);
assertEquals(2000,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-poll-total"), 0.001d);
- assertEquals(6.667,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-write-rate"), 0.001d);
- assertEquals(200,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-write-total"), 0.001d);
+ assertEquals(4.0,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-write-rate"), 0.001d);
+ assertEquals(120,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-write-total"), 0.001d);
assertEquals(1800.0,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-active-count"), 0.001d);
}