This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d8abb2a4cf2bfc692f2386e62220520680193381
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Nov 11 21:10:11 2022 +0800

    [FLINK-26027][Connector/Pulsar] Expose Pulsar producer metrics and add 
FLIP-33 sink metrics. (#21249)
---
 .../pulsar/common/metrics/MetricNames.java         | 63 ++++++++++++++++
 .../common/metrics/ProducerMetricsInterceptor.java | 67 +++++++++++++++++
 .../connector/pulsar/sink/PulsarSinkOptions.java   | 14 ++++
 .../pulsar/sink/config/SinkConfiguration.java      | 16 +++-
 .../connector/pulsar/sink/writer/PulsarWriter.java |  6 +-
 .../sink/writer/topic/TopicProducerRegister.java   | 86 +++++++++++++++++++++-
 .../pulsar/source/PulsarSourceOptions.java         | 14 ++++
 .../pulsar/source/config/SourceConfiguration.java  | 16 +++-
 .../source/enumerator/PulsarSourceEnumerator.java  |  8 ++
 .../source/enumerator/assigner/SplitAssigner.java  |  3 +
 .../enumerator/assigner/SplitAssignerBase.java     |  5 ++
 .../source/reader/PulsarSourceReaderFactory.java   |  6 +-
 .../split/PulsarOrderedPartitionSplitReader.java   |  6 +-
 .../split/PulsarPartitionSplitReaderBase.java      | 66 ++++++++++++++++-
 .../split/PulsarUnorderedPartitionSplitReader.java |  4 +-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |  4 +-
 .../writer/topic/TopicProducerRegisterTest.java    |  7 +-
 .../source/enumerator/cursor/StopCursorTest.java   |  6 +-
 .../split/PulsarPartitionSplitReaderTestBase.java  | 12 ++-
 .../testutils/sink/PulsarSinkTestSuiteBase.java    | 37 ----------
 20 files changed, 389 insertions(+), 57 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
new file mode 100644
index 0000000..5156b47
--- /dev/null
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+/** The constant class for holding all the custom metrics names in Pulsar. */
+public final class MetricNames {
+
+    private MetricNames() {
+        // No public constructor.
+    }
+
+    public static final String PULSAR_PRODUCER_METRIC_NAME = "PulsarProducer";
+    public static final String NUM_MSGS_SENT = "numMsgsSent";
+    public static final String NUM_BYTES_SENT = "numBytesSent";
+    public static final String NUM_SEND_FAILED = "numSendFailed";
+    public static final String NUM_ACKS_RECEIVED = "numAcksReceived";
+    public static final String SEND_MSGS_RATE = "sendMsgsRate";
+    public static final String SEND_BYTES_RATE = "sendBytesRate";
+    public static final String SEND_LATENCY_MILLIS_50_PCT = 
"sendLatencyMillis50pct";
+    public static final String SEND_LATENCY_MILLIS_75_PCT = 
"sendLatencyMillis75pct";
+    public static final String SEND_LATENCY_MILLIS_95_PCT = 
"sendLatencyMillis95pct";
+    public static final String SEND_LATENCY_MILLIS_99_PCT = 
"sendLatencyMillis99pct";
+    public static final String SEND_LATENCY_MILLIS_999_PCT = 
"sendLatencyMillis999pct";
+    public static final String SEND_LATENCY_MILLIS_MAX = 
"sendLatencyMillisMax";
+    public static final String TOTAL_MSGS_SENT = "totalMsgsSent";
+    public static final String TOTAL_BYTES_SENT = "totalBytesSent";
+    public static final String TOTAL_SEND_FAILED = "totalSendFailed";
+    public static final String TOTAL_ACKS_RECEIVED = "totalAcksReceived";
+    public static final String PENDING_QUEUE_SIZE = "pendingQueueSize";
+
+    public static final String PULSAR_CONSUMER_METRIC_NAME = "PulsarConsumer";
+    public static final String NUM_MSGS_RECEIVED = "numMsgsReceived";
+    public static final String NUM_BYTES_RECEIVED = "numBytesReceived";
+    public static final String RATE_MSGS_RECEIVED = "rateMsgsReceived";
+    public static final String RATE_BYTES_RECEIVED = "rateBytesReceived";
+    public static final String NUM_ACKS_SENT = "numAcksSent";
+    public static final String NUM_ACKS_FAILED = "numAcksFailed";
+    public static final String NUM_RECEIVE_FAILED = "numReceiveFailed";
+    public static final String NUM_BATCH_RECEIVE_FAILED = 
"numBatchReceiveFailed";
+    public static final String TOTAL_MSGS_RECEIVED = "totalMsgsReceived";
+    public static final String TOTAL_BYTES_RECEIVED = "totalBytesReceived";
+    public static final String TOTAL_RECEIVED_FAILED = "totalReceivedFailed";
+    public static final String TOTAL_BATCH_RECEIVED_FAILED = 
"totalBatchReceivedFailed";
+    public static final String TOTAL_ACKS_SENT = "totalAcksSent";
+    public static final String TOTAL_ACKS_FAILED = "totalAcksFailed";
+    public static final String MSG_NUM_IN_RECEIVER_QUEUE = 
"msgNumInReceiverQueue";
+}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java
new file mode 100644
index 0000000..e2b9915
--- /dev/null
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+
+/** Records Flink sink writer counters from the send results of Pulsar's {@link Producer}. */
+public class ProducerMetricsInterceptor implements ProducerInterceptor {
+
+    private final Counter numRecordsOutErrors;
+    private final Counter numRecordsOutCounter;
+    private final Counter numBytesOutCounter;
+
+    public ProducerMetricsInterceptor(SinkWriterMetricGroup metricGroup) {
+        this.numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+        this.numRecordsOutCounter = 
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesOutCounter = 
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do by default.
+    }
+
+    @Override
+    public boolean eligible(Message message) {
+        return true;
+    }
+
+    @Override
+    public Message beforeSend(Producer producer, Message message) {
+        return message;
+    }
+
+    @Override
+    public void onSendAcknowledgement(
+            Producer producer, Message message, MessageId msgId, Throwable 
exception) {
+        if (exception != null) {
+            numRecordsOutErrors.inc(1);
+        } else {
+            numRecordsOutCounter.inc(1);
+            numBytesOutCounter.inc(message.size());
+        }
+    }
+}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index 0433bb0..39b5f73 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -38,6 +38,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
 import static 
org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash.MURMUR3_32_HASH;
@@ -139,6 +140,19 @@ public final class PulsarSinkOptions {
                                             "The maximum number of pending 
messages in one sink parallelism.")
                                     .build());
 
+    public static final ConfigOption<Boolean> PULSAR_ENABLE_SINK_METRICS =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "enableMetrics")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The metrics from Pulsar Producer 
are only exposed if you enable this option.")
+                                    .text(
+                                            "You should set the %s to a 
positive value if you enable this option.",
+                                            
code(PULSAR_STATS_INTERVAL_SECONDS.key()))
+                                    .build());
+
     
///////////////////////////////////////////////////////////////////////////////
     //
     // The configuration for ProducerConfigurationData part.
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index d6a6ee3..0a56ff3 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -31,7 +31,9 @@ import org.apache.pulsar.client.api.Schema;
 
 import java.util.Objects;
 
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
@@ -51,6 +53,7 @@ public class SinkConfiguration extends PulsarConfiguration {
     private final MessageKeyHash messageKeyHash;
     private final boolean enableSchemaEvolution;
     private final int maxRecommitTimes;
+    private final boolean enableMetrics;
 
     public SinkConfiguration(Configuration configuration) {
         super(configuration);
@@ -62,6 +65,8 @@ public class SinkConfiguration extends PulsarConfiguration {
         this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
+        this.enableMetrics =
+                get(PULSAR_ENABLE_SINK_METRICS) && 
get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
     }
 
     /** The delivery guarantee changes the behavior of {@link PulsarWriter}. */
@@ -113,6 +118,11 @@ public class SinkConfiguration extends PulsarConfiguration 
{
         return maxRecommitTimes;
     }
 
+    /** Whether to expose the metrics from Pulsar Producer. */
+    public boolean isEnableMetrics() {
+        return enableMetrics;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -130,7 +140,8 @@ public class SinkConfiguration extends PulsarConfiguration {
                 && partitionSwitchSize == that.partitionSwitchSize
                 && enableSchemaEvolution == that.enableSchemaEvolution
                 && messageKeyHash == that.messageKeyHash
-                && maxRecommitTimes == that.maxRecommitTimes;
+                && maxRecommitTimes == that.maxRecommitTimes
+                && enableMetrics == that.enableMetrics;
     }
 
     @Override
@@ -142,6 +153,7 @@ public class SinkConfiguration extends PulsarConfiguration {
                 partitionSwitchSize,
                 messageKeyHash,
                 enableSchemaEvolution,
-                maxRecommitTimes);
+                maxRecommitTimes,
+                enableMetrics);
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index e30b593..c5c4622 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -72,7 +72,7 @@ public class PulsarWriter<IN> implements 
PrecommittingSinkWriter<IN, PulsarCommi
     private final PulsarSinkContext sinkContext;
     private final TopicProducerRegister producerRegister;
     private final MailboxExecutor mailboxExecutor;
-    private final AtomicLong pendingMessages = new AtomicLong(0);
+    private final AtomicLong pendingMessages;
 
     /**
      * Constructor creating a Pulsar writer.
@@ -122,8 +122,10 @@ public class PulsarWriter<IN> implements 
PrecommittingSinkWriter<IN, PulsarCommi
         }
 
         // Create this producer register after opening serialization schema!
-        this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+        this.producerRegister =
+                new TopicProducerRegister(sinkConfiguration, 
initContext.metricGroup());
         this.mailboxExecutor = initContext.getMailboxExecutor();
+        this.pendingMessages = new AtomicLong(0);
     }
 
     @Override
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
index 9bb1753..cf8ff00 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
@@ -20,14 +20,18 @@ package org.apache.flink.connector.pulsar.sink.writer.topic;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.pulsar.common.metrics.ProducerMetricsInterceptor;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -36,6 +40,7 @@ import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.com.google.common.base.Strings;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -44,8 +49,27 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BYTES_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_MSGS_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_SEND_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.PENDING_QUEUE_SIZE;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.PULSAR_PRODUCER_METRIC_NAME;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_BYTES_RATE;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_50_PCT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_75_PCT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_95_PCT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_999_PCT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_99_PCT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_LATENCY_MILLIS_MAX;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.SEND_MSGS_RATE;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BYTES_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_MSGS_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_SEND_FAILED;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
 import static 
org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
@@ -60,14 +84,21 @@ public class TopicProducerRegister implements Closeable {
 
     private final PulsarClient pulsarClient;
     private final SinkConfiguration sinkConfiguration;
+    private final SinkWriterMetricGroup metricGroup;
     private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;
     private final Map<String, Transaction> transactionRegister;
 
-    public TopicProducerRegister(SinkConfiguration sinkConfiguration) {
+    public TopicProducerRegister(
+            SinkConfiguration sinkConfiguration, SinkWriterMetricGroup 
metricGroup) {
         this.pulsarClient = createClient(sinkConfiguration);
         this.sinkConfiguration = sinkConfiguration;
+        this.metricGroup = metricGroup;
         this.producerRegister = new HashMap<>();
         this.transactionRegister = new HashMap<>();
+
+        if (sinkConfiguration.isEnableMetrics()) {
+            metricGroup.setCurrentSendTimeGauge(this::currentSendTimeGauge);
+        }
     }
 
     /**
@@ -150,7 +181,13 @@ public class TopicProducerRegister implements Closeable {
                     createProducerBuilder(pulsarClient, schema, 
sinkConfiguration);
             // Set the required topic name.
             builder.topic(topic);
+            // Set the sending counter for metrics.
+            builder.intercept(new ProducerMetricsInterceptor(metricGroup));
+
             Producer<T> producer = sneakyClient(builder::create);
+
+            // Expose the stats for calculating and monitoring.
+            exposeProducerMetrics(producer);
             producers.put(schemaInfo, producer);
 
             return producer;
@@ -199,4 +236,51 @@ public class TopicProducerRegister implements Closeable {
     private void clearTransactions() {
         transactionRegister.clear();
     }
+
+    private Long currentSendTimeGauge() {
+        double sendTime =
+                producerRegister.values().stream()
+                        .flatMap(v -> v.values().stream())
+                        .map(Producer::getStats)
+                        .mapToDouble(ProducerStats::getSendLatencyMillis50pct)
+                        .average()
+                        .orElse(Long.MAX_VALUE);
+
+        return Math.round(sendTime);
+    }
+
+    private void exposeProducerMetrics(Producer<?> producer) {
+        if (sinkConfiguration.isEnableMetrics()) {
+            String producerIdentity = producer.getProducerName();
+            if (Strings.isNullOrEmpty(producerIdentity)) {
+                // Fallback to a random UUID when the producer has no name.
+                producerIdentity = UUID.randomUUID().toString();
+            }
+
+            MetricGroup group =
+                    metricGroup
+                            .addGroup(PULSAR_PRODUCER_METRIC_NAME)
+                            .addGroup(producer.getTopic())
+                            .addGroup(producerIdentity);
+            ProducerStats stats = producer.getStats();
+
+            group.gauge(NUM_MSGS_SENT, stats::getNumMsgsSent);
+            group.gauge(NUM_BYTES_SENT, stats::getNumBytesSent);
+            group.gauge(NUM_SEND_FAILED, stats::getNumSendFailed);
+            group.gauge(NUM_ACKS_RECEIVED, stats::getNumAcksReceived);
+            group.gauge(SEND_MSGS_RATE, stats::getSendMsgsRate);
+            group.gauge(SEND_BYTES_RATE, stats::getSendBytesRate);
+            group.gauge(SEND_LATENCY_MILLIS_50_PCT, 
stats::getSendLatencyMillis50pct);
+            group.gauge(SEND_LATENCY_MILLIS_75_PCT, 
stats::getSendLatencyMillis75pct);
+            group.gauge(SEND_LATENCY_MILLIS_95_PCT, 
stats::getSendLatencyMillis95pct);
+            group.gauge(SEND_LATENCY_MILLIS_99_PCT, 
stats::getSendLatencyMillis99pct);
+            group.gauge(SEND_LATENCY_MILLIS_999_PCT, 
stats::getSendLatencyMillis999pct);
+            group.gauge(SEND_LATENCY_MILLIS_MAX, 
stats::getSendLatencyMillisMax);
+            group.gauge(TOTAL_MSGS_SENT, stats::getTotalMsgsSent);
+            group.gauge(TOTAL_BYTES_SENT, stats::getTotalBytesSent);
+            group.gauge(TOTAL_SEND_FAILED, stats::getTotalSendFailed);
+            group.gauge(TOTAL_ACKS_RECEIVED, stats::getTotalAcksReceived);
+            group.gauge(PENDING_QUEUE_SIZE, stats::getPendingQueueSize);
+        }
+    }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index d0ef6de..8c2ab6b 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import static java.util.Collections.emptyMap;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.CONSUMER_CONFIG_PREFIX;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.SOURCE_CONFIG_PREFIX;
 
@@ -204,6 +205,19 @@ public final class PulsarSourceOptions {
                                             "In this case, a single consumer 
will still receive all the keys, but they may be coming in different orders.")
                                     .build());
 
+    public static final ConfigOption<Boolean> PULSAR_ENABLE_SOURCE_METRICS =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableMetrics")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The metrics from Pulsar Consumer 
are only exposed if you enable this option.")
+                                    .text(
+                                            "You should set the %s to a 
positive value if you enable this option.",
+                                            
code(PULSAR_STATS_INTERVAL_SECONDS.key()))
+                                    .build());
+
     
///////////////////////////////////////////////////////////////////////////////
     //
     // The configuration for ConsumerConfigurationData part.
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index f9e8a65..5a52a59 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -34,9 +34,11 @@ import java.time.Duration;
 import java.util.Objects;
 
 import static 
org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -63,6 +65,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final SubscriptionType subscriptionType;
     private final SubscriptionMode subscriptionMode;
     private final boolean allowKeySharedOutOfOrderDelivery;
+    private final boolean enableMetrics;
 
     public SourceConfiguration(Configuration configuration) {
         super(configuration);
@@ -79,6 +82,8 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.subscriptionType = get(PULSAR_SUBSCRIPTION_TYPE);
         this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE);
         this.allowKeySharedOutOfOrderDelivery = 
get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY);
+        this.enableMetrics =
+                get(PULSAR_ENABLE_SOURCE_METRICS) && 
get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
     }
 
     /** The capacity of the element queue in the source reader. */
@@ -191,6 +196,11 @@ public class SourceConfiguration extends 
PulsarConfiguration {
         return allowKeySharedOutOfOrderDelivery;
     }
 
+    /** Whether to expose the metrics from Pulsar Consumer. */
+    public boolean isEnableMetrics() {
+        return enableMetrics;
+    }
+
     /** Convert the subscription into a readable str. */
     public String getSubscriptionDesc() {
         return getSubscriptionName()
@@ -223,7 +233,8 @@ public class SourceConfiguration extends 
PulsarConfiguration {
                 && Objects.equals(subscriptionName, that.subscriptionName)
                 && subscriptionType == that.subscriptionType
                 && subscriptionMode == that.subscriptionMode
-                && allowKeySharedOutOfOrderDelivery == 
that.allowKeySharedOutOfOrderDelivery;
+                && allowKeySharedOutOfOrderDelivery == 
that.allowKeySharedOutOfOrderDelivery
+                && enableMetrics == that.enableMetrics;
     }
 
     @Override
@@ -240,6 +251,7 @@ public class SourceConfiguration extends 
PulsarConfiguration {
                 subscriptionName,
                 subscriptionType,
                 subscriptionMode,
-                allowKeySharedOutOfOrderDelivery);
+                allowKeySharedOutOfOrderDelivery,
+                enableMetrics);
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 931dc6a..8dbead2 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubs
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -63,6 +64,7 @@ public class PulsarSourceEnumerator
     private final SourceConfiguration sourceConfiguration;
     private final SplitEnumeratorContext<PulsarPartitionSplit> context;
     private final SplitAssigner splitAssigner;
+    private final SplitEnumeratorMetricGroup metricGroup;
 
     public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
@@ -96,12 +98,18 @@ public class PulsarSourceEnumerator
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
         this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, 
context, enumState);
+        this.metricGroup = context.metricGroup();
     }
 
     @Override
     public void start() {
         rangeGenerator.open(sourceConfiguration);
 
+        // Expose the split assignment metrics if the context provides a metric group.
+        if (metricGroup != null) {
+            
metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);
+        }
+
         // Check the pulsar topic information and convert it into source split.
         if (sourceConfiguration.isEnablePartitionDiscovery()) {
             LOG.info(
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
index c734389..87faa28 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -60,4 +60,7 @@ public interface SplitAssigner {
 
     /** Snapshot the current assign state into checkpoint. */
     PulsarSourceEnumState snapshotState();
+
+    /** Returns the number of unassigned splits, exposed for the standard Flink metrics. */
+    long getUnassignedSplitCount();
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
index 733072c..fedb1aa 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
@@ -94,6 +94,11 @@ abstract class SplitAssignerBase implements SplitAssigner {
         return new PulsarSourceEnumState(appendedPartitions);
     }
 
+    @Override
+    public long getUnassignedSplitCount() {
+        return 
pendingPartitionSplits.values().stream().mapToLong(Set::size).sum();
+    }
+
     /** Add split to pending lists. */
     protected void addSplitToPendingList(int readerId, PulsarPartitionSplit 
split) {
         Set<PulsarPartitionSplit> splits =
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 2e83ab5..5d37614 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -84,7 +84,10 @@ public final class PulsarSourceReaderFactory {
             Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier =
                     () ->
                             new PulsarOrderedPartitionSplitReader(
-                                    pulsarClient, pulsarAdmin, 
sourceConfiguration);
+                                    pulsarClient,
+                                    pulsarAdmin,
+                                    sourceConfiguration,
+                                    readerContext.metricGroup());
 
             return new PulsarOrderedSourceReader<>(
                     elementsQueue,
@@ -109,6 +112,7 @@ public final class PulsarSourceReaderFactory {
                                     pulsarClient,
                                     pulsarAdmin,
                                     sourceConfiguration,
+                                    readerContext.metricGroup(),
                                     coordinatorClient);
 
             return new PulsarUnorderedSourceReader<>(
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
index 6b02e0f..8eb7b29 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -53,8 +54,9 @@ public class PulsarOrderedPartitionSplitReader extends 
PulsarPartitionSplitReade
     public PulsarOrderedPartitionSplitReader(
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration);
+            SourceConfiguration sourceConfiguration,
+            SourceReaderMetricGroup metricGroup) {
+        super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup);
     }
 
     @Override
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 08e9977..fa36ee9 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -29,11 +29,16 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -49,9 +54,26 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.MSG_NUM_IN_RECEIVER_QUEUE;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_ACKS_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BATCH_RECEIVE_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_BYTES_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_MSGS_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.NUM_RECEIVE_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.PULSAR_CONSUMER_METRIC_NAME;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.RATE_BYTES_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.RATE_MSGS_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_ACKS_SENT;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BATCH_RECEIVED_FAILED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_BYTES_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_MSGS_RECEIVED;
+import static 
org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_RECEIVED_FAILED;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
@@ -65,6 +87,7 @@ abstract class PulsarPartitionSplitReaderBase
     protected final PulsarClient pulsarClient;
     protected final PulsarAdmin pulsarAdmin;
     protected final SourceConfiguration sourceConfiguration;
+    protected final SourceReaderMetricGroup metricGroup;
 
     protected Consumer<byte[]> pulsarConsumer;
     protected PulsarPartitionSplit registeredSplit;
@@ -72,10 +95,12 @@ abstract class PulsarPartitionSplitReaderBase
     protected PulsarPartitionSplitReaderBase(
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration) {
+            SourceConfiguration sourceConfiguration,
+            SourceReaderMetricGroup metricGroup) {
         this.pulsarClient = pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
+        this.metricGroup = metricGroup;
     }
 
     @Override
@@ -239,6 +264,43 @@ abstract class PulsarPartitionSplitReaderBase
         }
 
         // Create the consumer configuration by using common utils.
-        return sneakyClient(consumerBuilder::subscribe);
+        Consumer<byte[]> consumer = sneakyClient(consumerBuilder::subscribe);
+
+        // Exposing the consumer metrics.
+        exposeConsumerMetrics(consumer);
+
+        return consumer;
+    }
+
+    private void exposeConsumerMetrics(Consumer<byte[]> consumer) {
+        if (sourceConfiguration.isEnableMetrics()) {
+            String consumerIdentity = consumer.getConsumerName();
+            if (Strings.isNullOrEmpty(consumerIdentity)) {
+                consumerIdentity = UUID.randomUUID().toString();
+            }
+
+            MetricGroup group =
+                    metricGroup
+                            .addGroup(PULSAR_CONSUMER_METRIC_NAME)
+                            .addGroup(consumer.getTopic())
+                            .addGroup(consumerIdentity);
+            ConsumerStats stats = consumer.getStats();
+
+            group.gauge(NUM_MSGS_RECEIVED, stats::getNumMsgsReceived);
+            group.gauge(NUM_BYTES_RECEIVED, stats::getNumBytesReceived);
+            group.gauge(RATE_MSGS_RECEIVED, stats::getRateMsgsReceived);
+            group.gauge(RATE_BYTES_RECEIVED, stats::getRateBytesReceived);
+            group.gauge(NUM_ACKS_SENT, stats::getNumAcksSent);
+            group.gauge(NUM_ACKS_FAILED, stats::getNumAcksFailed);
+            group.gauge(NUM_RECEIVE_FAILED, stats::getNumReceiveFailed);
+            group.gauge(NUM_BATCH_RECEIVE_FAILED, 
stats::getNumBatchReceiveFailed);
+            group.gauge(TOTAL_MSGS_RECEIVED, stats::getTotalMsgsReceived);
+            group.gauge(TOTAL_BYTES_RECEIVED, stats::getTotalBytesReceived);
+            group.gauge(TOTAL_RECEIVED_FAILED, stats::getTotalReceivedFailed);
+            group.gauge(TOTAL_BATCH_RECEIVED_FAILED, 
stats::getTotaBatchReceivedFailed);
+            group.gauge(TOTAL_ACKS_SENT, stats::getTotalAcksSent);
+            group.gauge(TOTAL_ACKS_FAILED, stats::getTotalAcksFailed);
+            group.gauge(MSG_NUM_IN_RECEIVER_QUEUE, 
stats::getMsgNumInReceiverQueue);
+        }
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index f095c12..72e7c6f 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
@@ -65,8 +66,9 @@ public class PulsarUnorderedPartitionSplitReader extends 
PulsarPartitionSplitRea
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
+            SourceReaderMetricGroup metricGroup,
             TransactionCoordinatorClient coordinatorClient) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration);
+        super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup);
 
         this.coordinatorClient = coordinatorClient;
     }
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 0e5a173..690d7ff 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -26,12 +26,12 @@ import 
org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
-import 
org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
 import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -60,7 +60,7 @@ class PulsarSinkITCase {
 
     /** Integration test based on connector testing framework. */
     @Nested
-    class IntegrationTest extends PulsarSinkTestSuiteBase {
+    class IntegrationTest extends SinkTestSuiteBase<String> {
 
         @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
index 2e36bfb..f0eb67e 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSinkWriterMetricGroup;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -49,7 +50,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase {
         operator().createTopic(topic, 8);
 
         SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
-        TopicProducerRegister register = new 
TopicProducerRegister(configuration);
+        TopicProducerRegister register =
+                new TopicProducerRegister(configuration, 
createSinkWriterMetricGroup());
 
         String message = randomAlphabetic(10);
         register.createMessageBuilder(topic, 
Schema.STRING).value(message).send();
@@ -76,7 +78,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase {
         operator().createTopic(topic, 8);
 
         SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
-        TopicProducerRegister register = new 
TopicProducerRegister(configuration);
+        TopicProducerRegister register =
+                new TopicProducerRegister(configuration, 
createSinkWriterMetricGroup());
 
         String message = randomAlphabetic(10);
         register.createMessageBuilder(topic, 
Schema.STRING).value(message).sendAsync();
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index 0aee463..be099e7 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -42,6 +42,7 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSourceReaderMetricGroup;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test different implementation of StopCursor. */
@@ -54,7 +55,10 @@ class StopCursorTest extends PulsarTestSuiteBase {
 
         PulsarOrderedPartitionSplitReader splitReader =
                 new PulsarOrderedPartitionSplitReader(
-                        operator().client(), operator().admin(), 
sourceConfig());
+                        operator().client(),
+                        operator().admin(),
+                        sourceConfig(),
+                        createSourceReaderMetricGroup());
         // send the first message and set the stopCursor to filter any late 
stopCursor
         operator()
                 .sendMessage(
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index 4631798..635b0a3 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -70,6 +70,7 @@ import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlin
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
 import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSourceReaderMetricGroup;
 import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.pulsar.client.api.Schema.STRING;
@@ -341,10 +342,17 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
     private PulsarPartitionSplitReaderBase splitReader(SubscriptionType 
subscriptionType) {
         if (subscriptionType == SubscriptionType.Failover) {
             return new PulsarOrderedPartitionSplitReader(
-                    operator().client(), operator().admin(), sourceConfig());
+                    operator().client(),
+                    operator().admin(),
+                    sourceConfig(),
+                    createSourceReaderMetricGroup());
         } else {
             return new PulsarUnorderedPartitionSplitReader(
-                    operator().client(), operator().admin(), sourceConfig(), 
null);
+                    operator().client(),
+                    operator().admin(),
+                    sourceConfig(),
+                    createSourceReaderMetricGroup(),
+                    null);
         }
     }
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
deleted file mode 100644
index 0695a43..0000000
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.sink;
-
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import 
org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
-import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
-
-import org.junit.jupiter.api.Disabled;
-
-/** Pulsar sink don't expose the monitor metrics now. We have to disable this 
test. */
-public abstract class PulsarSinkTestSuiteBase extends 
SinkTestSuiteBase<String> {
-
-    @Override
-    @Disabled("Enable this test after FLINK-26027 being merged.")
-    public void testMetrics(
-            TestEnvironment testEnv,
-            DataStreamSinkExternalContext<String> externalContext,
-            CheckpointingMode semantic) {}
-}

Reply via email to