This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 339cd3c98e4783e3c8225c5495be40a0c04ff386 Author: Jing Ge <[email protected]> AuthorDate: Thu Feb 17 23:35:47 2022 +0100 [FLINK-26126][kafka] use record/byte send counter metrics from SinkWriterMetricGroup directly. Bug fixed. Enable the metric test in KafkaSinkITCase and KafkaSinkE2ECase again. --- .../apache/flink/connector/kafka/sink/KafkaWriter.java | 15 ++++++++------- .../flink/connector/kafka/sink/KafkaSinkITCase.java | 13 ------------- .../flink/connector/kafka/sink/KafkaWriterITCase.java | 15 ++++++++++----- .../apache/flink/tests/util/kafka/KafkaSinkE2ECase.java | 13 ------------- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index 0e0a874..8102ee8 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -89,12 +89,13 @@ class KafkaWriter<IN> private final KafkaRecordSerializationSchema<IN> recordSerializer; private final Callback deliveryCallback; private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext; + private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>(); private final SinkWriterMetricGroup metricGroup; - private final Counter numBytesOutCounter; - private final ProcessingTimeService timeService; private final boolean disabledMetrics; - private final Counter numRecordsOutCounter; + private final Counter numRecordsSendCounter; + private final Counter numBytesSendCounter; + private final ProcessingTimeService timeService; // Number of outgoing bytes at the latest metric sync private long latestOutgoingByteTotal; @@ -151,8 +152,8 @@ class KafkaWriter<IN> checkNotNull(sinkInitContext, "sinkInitContext"); this.timeService = sinkInitContext.getProcessingTimeService(); this.metricGroup = sinkInitContext.metricGroup(); - this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); - this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + this.numBytesSendCounter = metricGroup.getNumBytesSendCounter(); + this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); this.kafkaSinkContext = new DefaultKafkaSinkContext( sinkInitContext.getSubtaskId(), @@ -192,7 +193,7 @@ class KafkaWriter<IN> final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); currentProducer.send(record, deliveryCallback); - numRecordsOutCounter.inc(); + numRecordsSendCounter.inc(); } @Override @@ -385,7 +386,7 @@ class KafkaWriter<IN> long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue(); long outgoingBytesSinceLastUpdate = outgoingBytesUntilNow - latestOutgoingByteTotal; - numBytesOutCounter.inc(outgoingBytesSinceLastUpdate); + numBytesSendCounter.inc(outgoingBytesSinceLastUpdate); latestOutgoingByteTotal = outgoingBytesUntilNow; lastSync = time; registerMetricSync(); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index 32210c6..be7a928 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -34,9 +34,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; import org.apache.flink.connector.kafka.testutils.KafkaUtil; import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; -import org.apache.flink.connector.testframe.environment.TestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; -import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -81,9 +79,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.TestTemplate; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,15 +201,6 @@ public class KafkaSinkITCase extends TestLogger { @TestContext KafkaSinkExternalContextFactory sinkContext = new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList()); - - @Disabled("Skip metric test until FLINK-26126 fixed") - @TestTemplate - @Override - public void testMetrics( - TestEnvironment testEnv, - DataStreamSinkExternalContext<String> externalContext, - CheckpointingMode semantic) - throws Exception {} } @Test diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index d68a813..b8447a1 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -147,13 +147,18 @@ public class KafkaWriterITCase { try (final KafkaWriter<Integer> writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { - final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter(); - final Counter numRecordsOut = operatorIOMetricGroup.getNumRecordsOutCounter(); - assertEquals(numBytesOut.getCount(), 0L); + final Counter numBytesSend = metricGroup.getNumBytesSendCounter(); + final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter(); + final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter(); + assertEquals(numBytesSend.getCount(), 0L); + assertEquals(numRecordsSend.getCount(), 0); + assertEquals(numRecordsWrittenErrors.getCount(), 0); + writer.write(1, SINK_WRITER_CONTEXT); timeService.trigger(); - assertEquals(numRecordsOut.getCount(), 1); - assertThat(numBytesOut.getCount(), greaterThan(0L)); + assertEquals(numRecordsSend.getCount(), 1); + assertEquals(numRecordsWrittenErrors.getCount(), 0); + assertThat(numBytesSend.getCount(), greaterThan(0L)); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java index ff845f4..0c75a00 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java @@ -19,9 +19,7 @@ package org.apache.flink.tests.util.kafka; import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory; -import org.apache.flink.connector.testframe.environment.TestEnvironment; import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; -import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -32,8 +30,6 @@ import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment; import org.apache.flink.util.DockerImageVersions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; @@ -84,13 +80,4 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> { .toURL())); public KafkaSinkE2ECase() throws Exception {} - - @Disabled("Skip metric test until FLINK-26126 fixed") - @TestTemplate - @Override - public void testMetrics( - TestEnvironment testEnv, - DataStreamSinkExternalContext<String> externalContext, - CheckpointingMode semantic) - throws Exception {} }
