This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 62931a1  [FLINK-23686][connector/kafka] Increase counter 
"commitsSucceeded" per commit instead of per partition
62931a1 is described below

commit 62931a1665e6a6976d088ed49375f9fdf00229d9
Author: Qingsheng Ren <[email protected]>
AuthorDate: Sat Aug 14 19:03:24 2021 +0800

    [FLINK-23686][connector/kafka] Increase counter "commitsSucceeded" per 
commit instead of per partition
---
 .../source/metrics/KafkaSourceReaderMetrics.java   |  6 ++++-
 .../kafka/source/reader/KafkaSourceReader.java     |  1 +
 .../metrics/KafkaSourceReaderMetricsTest.java      | 12 +++++++++-
 .../kafka/source/reader/KafkaSourceReaderTest.java | 27 ++++++++++++++--------
 4 files changed, 34 insertions(+), 12 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
index df1abe5..356409f 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
@@ -137,10 +137,14 @@ public class KafkaSourceReaderMetrics {
      */
     public void recordCommittedOffset(TopicPartition tp, long offset) {
         checkTopicPartitionTracked(tp);
-        commitsSucceeded.inc();
         offsets.get(tp).committedOffset = offset;
     }
 
+    /** Mark a successful commit. */
+    public void recordSucceededCommit() {
+        commitsSucceeded.inc();
+    }
+
     /** Mark a failure commit. */
     public void recordFailedCommit() {
         commitsFailed.inc();
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 3a00be5..287dadf 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -130,6 +130,7 @@ public class KafkaSourceReader<T>
                                 LOG.debug(
                                         "Successfully committed offsets for 
checkpoint {}",
                                         checkpointId);
+                                
kafkaSourceReaderMetrics.recordSucceededCommit();
                                 // If the finished topic partition has been 
committed, we remove it
                                 // from the offsets of the finished splits map.
                                 Map<TopicPartition, OffsetAndMetadata> 
committedPartitions =
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
index b16a835..c7df9b6 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetricsTest.java
@@ -83,7 +83,17 @@ public class KafkaSourceReaderMetricsTest {
         assertCommittedOffset(BAR_1, 15513L, metricListener);
 
         assertEquals(
-                4L,
+                0L,
+                metricListener
+                        .getCounter(
+                                
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
+                                
KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER)
+                        .getCount());
+
+        kafkaSourceReaderMetrics.recordSucceededCommit();
+
+        assertEquals(
+                1L,
                 metricListener
                         .getCounter(
                                 
KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP,
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 96f0bed..16e3d6a 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -238,7 +239,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
     public void testKafkaSourceMetrics() throws Exception {
         final MetricListener metricListener = new MetricListener();
         final String groupId = "testKafkaSourceMetrics";
-        final TopicPartition tp = new TopicPartition(TOPIC, 0);
+        final TopicPartition tp0 = new TopicPartition(TOPIC, 0);
+        final TopicPartition tp1 = new TopicPartition(TOPIC, 1);
 
         try (KafkaSourceReader<Integer> reader =
                 (KafkaSourceReader<Integer>)
@@ -247,28 +249,32 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
                                 groupId,
                                 metricListener.getMetricGroup())) {
 
-            KafkaPartitionSplit split =
-                    new KafkaPartitionSplit(tp, 
KafkaPartitionSplit.EARLIEST_OFFSET);
-            reader.addSplits(Collections.singletonList(split));
+            KafkaPartitionSplit split0 =
+                    new KafkaPartitionSplit(tp0, 
KafkaPartitionSplit.EARLIEST_OFFSET);
+            KafkaPartitionSplit split1 =
+                    new KafkaPartitionSplit(tp1, 
KafkaPartitionSplit.EARLIEST_OFFSET);
+            reader.addSplits(Arrays.asList(split0, split1));
 
             TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
             pollUntil(
                     reader,
                     output,
-                    () -> output.getEmittedRecords().size() == 
NUM_RECORDS_PER_SPLIT,
+                    () -> output.getEmittedRecords().size() == 
NUM_RECORDS_PER_SPLIT * 2,
                     String.format(
-                            "Failed to poll %d records until timeout", 
NUM_RECORDS_PER_SPLIT));
+                            "Failed to poll %d records until timeout", 
NUM_RECORDS_PER_SPLIT * 2));
 
             // Metric "records-consumed-total" of KafkaConsumer should be 
NUM_RECORDS_PER_SPLIT
             assertEquals(
-                    NUM_RECORDS_PER_SPLIT,
+                    NUM_RECORDS_PER_SPLIT * 2,
                     getKafkaConsumerMetric("records-consumed-total", 
metricListener));
 
             // Current consuming offset should be NUM_RECORD_PER_SPLIT - 1
-            assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp, 
metricListener));
+            assertEquals(NUM_RECORDS_PER_SPLIT - 1, 
getCurrentOffsetMetric(tp0, metricListener));
+            assertEquals(NUM_RECORDS_PER_SPLIT - 1, 
getCurrentOffsetMetric(tp1, metricListener));
 
             // No offset is committed till now
-            assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp, 
metricListener));
+            assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp0, 
metricListener));
+            assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp1, 
metricListener));
 
             // Trigger offset commit
             reader.snapshotState(15213L);
@@ -284,7 +290,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
             assertEquals(1, getKafkaConsumerMetric("commit-total", 
metricListener));
 
             // Committed offset should be NUM_RECORD_PER_SPLIT
-            assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp, 
metricListener));
+            assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0, 
metricListener));
+            assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, 
metricListener));
 
             // Number of successful commits should be 1
             assertEquals(

Reply via email to