This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 62931a1 [FLINK-23686][connector/kafka] Increase counter "commitsSucceeded" per commit instead of per partition
62931a1 is described below
commit 62931a1665e6a6976d088ed49375f9fdf00229d9
Author: Qingsheng Ren <[email protected]>
AuthorDate: Sat Aug 14 19:03:24 2021 +0800
[FLINK-23686][connector/kafka] Increase counter "commitsSucceeded" per commit instead of per partition
---
.../source/metrics/KafkaSourceReaderMetrics.java | 6 ++++-
.../kafka/source/reader/KafkaSourceReader.java | 1 +
.../metrics/KafkaSourceReaderMetricsTest.java | 12 +++++++++-
.../kafka/source/reader/KafkaSourceReaderTest.java | 27 ++++++++++++++--------
4 files changed, 34 insertions(+), 12 deletions(-)
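For readers skimming the diff: previously the "commitsSucceeded" counter was incremented inside the per-partition offset-recording path, so a single commit spanning N partitions inflated the counter by N. The fix moves the increment into a dedicated method that is called once per successful commit. A minimal sketch of the before/after semantics (CommitMetricsSketch is an illustrative stand-in, not a Flink class; SimpleCounter is the real org.apache.flink.metrics.SimpleCounter):

    import org.apache.flink.metrics.SimpleCounter;

    // Illustrative stand-in for KafkaSourceReaderMetrics, reduced to the
    // one counter this commit touches.
    class CommitMetricsSketch {
        final SimpleCounter commitsSucceeded = new SimpleCounter();

        // Before: the increment lived here, so recording offsets for N
        // partitions after one commit bumped the counter N times.
        void recordCommittedOffset(/* TopicPartition tp, long offset */) {
            // commitsSucceeded.inc();   <-- removed by this commit
            // ... only the per-partition committed-offset state is updated ...
        }

        // After: callers invoke this exactly once per successful commit, so
        // the counter reflects commits rather than partitions.
        void recordSucceededCommit() {
            commitsSucceeded.inc();
        }
    }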
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index df1abe5..356409f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -137,10 +137,14 @@ public class KafkaSourceReaderMetrics {
*/
public void recordCommittedOffset(TopicPartition tp, long offset) {
checkTopicPartitionTracked(tp);
- commitsSucceeded.inc();
offsets.get(tp).committedOffset = offset;
}
+ /** Mark a successful commit. */
+ public void recordSucceededCommit() {
+ commitsSucceeded.inc();
+ }
+
/** Mark a failed commit. */
public void recordFailedCommit() {
commitsFailed.inc();
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 3a00be5..287dadf 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -130,6 +130,7 @@ public class KafkaSourceReader<T>
LOG.debug(
"Successfully committed offsets for checkpoint {}",
checkpointId);
+ kafkaSourceReaderMetrics.recordSucceededCommit();
// If the finished topic partition has been committed, we remove it
// from the offsets of the finished splits map.
Map<TopicPartition, OffsetAndMetadata> committedPartitions =
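The new call sits in the success branch of the asynchronous offset-commit callback, so it fires once per completed checkpoint commit no matter how many partitions that commit covers. A hedged sketch of that shape (onCommitComplete and the surrounding wiring are simplified stand-ins, not the real KafkaSourceReader internals; TopicPartition and OffsetAndMetadata are the usual Kafka client types):

    // Sketch: one counter increment per commit, per-partition bookkeeping after.
    void onCommitComplete(Map<TopicPartition, OffsetAndMetadata> committed, Throwable error) {
        if (error != null) {
            kafkaSourceReaderMetrics.recordFailedCommit();
            return;
        }
        kafkaSourceReaderMetrics.recordSucceededCommit(); // once, not per partition
        committed.forEach(
                (tp, meta) -> kafkaSourceReaderMetrics.recordCommittedOffset(tp, meta.offset()));
    }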
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
index b16a835..c7df9b6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
@@ -83,7 +83,17 @@ public class KafkaSourceReaderMetricsTest {
assertCommittedOffset(BAR_1, 15513L, metricListener);
assertEquals(
- 4L,
+ 0L,
+ metricListener
+ .getCounter(
+ KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
+ KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER)
+ .getCount());
+
+ kafkaSourceReaderMetrics.recordSucceededCommit();
+
+ assertEquals(
+ 1L,
metricListener
.getCounter(
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
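The test change above encodes the new contract directly: recording four per-partition committed offsets no longer moves the counter, and only the explicit recordSucceededCommit() call does. Restated against the hypothetical sketch class above (assertEquals as in org.junit.Assert):

    CommitMetricsSketch metrics = new CommitMetricsSketch();
    metrics.recordCommittedOffset(); // e.g. FOO_0
    metrics.recordCommittedOffset(); // e.g. FOO_1
    metrics.recordCommittedOffset(); // e.g. BAR_0
    metrics.recordCommittedOffset(); // e.g. BAR_1
    assertEquals(0L, metrics.commitsSucceeded.getCount()); // offsets alone: no change
    metrics.recordSucceededCommit();
    assertEquals(1L, metrics.commitsSucceeded.getCount()); // one commit, one increment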
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 96f0bed..16e3d6a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -238,7 +239,8 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
public void testKafkaSourceMetrics() throws Exception {
final MetricListener metricListener = new MetricListener();
final String groupId = "testKafkaSourceMetrics";
- final TopicPartition tp = new TopicPartition(TOPIC, 0);
+ final TopicPartition tp0 = new TopicPartition(TOPIC, 0);
+ final TopicPartition tp1 = new TopicPartition(TOPIC, 1);
try (KafkaSourceReader<Integer> reader =
(KafkaSourceReader<Integer>)
@@ -247,28 +249,32 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
groupId,
metricListener.getMetricGroup())) {
- KafkaPartitionSplit split =
- new KafkaPartitionSplit(tp, KafkaPartitionSplit.EARLIEST_OFFSET);
- reader.addSplits(Collections.singletonList(split));
+ KafkaPartitionSplit split0 =
+ new KafkaPartitionSplit(tp0, KafkaPartitionSplit.EARLIEST_OFFSET);
+ KafkaPartitionSplit split1 =
+ new KafkaPartitionSplit(tp1, KafkaPartitionSplit.EARLIEST_OFFSET);
+ reader.addSplits(Arrays.asList(split0, split1));
TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
pollUntil(
reader,
output,
- () -> output.getEmittedRecords().size() == NUM_RECORDS_PER_SPLIT,
+ () -> output.getEmittedRecords().size() == NUM_RECORDS_PER_SPLIT * 2,
String.format(
- "Failed to poll %d records until timeout",
NUM_RECORDS_PER_SPLIT));
+ "Failed to poll %d records until timeout",
NUM_RECORDS_PER_SPLIT * 2));
// Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT
assertEquals(
- NUM_RECORDS_PER_SPLIT,
+ NUM_RECORDS_PER_SPLIT * 2,
getKafkaConsumerMetric("records-consumed-total", metricListener));
// Current consuming offset should be NUM_RECORDS_PER_SPLIT - 1
- assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp, metricListener));
+ assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp0, metricListener));
+ assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp1, metricListener));
// No offset is committed till now
- assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp, metricListener));
+ assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp0, metricListener));
+ assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp1, metricListener));
// Trigger offset commit
reader.snapshotState(15213L);
@@ -284,7 +290,8 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
assertEquals(1, getKafkaConsumerMetric("commit-total", metricListener));
// Committed offset should be NUM_RECORDS_PER_SPLIT
- assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp, metricListener));
+ assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0, metricListener));
+ assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, metricListener));
// Number of successful commits should be 1
assertEquals(