This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7312e13a6b25f798d13523f450babf9d49490ef0 Author: Fabian Paul <[email protected]> AuthorDate: Fri Nov 12 09:10:01 2021 +0100 [FLINK-24409][kafka] Fix collection of KafkaSourceReaderMetrics for topics containing periods Internally, Kafka translates the periods in topic names to underscores. As a result, Flink could not collect the metrics for such topics and logged a warning. With this commit, we also translate the topic name before trying to collect the metrics. --- .../source/metrics/KafkaSourceReaderMetrics.java | 6 ++- .../reader/KafkaPartitionSplitReaderTest.java | 47 ++++++++++++++-------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java index 570338b..94c1cb4 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java @@ -289,6 +289,8 @@ public class KafkaSourceReaderMetrics { private @Nullable Metric getRecordsLagMetric( Map<MetricName, ? extends Metric> metrics, TopicPartition tp) { try { + final String resolvedTopic = tp.topic().replace('.', '_'); + final String resolvedPartition = String.valueOf(tp.partition()); Predicate<Map.Entry<MetricName, ? 
extends Metric>> filter = entry -> { final MetricName metricName = entry.getKey(); @@ -297,9 +299,9 @@ public class KafkaSourceReaderMetrics { return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP) && metricName.name().equals(RECORDS_LAG) && tags.containsKey("topic") - && tags.get("topic").equals(tp.topic()) + && tags.get("topic").equals(resolvedTopic) && tags.containsKey("partition") - && tags.get("partition").equals(String.valueOf(tp.partition())); + && tags.get("partition").equals(resolvedPartition); }; return MetricUtil.getKafkaMetric(metrics, filter); } catch (IllegalStateException e) { diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index 6e4af37..d4dd33f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -44,9 +44,12 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.hamcrest.Matchers; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EmptySource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; @@ -64,11 +67,11 @@ import java.util.concurrent.atomic.AtomicReference; import static 
org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Unit tests for {@link KafkaPartitionSplitReader}. */ public class KafkaPartitionSplitReaderTest { @@ -79,7 +82,7 @@ public class KafkaPartitionSplitReaderTest { private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners; private static Map<TopicPartition, Long> earliestOffsets; - @BeforeClass + @BeforeAll public static void setup() throws Throwable { KafkaSourceTestEnv.setup(); KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic); @@ -91,7 +94,7 @@ public class KafkaPartitionSplitReaderTest { KafkaSourceTestEnv.getPartitionsForTopics(Arrays.asList(TOPIC1, TOPIC2))); } - @AfterClass + @AfterAll public static void tearDown() throws Exception { KafkaSourceTestEnv.tearDown(); } @@ -162,8 +165,18 @@ public class KafkaPartitionSplitReaderTest { assertThat(numBytesInCounter.getCount(), Matchers.greaterThan(latestNumBytesIn)); } - @Test - public void testPendingRecordsGauge() throws Exception { + @ParameterizedTest + @EmptySource + @ValueSource(strings = {"_underscore.period-minus"}) + public void testPendingRecordsGauge(String topicSuffix) throws Throwable { + final String topic1Name = TOPIC1 + topicSuffix; + final String topic2Name = TOPIC2 + topicSuffix; + if (!topicSuffix.isEmpty()) { + KafkaSourceTestEnv.setupTopic( + topic1Name, true, true, 
KafkaSourceTestEnv::getRecordsForTopic); + KafkaSourceTestEnv.setupTopic( + topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic); + } MetricListener metricListener = new MetricListener(); final Properties props = new Properties(); props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); @@ -175,7 +188,7 @@ public class KafkaPartitionSplitReaderTest { reader.handleSplitsChanges( new SplitsAddition<>( Collections.singletonList( - new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L)))); + new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L)))); // pendingRecords should have not been registered because of lazily registration assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent()); // Trigger first fetch @@ -194,7 +207,7 @@ public class KafkaPartitionSplitReaderTest { reader.handleSplitsChanges( new SplitsAddition<>( Collections.singletonList( - new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L)))); + new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L)))); // Validate pendingRecords for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { reader.fetch(); @@ -282,11 +295,11 @@ public class KafkaPartitionSplitReaderTest { long earliestOffset = earliestOffsets.get(tp); long expectedRecordCount = NUM_RECORDS_PER_PARTITION - earliestOffset; assertEquals( + expectedRecordCount, + (long) recordCount, String.format( "%s should have %d records.", - splits.get(splitId), expectedRecordCount), - expectedRecordCount, - (long) recordCount); + splits.get(splitId), expectedRecordCount)); }); }
