This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit d8abb2a4cf2bfc692f2386e62220520680193381 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Fri Nov 11 21:10:11 2022 +0800 [FLINK-26027][Connector/Pulsar] Expose Pulsar producer metrics and add FLIP-33 sink metrics. (#21249) --- .../pulsar/common/metrics/MetricNames.java | 63 ++++++++++++++++ .../common/metrics/ProducerMetricsInterceptor.java | 67 +++++++++++++++++ .../connector/pulsar/sink/PulsarSinkOptions.java | 14 ++++ .../pulsar/sink/config/SinkConfiguration.java | 16 +++- .../connector/pulsar/sink/writer/PulsarWriter.java | 6 +- .../sink/writer/topic/TopicProducerRegister.java | 86 +++++++++++++++++++++- .../pulsar/source/PulsarSourceOptions.java | 14 ++++ .../pulsar/source/config/SourceConfiguration.java | 16 +++- .../source/enumerator/PulsarSourceEnumerator.java | 8 ++ .../source/enumerator/assigner/SplitAssigner.java | 3 + .../enumerator/assigner/SplitAssignerBase.java | 5 ++ .../source/reader/PulsarSourceReaderFactory.java | 6 +- .../split/PulsarOrderedPartitionSplitReader.java | 6 +- .../split/PulsarPartitionSplitReaderBase.java | 66 ++++++++++++++++- .../split/PulsarUnorderedPartitionSplitReader.java | 4 +- .../connector/pulsar/sink/PulsarSinkITCase.java | 4 +- .../writer/topic/TopicProducerRegisterTest.java | 7 +- .../source/enumerator/cursor/StopCursorTest.java | 6 +- .../split/PulsarPartitionSplitReaderTestBase.java | 12 ++- .../testutils/sink/PulsarSinkTestSuiteBase.java | 37 ---------- 20 files changed, 389 insertions(+), 57 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java new file mode 100644 index 0000000..5156b47 --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.metrics; + +/** The constant class for holding all the custom metrics names in Pulsar. */ +public final class MetricNames { + + private MetricNames() { + // No public constructor. + } + + public static final String PULSAR_PRODUCER_METRIC_NAME = "PulsarProducer"; + public static final String NUM_MSGS_SENT = "numMsgsSent"; + public static final String NUM_BYTES_SENT = "numBytesSent"; + public static final String NUM_SEND_FAILED = "numSendFailed"; + public static final String NUM_ACKS_RECEIVED = "numAcksReceived"; + public static final String SEND_MSGS_RATE = "sendMsgsRate"; + public static final String SEND_BYTES_RATE = "sendBytesRate"; + public static final String SEND_LATENCY_MILLIS_50_PCT = "sendLatencyMillis50pct"; + public static final String SEND_LATENCY_MILLIS_75_PCT = "sendLatencyMillis75pct"; + public static final String SEND_LATENCY_MILLIS_95_PCT = "sendLatencyMillis95pct"; + public static final String SEND_LATENCY_MILLIS_99_PCT = "sendLatencyMillis99pct"; + public static final String SEND_LATENCY_MILLIS_999_PCT = "sendLatencyMillis999pct"; + public static final String SEND_LATENCY_MILLIS_MAX = "sendLatencyMillisMax"; + public static final String TOTAL_MSGS_SENT = "totalMsgsSent"; + public static final String TOTAL_BYTES_SENT = "totalBytesSent"; + public static final String TOTAL_SEND_FAILED = "totalSendFailed"; + public static final String TOTAL_ACKS_RECEIVED = "totalAcksReceived"; + public static final String PENDING_QUEUE_SIZE = "pendingQueueSize"; + + public static final String PULSAR_CONSUMER_METRIC_NAME = "PulsarConsumer"; + public static final String NUM_MSGS_RECEIVED = "numMsgsReceived"; + public static final String NUM_BYTES_RECEIVED = "numBytesReceived"; + public static final String RATE_MSGS_RECEIVED = "rateMsgsReceived"; + public static final String RATE_BYTES_RECEIVED = "rateBytesReceived"; + public static final String NUM_ACKS_SENT = "numAcksSent"; + public static final String NUM_ACKS_FAILED = "numAcksFailed"; + public static final String NUM_RECEIVE_FAILED = "numReceiveFailed"; + public static final String NUM_BATCH_RECEIVE_FAILED = "numBatchReceiveFailed"; + public static final String TOTAL_MSGS_RECEIVED = "totalMsgsReceived"; + public static final String TOTAL_BYTES_RECEIVED = "totalBytesReceived"; + public static final String TOTAL_RECEIVED_FAILED = "totalReceivedFailed"; + public static final String TOTAL_BATCH_RECEIVED_FAILED = "totalBatchReceivedFailed"; + public static final String TOTAL_ACKS_SENT = "totalAcksSent"; + public static final String TOTAL_ACKS_FAILED = "totalAcksFailed"; + public static final String MSG_NUM_IN_RECEIVER_QUEUE = "msgNumInReceiverQueue"; +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java new file mode 100644 index 0000000..e2b9915 --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; + +/** The metric statistic for Pulsar's {@link Producer}. */ +public class ProducerMetricsInterceptor implements ProducerInterceptor { + + private final Counter numRecordsOutErrors; + private final Counter numRecordsOutCounter; + private final Counter numBytesOutCounter; + + public ProducerMetricsInterceptor(SinkWriterMetricGroup metricGroup) { + this.numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); + this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + } + + @Override + public void close() { + // Nothing to do by default. + } + + @Override + public boolean eligible(Message message) { + return true; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + return message; + } + + @Override + public void onSendAcknowledgement( + Producer producer, Message message, MessageId msgId, Throwable exception) { + if (exception != null) { + numRecordsOutErrors.inc(1); + } else { + numRecordsOutCounter.inc(1); + numBytesOutCounter.inc(message.size()); + } + } +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java index 0433bb0..39b5f73 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java @@ -38,6 +38,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.configuration.description.LinkElement.link; import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX; import static org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash.MURMUR3_32_HASH; @@ -139,6 +140,19 @@ public final class PulsarSinkOptions { "The maximum number of pending messages in one sink parallelism.") .build()); + public static final ConfigOption<Boolean> PULSAR_ENABLE_SINK_METRICS = + ConfigOptions.key(SINK_CONFIG_PREFIX + "enableMetrics") + .booleanType() + .defaultValue(true) + .withDescription( + Description.builder() + .text( + "The metrics from Pulsar Producer are only exposed if you enable this option.") + .text( + "You should set the %s to a positive value if you enable this option.", + code(PULSAR_STATS_INTERVAL_SECONDS.key())) + .build()); + /////////////////////////////////////////////////////////////////////////////// // // The configuration for ProducerConfigurationData part. diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java index d6a6ee3..0a56ff3 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java @@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.Schema; import java.util.Objects; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES; +import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL; @@ -51,6 +53,7 @@ public class SinkConfiguration extends PulsarConfiguration { private final MessageKeyHash messageKeyHash; private final boolean enableSchemaEvolution; private final int maxRecommitTimes; + private final boolean enableMetrics; public SinkConfiguration(Configuration configuration) { super(configuration); @@ -62,6 +65,8 @@ public class SinkConfiguration extends PulsarConfiguration { this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH); this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION); this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES); + this.enableMetrics = + get(PULSAR_ENABLE_SINK_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0; } /** The delivery guarantee changes the behavior of {@link PulsarWriter}. */ @@ -113,6 +118,11 @@ public class SinkConfiguration extends PulsarConfiguration { return maxRecommitTimes; } + /** Whether to expose the metrics from Pulsar Producer. */ + public boolean isEnableMetrics() { + return enableMetrics; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -130,7 +140,8 @@ public class SinkConfiguration extends PulsarConfiguration { && partitionSwitchSize == that.partitionSwitchSize && enableSchemaEvolution == that.enableSchemaEvolution && messageKeyHash == that.messageKeyHash - && maxRecommitTimes == that.maxRecommitTimes; + && maxRecommitTimes == that.maxRecommitTimes + && enableMetrics == that.enableMetrics; } @Override @@ -142,6 +153,7 @@ public class SinkConfiguration extends PulsarConfiguration { partitionSwitchSize, messageKeyHash, enableSchemaEvolution, - maxRecommitTimes); + maxRecommitTimes, + enableMetrics); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java index e30b593..c5c4622 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java @@ -72,7 +72,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi private final PulsarSinkContext sinkContext; private final TopicProducerRegister producerRegister; private final MailboxExecutor mailboxExecutor; - private final AtomicLong pendingMessages = new AtomicLong(0); + private final AtomicLong pendingMessages; /** * Constructor creating a Pulsar writer. @@ -122,8 +122,10 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi } // Create this producer register after opening serialization schema! - this.producerRegister = new TopicProducerRegister(sinkConfiguration); + this.producerRegister = + new TopicProducerRegister(sinkConfiguration, initContext.metricGroup()); this.mailboxExecutor = initContext.getMailboxExecutor(); + this.pendingMessages = new AtomicLong(0); } @Override diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java index 9bb1753..cf8ff00 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java @@ -20,14 +20,18 @@ package org.apache.flink.connector.pulsar.sink.writer.topic; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.pulsar.common.metrics.ProducerMetricsInterceptor; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.io.Closer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.ProducerStats; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; @@ -36,6 +40,7 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.shade.com.google.common.base.Strings; import java.io.Closeable; import java.io.IOException; @@ -44,8 +49,27 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BYTES_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_MSGS_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_SEND_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.PENDING_QUEUE_SIZE; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.PULSAR_PRODUCER_METRIC_NAME; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_BYTES_RATE; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_50_PCT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_75_PCT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_95_PCT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_999_PCT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_99_PCT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_MAX; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_MSGS_RATE; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BYTES_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_MSGS_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_SEND_FAILED; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction; import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder; @@ -60,14 +84,21 @@ public class TopicProducerRegister implements Closeable { private final PulsarClient pulsarClient; private final SinkConfiguration sinkConfiguration; + private final SinkWriterMetricGroup metricGroup; private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister; private final Map<String, Transaction> transactionRegister; - public TopicProducerRegister(SinkConfiguration sinkConfiguration) { + public TopicProducerRegister( + SinkConfiguration sinkConfiguration, SinkWriterMetricGroup metricGroup) { this.pulsarClient = createClient(sinkConfiguration); this.sinkConfiguration = sinkConfiguration; + this.metricGroup = metricGroup; this.producerRegister = new HashMap<>(); this.transactionRegister = new HashMap<>(); + + if (sinkConfiguration.isEnableMetrics()) { + metricGroup.setCurrentSendTimeGauge(this::currentSendTimeGauge); + } } /** @@ -150,7 +181,13 @@ public class TopicProducerRegister implements Closeable { createProducerBuilder(pulsarClient, schema, sinkConfiguration); // Set the required topic name. builder.topic(topic); + // Set the sending counter for metrics. + builder.intercept(new ProducerMetricsInterceptor(metricGroup)); + Producer<T> producer = sneakyClient(builder::create); + + // Expose the stats for calculating and monitoring. + exposeProducerMetrics(producer); producers.put(schemaInfo, producer); return producer; @@ -199,4 +236,51 @@ public class TopicProducerRegister implements Closeable { private void clearTransactions() { transactionRegister.clear(); } + + private Long currentSendTimeGauge() { + double sendTime = + producerRegister.values().stream() + .flatMap(v -> v.values().stream()) + .map(Producer::getStats) + .mapToDouble(ProducerStats::getSendLatencyMillis50pct) + .average() + .orElse(Long.MAX_VALUE); + + return Math.round(sendTime); + } + + private void exposeProducerMetrics(Producer<?> producer) { + if (sinkConfiguration.isEnableMetrics()) { + String producerIdentity = producer.getProducerName(); + if (Strings.isNullOrEmpty(producerIdentity)) { + // Fallback to use the topic name. + producerIdentity = UUID.randomUUID().toString(); + } + + MetricGroup group = + metricGroup + .addGroup(PULSAR_PRODUCER_METRIC_NAME) + .addGroup(producer.getTopic()) + .addGroup(producerIdentity); + ProducerStats stats = producer.getStats(); + + group.gauge(NUM_MSGS_SENT, stats::getNumMsgsSent); + group.gauge(NUM_BYTES_SENT, stats::getNumBytesSent); + group.gauge(NUM_SEND_FAILED, stats::getNumSendFailed); + group.gauge(NUM_ACKS_RECEIVED, stats::getNumAcksReceived); + group.gauge(SEND_MSGS_RATE, stats::getSendMsgsRate); + group.gauge(SEND_BYTES_RATE, stats::getSendBytesRate); + group.gauge(SEND_LATENCY_MILLIS_50_PCT, stats::getSendLatencyMillis50pct); + group.gauge(SEND_LATENCY_MILLIS_75_PCT, stats::getSendLatencyMillis75pct); + group.gauge(SEND_LATENCY_MILLIS_95_PCT, stats::getSendLatencyMillis95pct); + group.gauge(SEND_LATENCY_MILLIS_99_PCT, stats::getSendLatencyMillis99pct); + group.gauge(SEND_LATENCY_MILLIS_999_PCT, stats::getSendLatencyMillis999pct); + group.gauge(SEND_LATENCY_MILLIS_MAX, stats::getSendLatencyMillisMax); + group.gauge(TOTAL_MSGS_SENT, stats::getTotalMsgsSent); + group.gauge(TOTAL_BYTES_SENT, stats::getTotalBytesSent); + group.gauge(TOTAL_SEND_FAILED, stats::getTotalSendFailed); + group.gauge(TOTAL_ACKS_RECEIVED, stats::getTotalAcksReceived); + group.gauge(PENDING_QUEUE_SIZE, stats::getPendingQueueSize); + } + } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java index d0ef6de..8c2ab6b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.CONSUMER_CONFIG_PREFIX; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.SOURCE_CONFIG_PREFIX; @@ -204,6 +205,19 @@ public final class PulsarSourceOptions { "In this case, a single consumer will still receive all the keys, but they may be coming in different orders.") .build()); + public static final ConfigOption<Boolean> PULSAR_ENABLE_SOURCE_METRICS = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableMetrics") + .booleanType() + .defaultValue(true) + .withDescription( + Description.builder() + .text( + "The metrics from Pulsar Consumer are only exposed if you enable this option.") + .text( + "You should set the %s to a positive value if you enable this option.", + code(PULSAR_STATS_INTERVAL_SECONDS.key())) + .build()); + /////////////////////////////////////////////////////////////////////////////// // // The configuration for ConsumerConfigurationData part. diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java index f9e8a65..5a52a59 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java @@ -34,9 +34,11 @@ import java.time.Duration; import java.util.Objects; import static org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; @@ -63,6 +65,7 @@ public class SourceConfiguration extends PulsarConfiguration { private final SubscriptionType subscriptionType; private final SubscriptionMode subscriptionMode; private final boolean allowKeySharedOutOfOrderDelivery; + private final boolean enableMetrics; public SourceConfiguration(Configuration configuration) { super(configuration); @@ -79,6 +82,8 @@ public class SourceConfiguration extends PulsarConfiguration { this.subscriptionType = get(PULSAR_SUBSCRIPTION_TYPE); this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE); this.allowKeySharedOutOfOrderDelivery = get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY); + this.enableMetrics = + get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0; } /** The capacity of the element queue in the source reader. */ @@ -191,6 +196,11 @@ public class SourceConfiguration extends PulsarConfiguration { return allowKeySharedOutOfOrderDelivery; } + /** Whether to expose the metrics from Pulsar Consumer. */ + public boolean isEnableMetrics() { + return enableMetrics; + } + /** Convert the subscription into a readable str. */ public String getSubscriptionDesc() { return getSubscriptionName() @@ -223,7 +233,8 @@ public class SourceConfiguration extends PulsarConfiguration { && Objects.equals(subscriptionName, that.subscriptionName) && subscriptionType == that.subscriptionType && subscriptionMode == that.subscriptionMode - && allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery; + && allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery + && enableMetrics == that.enableMetrics; } @Override @@ -240,6 +251,7 @@ public class SourceConfiguration extends PulsarConfiguration { subscriptionName, subscriptionType, subscriptionMode, - allowKeySharedOutOfOrderDelivery); + allowKeySharedOutOfOrderDelivery, + enableMetrics); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index 931dc6a..8dbead2 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -30,6 +30,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubs import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup; import org.apache.flink.util.FlinkRuntimeException; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -63,6 +64,7 @@ public class PulsarSourceEnumerator private final SourceConfiguration sourceConfiguration; private final SplitEnumeratorContext<PulsarPartitionSplit> context; private final SplitAssigner splitAssigner; + private final SplitEnumeratorMetricGroup metricGroup; public PulsarSourceEnumerator( PulsarSubscriber subscriber, @@ -96,12 +98,18 @@ public class PulsarSourceEnumerator this.sourceConfiguration = sourceConfiguration; this.context = context; this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState); + this.metricGroup = context.metricGroup(); } @Override public void start() { rangeGenerator.open(sourceConfiguration); + // Expose the split assignment metrics if Flink has supported. + if (metricGroup != null) { + metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount); + } + // Check the pulsar topic information and convert it into source split. if (sourceConfiguration.isEnablePartitionDiscovery()) { LOG.info( diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java index c734389..87faa28 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java @@ -60,4 +60,7 @@ public interface SplitAssigner { /** Snapshot the current assign state into checkpoint. */ PulsarSourceEnumState snapshotState(); + + /** Expose this for standard flink metrics. */ + long getUnassignedSplitCount(); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java index 733072c..fedb1aa 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java @@ -94,6 +94,11 @@ abstract class SplitAssignerBase implements SplitAssigner { return new PulsarSourceEnumState(appendedPartitions); } + @Override + public long getUnassignedSplitCount() { + return pendingPartitionSplits.values().stream().mapToLong(Set::size).sum(); + } + /** Add split to pending lists. */ protected void addSplitToPendingList(int readerId, PulsarPartitionSplit split) { Set<PulsarPartitionSplit> splits = diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java index 2e83ab5..5d37614 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java @@ -84,7 +84,10 @@ public final class PulsarSourceReaderFactory { Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier = () -> new PulsarOrderedPartitionSplitReader( - pulsarClient, pulsarAdmin, sourceConfiguration); + pulsarClient, + pulsarAdmin, + sourceConfiguration, + readerContext.metricGroup()); return new PulsarOrderedSourceReader<>( elementsQueue, @@ -109,6 +112,7 @@ public final class PulsarSourceReaderFactory { pulsarClient, pulsarAdmin, sourceConfiguration, + readerContext.metricGroup(), coordinatorClient); return new PulsarUnorderedSourceReader<>( diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java index 6b02e0f..8eb7b29 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -53,8 +54,9 @@ public class PulsarOrderedPartitionSplitReader extends PulsarPartitionSplitReade public PulsarOrderedPartitionSplitReader( PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, - SourceConfiguration sourceConfiguration) { - super(pulsarClient, pulsarAdmin, sourceConfiguration); + SourceConfiguration sourceConfiguration, + SourceReaderMetricGroup metricGroup) { + super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup); } @Override diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java index 08e9977..fa36ee9 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java @@ -29,11 +29,16 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; @@ -49,9 +54,26 @@ import java.io.IOException; import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.MSG_NUM_IN_RECEIVER_QUEUE; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BATCH_RECEIVE_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BYTES_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_MSGS_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_RECEIVE_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.PULSAR_CONSUMER_METRIC_NAME; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.RATE_BYTES_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.RATE_MSGS_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_SENT; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BATCH_RECEIVED_FAILED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BYTES_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_MSGS_RECEIVED; +import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_RECEIVED_FAILED; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder; import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN; @@ -65,6 +87,7 @@ abstract class PulsarPartitionSplitReaderBase protected final PulsarClient pulsarClient; protected final PulsarAdmin pulsarAdmin; protected final SourceConfiguration sourceConfiguration; + protected final SourceReaderMetricGroup metricGroup; protected Consumer<byte[]> pulsarConsumer; protected PulsarPartitionSplit registeredSplit; @@ -72,10 +95,12 @@ abstract class PulsarPartitionSplitReaderBase protected PulsarPartitionSplitReaderBase( PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, - SourceConfiguration sourceConfiguration) { + SourceConfiguration sourceConfiguration, + SourceReaderMetricGroup metricGroup) { this.pulsarClient = pulsarClient; this.pulsarAdmin = pulsarAdmin; this.sourceConfiguration = sourceConfiguration; + this.metricGroup = metricGroup; } @Override @@ -239,6 +264,43 @@ abstract class PulsarPartitionSplitReaderBase } // Create the consumer configuration by using common utils. - return sneakyClient(consumerBuilder::subscribe); + Consumer<byte[]> consumer = sneakyClient(consumerBuilder::subscribe); + + // Exposing the consumer metrics. + exposeConsumerMetrics(consumer); + + return consumer; + } + + private void exposeConsumerMetrics(Consumer<byte[]> consumer) { + if (sourceConfiguration.isEnableMetrics()) { + String consumerIdentity = consumer.getConsumerName(); + if (Strings.isNullOrEmpty(consumerIdentity)) { + consumerIdentity = UUID.randomUUID().toString(); + } + + MetricGroup group = + metricGroup + .addGroup(PULSAR_CONSUMER_METRIC_NAME) + .addGroup(consumer.getTopic()) + .addGroup(consumerIdentity); + ConsumerStats stats = consumer.getStats(); + + group.gauge(NUM_MSGS_RECEIVED, stats::getNumMsgsReceived); + group.gauge(NUM_BYTES_RECEIVED, stats::getNumBytesReceived); + group.gauge(RATE_MSGS_RECEIVED, stats::getRateMsgsReceived); + group.gauge(RATE_BYTES_RECEIVED, stats::getRateBytesReceived); + group.gauge(NUM_ACKS_SENT, stats::getNumAcksSent); + group.gauge(NUM_ACKS_FAILED, stats::getNumAcksFailed); + group.gauge(NUM_RECEIVE_FAILED, stats::getNumReceiveFailed); + group.gauge(NUM_BATCH_RECEIVE_FAILED, stats::getNumBatchReceiveFailed); + group.gauge(TOTAL_MSGS_RECEIVED, stats::getTotalMsgsReceived); + group.gauge(TOTAL_BYTES_RECEIVED, stats::getTotalBytesReceived); + group.gauge(TOTAL_RECEIVED_FAILED, stats::getTotalReceivedFailed); + group.gauge(TOTAL_BATCH_RECEIVED_FAILED, stats::getTotaBatchReceivedFailed); + group.gauge(TOTAL_ACKS_SENT, stats::getTotalAcksSent); + group.gauge(TOTAL_ACKS_FAILED, stats::getTotalAcksFailed); + group.gauge(MSG_NUM_IN_RECEIVER_QUEUE, stats::getMsgNumInReceiverQueue); + } } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index f095c12..72e7c6f 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; @@ -65,8 +66,9 @@ public class PulsarUnorderedPartitionSplitReader extends PulsarPartitionSplitRea PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, + SourceReaderMetricGroup metricGroup, TransactionCoordinatorClient coordinatorClient) { - super(pulsarClient, pulsarAdmin, sourceConfiguration); + super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup); this.coordinatorClient = coordinatorClient; } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java index 0e5a173..690d7ff 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java @@ -26,12 +26,12 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; import org.apache.flink.connector.pulsar.testutils.function.ControlSource; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext; -import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase; import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; @@ -60,7 +60,7 @@ class PulsarSinkITCase { /** Integration test based on connector testing framework. */ @Nested - class IntegrationTest extends PulsarSinkTestSuiteBase { + class IntegrationTest extends SinkTestSuiteBase<String> { @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java index 2e36bfb..f0eb67e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java @@ -35,6 +35,7 @@ import java.util.List; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE; +import static org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSinkWriterMetricGroup; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -49,7 +50,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase { operator().createTopic(topic, 8); SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee); - TopicProducerRegister register = new TopicProducerRegister(configuration); + TopicProducerRegister register = + new TopicProducerRegister(configuration, createSinkWriterMetricGroup()); String message = randomAlphabetic(10); register.createMessageBuilder(topic, Schema.STRING).value(message).send(); @@ -76,7 +78,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase { operator().createTopic(topic, 8); SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee); - TopicProducerRegister register = new TopicProducerRegister(configuration); + TopicProducerRegister register = + new TopicProducerRegister(configuration, createSinkWriterMetricGroup()); String message = randomAlphabetic(10); register.createMessageBuilder(topic, Schema.STRING).value(message).sendAsync(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java index 0aee463..be099e7 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java @@ -42,6 +42,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSourceReaderMetricGroup; import static org.assertj.core.api.Assertions.assertThat; /** Test different implementation of StopCursor. */ @@ -54,7 +55,10 @@ class StopCursorTest extends PulsarTestSuiteBase { PulsarOrderedPartitionSplitReader splitReader = new PulsarOrderedPartitionSplitReader( - operator().client(), operator().admin(), sourceConfig()); + operator().client(), + operator().admin(), + sourceConfig(), + createSourceReaderMetricGroup()); // send the first message and set the stopCursor to filter any late stopCursor operator() .sendMessage( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java index 4631798..635b0a3 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java @@ -70,6 +70,7 @@ import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlin import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS; import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; +import static org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSourceReaderMetricGroup; import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.pulsar.client.api.Schema.STRING; @@ -341,10 +342,17 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase { private PulsarPartitionSplitReaderBase splitReader(SubscriptionType subscriptionType) { if (subscriptionType == SubscriptionType.Failover) { return new PulsarOrderedPartitionSplitReader( - operator().client(), operator().admin(), sourceConfig()); + operator().client(), + operator().admin(), + sourceConfig(), + createSourceReaderMetricGroup()); } else { return new PulsarUnorderedPartitionSplitReader( - operator().client(), operator().admin(), sourceConfig(), null); + operator().client(), + operator().admin(), + sourceConfig(), + createSourceReaderMetricGroup(), + null); } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java deleted file mode 100644 index 0695a43..0000000 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.pulsar.testutils.sink; - -import org.apache.flink.connector.testframe.environment.TestEnvironment; -import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; -import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; -import org.apache.flink.streaming.api.CheckpointingMode; - -import org.junit.jupiter.api.Disabled; - -/** Pulsar sink don't expose the monitor metrics now. We have to disable this test. */ -public abstract class PulsarSinkTestSuiteBase extends SinkTestSuiteBase<String> { - - @Override - @Disabled("Enable this test after FLINK-26027 being merged.") - public void testMetrics( - TestEnvironment testEnv, - DataStreamSinkExternalContext<String> externalContext, - CheckpointingMode semantic) {} -}