This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new acac72ea03f [improve][broker][PIP-379] Add observability stats for "draining hashes" (#23429)
acac72ea03f is described below

commit acac72ea03f7c38cab99ec011b309d5e6bb4fe9d
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 10 12:46:42 2024 +0300

    [improve][broker][PIP-379] Add observability stats for "draining hashes" (#23429)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   7 +
 .../broker/service/DrainingHashesTracker.java      | 112 +++++++++
 ...istentStickyKeyDispatcherMultipleConsumers.java |   6 +-
 .../service/persistent/PersistentSubscription.java |  20 +-
 .../service/persistent/RescheduleReadHandler.java  |  14 ++
 .../org/apache/pulsar/broker/BrokerTestUtil.java   |  78 +++++-
 .../stats/AuthenticatedConsumerStatsTest.java      |  57 +----
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 276 +++++++++++++++++++--
 pulsar-broker/src/test/resources/log4j2.xml        |  11 +
 .../pulsar/common/policies/data/ConsumerStats.java |  48 +++-
 .../pulsar/common/policies/data/DrainingHash.java  |  41 +++
 .../common/policies/data/SubscriptionStats.java    |  24 ++
 .../policies/data/stats/ConsumerStatsImpl.java     |  43 +++-
 .../policies/data/stats/DrainingHashImpl.java      |  46 ++++
 .../policies/data/stats/SubscriptionStatsImpl.java |  22 ++
 .../pulsar/common/util/ObjectMapperFactory.java    |   3 +
 16 files changed, 734 insertions(+), 74 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d25ebd0839d..bcd29d86490 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -174,6 +174,10 @@ public class Consumer {
     @Setter
     private volatile PendingAcksMap.PendingAcksRemoveHandler 
pendingAcksRemoveHandler;
 
+    @Getter
+    @Setter
+    private volatile java.util.function.BiConsumer<Consumer, 
ConsumerStatsImpl> drainingHashesConsumerStatsUpdater;
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
@@ -976,6 +980,9 @@ public class Consumer {
         if (readPositionWhenJoining != null) {
             stats.readPositionWhenJoining = readPositionWhenJoining.toString();
         }
+        if (drainingHashesConsumerStatsUpdater != null) {
+            drainingHashesConsumerStatsUpdater.accept(this, stats);
+        }
         return stats;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 3521fa197a1..46762c844db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -20,8 +20,18 @@ package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.PrimitiveIterator;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
+import org.roaringbitmap.RoaringBitmap;
 
 /**
  * A thread-safe map to store draining hashes in the consumer.
@@ -34,6 +44,8 @@ public class DrainingHashesTracker {
     private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = 
new Int2ObjectOpenHashMap<>();
     int batchLevel;
     boolean unblockedWhileBatching;
+    private final Map<ConsumerIdentityWrapper, ConsumerDrainingHashesStats> 
consumerDrainingHashesStatsMap =
+            new ConcurrentHashMap<>();
 
     /**
      * Represents an entry in the draining hashes tracker.
@@ -98,6 +110,52 @@ public class DrainingHashesTracker {
         }
     }
 
+    private class ConsumerDrainingHashesStats {
+        private final RoaringBitmap drainingHashes = new RoaringBitmap();
+        long drainingHashesClearedTotal;
+
+        public synchronized void addHash(int stickyHash) {
+            drainingHashes.add(stickyHash);
+        }
+
+        public synchronized boolean clearHash(int hash) {
+            drainingHashes.remove(hash);
+            drainingHashesClearedTotal++;
+            boolean empty = drainingHashes.isEmpty();
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cleared hash {} in stats. empty={} 
totalCleared={} hashes={}",
+                        dispatcherName, hash, empty, 
drainingHashesClearedTotal, drainingHashes.getCardinality());
+            }
+            return empty;
+        }
+
+        public synchronized void updateConsumerStats(Consumer consumer, 
ConsumerStatsImpl consumerStats) {
+            int drainingHashesUnackedMessages = 0;
+            List<DrainingHash> drainingHashesStats = new ArrayList<>();
+            PrimitiveIterator.OfInt hashIterator = 
drainingHashes.stream().iterator();
+            while (hashIterator.hasNext()) {
+                int hash = hashIterator.nextInt();
+                DrainingHashEntry entry = getEntry(hash);
+                if (entry == null) {
+                    log.warn("[{}] Draining hash {} not found in the tracker 
for consumer {}", dispatcherName, hash,
+                            consumer);
+                    continue;
+                }
+                int unackedMessages = entry.refCount;
+                DrainingHashImpl drainingHash = new DrainingHashImpl();
+                drainingHash.hash = hash;
+                drainingHash.unackMsgs = unackedMessages;
+                drainingHash.blockedAttempts = entry.blockedCount;
+                drainingHashesStats.add(drainingHash);
+                drainingHashesUnackedMessages += unackedMessages;
+            }
+            consumerStats.drainingHashesCount = drainingHashesStats.size();
+            consumerStats.drainingHashesClearedTotal = 
drainingHashesClearedTotal;
+            consumerStats.drainingHashesUnackedMessages = 
drainingHashesUnackedMessages;
+            consumerStats.drainingHashes = drainingHashesStats;
+        }
+    }
+
     /**
      * Interface for handling the unblocking of sticky key hashes.
      */
@@ -127,13 +185,25 @@ public class DrainingHashesTracker {
         }
         DrainingHashEntry entry = drainingHashes.get(stickyHash);
         if (entry == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Adding and incrementing draining hash {} for 
consumer id:{} name:{}", dispatcherName,
+                        stickyHash, consumer.consumerId(), 
consumer.consumerName());
+            }
             entry = new DrainingHashEntry(consumer);
             drainingHashes.put(stickyHash, entry);
+            // update the consumer specific stats
+            consumerDrainingHashesStatsMap.computeIfAbsent(new 
ConsumerIdentityWrapper(consumer),
+                    k -> new 
ConsumerDrainingHashesStats()).addHash(stickyHash);
         } else if (entry.getConsumer() != consumer) {
             throw new IllegalStateException(
                     "Consumer " + entry.getConsumer() + " is already draining 
hash " + stickyHash
                             + " in dispatcher " + dispatcherName + ". Same 
hash being used for consumer " + consumer
                             + ".");
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Draining hash {} incrementing {} consumer 
id:{} name:{}", dispatcherName, stickyHash,
+                        entry.refCount + 1, consumer.consumerId(), 
consumer.consumerName());
+            }
         }
         entry.incrementRefCount();
     }
@@ -178,7 +248,17 @@ public class DrainingHashesTracker {
                             + ".");
         }
         if (entry.decrementRefCount()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Draining hash {} removing consumer id:{} 
name:{}", dispatcherName, stickyHash,
+                        consumer.consumerId(), consumer.consumerName());
+            }
             DrainingHashEntry removed = drainingHashes.remove(stickyHash);
+            // update the consumer specific stats
+            ConsumerDrainingHashesStats drainingHashesStats =
+                    consumerDrainingHashesStatsMap.get(new 
ConsumerIdentityWrapper(consumer));
+            if (drainingHashesStats != null) {
+                drainingHashesStats.clearHash(stickyHash);
+            }
             if (!closing && removed.isBlocking()) {
                 if (batchLevel > 0) {
                     unblockedWhileBatching = true;
@@ -186,6 +266,11 @@ public class DrainingHashesTracker {
                     unblockingHandler.stickyKeyHashUnblocked(stickyHash);
                 }
             }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Draining hash {} decrementing {} consumer 
id:{} name:{}", dispatcherName, stickyHash,
+                        entry.refCount, consumer.consumerId(), 
consumer.consumerName());
+            }
         }
     }
 
@@ -237,5 +322,32 @@ public class DrainingHashesTracker {
      */
     public synchronized void clear() {
         drainingHashes.clear();
+        consumerDrainingHashesStatsMap.clear();
+    }
+
+    /**
+     * Update the consumer specific stats to the target {@link 
ConsumerStatsImpl}.
+     *
+     * @param consumer the consumer
+     * @param consumerStats the consumer stats to update the values to
+     */
+    public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl 
consumerStats) {
+        consumerStats.drainingHashesCount = 0;
+        consumerStats.drainingHashesClearedTotal = 0;
+        consumerStats.drainingHashesUnackedMessages = 0;
+        consumerStats.drainingHashes = Collections.emptyList();
+        ConsumerDrainingHashesStats consumerDrainingHashesStats =
+                consumerDrainingHashesStatsMap.get(new 
ConsumerIdentityWrapper(consumer));
+        if (consumerDrainingHashesStats != null) {
+            consumerDrainingHashesStats.updateConsumerStats(consumer, 
consumerStats);
+        }
+    }
+
+    /**
+     * Remove the consumer specific stats from the draining hashes tracker.
+     * @param consumer the consumer
+     */
+    public void consumerRemoved(Consumer consumer) {
+        consumerDrainingHashesStatsMap.remove(new 
ConsumerIdentityWrapper(consumer));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index df053e6d8a5..1a3e2f706cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -157,6 +157,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                         drainingHashesTracker.endBatch();
                     }
                 });
+                
consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats);
                 registerDrainingHashes(consumer, 
impactedConsumers.orElseThrow());
             }
         }).exceptionally(ex -> {
@@ -193,6 +194,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             // consumer to another. This will handle the case where a hash 
gets switched from an existing
             // consumer to another existing consumer during removal.
             registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
+            drainingHashesTracker.consumerRemoved(consumer);
         }
     }
 
@@ -349,8 +351,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             return false;
         }
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Adding {}:{} to pending acks for consumer {} with 
sticky key hash {}",
-                    getName(), ledgerId, entryId, consumer, stickyKeyHash);
+            log.debug("[{}] Adding {}:{} to pending acks for consumer id:{} 
name:{} with sticky key hash {}",
+                    getName(), ledgerId, entryId, consumer.consumerId(), 
consumer.consumerName(), stickyKeyHash);
         }
         // allow adding the message to pending acks and sending the message to 
the consumer
         return true;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index eaa147b81b1..df1c23cbbcb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1253,11 +1253,23 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 subStats.lastConsumedTimestamp =
                         Math.max(subStats.lastConsumedTimestamp, 
consumerStats.lastConsumedTimestamp);
                 subStats.lastAckedTimestamp = 
Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
-                if (consumerKeyHashRanges != null && 
consumerKeyHashRanges.containsKey(consumer)) {
-                    consumerStats.keyHashRanges = 
consumerKeyHashRanges.get(consumer).stream()
-                            .map(Range::toString)
-                            .collect(Collectors.toList());
+                List<Range> keyRanges = consumerKeyHashRanges != null ? 
consumerKeyHashRanges.get(consumer) : null;
+                if (keyRanges != null) {
+                    if (((StickyKeyDispatcher) dispatcher).isClassic()) {
+                        // Use string representation for classic mode
+                        consumerStats.keyHashRanges = keyRanges.stream()
+                                .map(Range::toString)
+                                .collect(Collectors.toList());
+                    } else {
+                        // Use array representation for PIP-379 stats
+                        consumerStats.keyHashRangeArrays = keyRanges.stream()
+                                .map(range -> new int[]{range.getStart(), 
range.getEnd()})
+                                .collect(Collectors.toList());
+                    }
                 }
+                subStats.drainingHashesCount += 
consumerStats.drainingHashesCount;
+                subStats.drainingHashesClearedTotal += 
consumerStats.drainingHashesClearedTotal;
+                subStats.drainingHashesUnackedMessages += 
consumerStats.drainingHashesUnackedMessages;
             });
 
             subStats.filterProcessedMsgCount = 
dispatcher.getFilterProcessedMsgCount();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
index 3554f292552..4812be58cdc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 import java.util.function.LongSupplier;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Reschedules reads so that the possible pending read is cancelled if it's 
waiting for more entries.
@@ -30,6 +31,7 @@ import java.util.function.LongSupplier;
  * that should be handled. This will also batch multiple calls together to 
reduce the number of
  * operations.
  */
+@Slf4j
 class RescheduleReadHandler {
     private static final int UNSET = -1;
     private static final int NO_PENDING_READ = 0;
@@ -70,15 +72,27 @@ class RescheduleReadHandler {
                 // are entries in the replay queue.
                 if (maxReadOpCount != NO_PENDING_READ && 
readOpCounterSupplier.getAsLong() == maxReadOpCount
                         && hasEntriesInReplayQueue.getAsBoolean()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cancelling pending read request because 
it's waiting for more entries");
+                    }
                     cancelPendingRead.run();
                 }
                 // Re-schedule read immediately, or join the next scheduled 
read
+                if (log.isDebugEnabled()) {
+                    log.debug("Triggering read");
+                }
                 rescheduleReadImmediately.run();
             };
             long rescheduleDelay = readIntervalMsSupplier.getAsLong();
             if (rescheduleDelay > 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Scheduling after {} ms", rescheduleDelay);
+                }
                 executor.schedule(runnable, rescheduleDelay, 
TimeUnit.MILLISECONDS);
             } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Running immediately");
+                }
                 runnable.run();
             }
         } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 7ed4542b250..6a41e86f893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -21,10 +21,15 @@ package org.apache.pulsar.broker;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.StringWriter;
 import java.io.UncheckedIOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.UUID;
@@ -37,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
+import lombok.SneakyThrows;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -46,7 +52,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
-
 /**
  * Holds util methods used in test.
  */
@@ -136,6 +141,77 @@ public class BrokerTestUtil {
         }
     }
 
+    /**
+     * Logs the topic stats and internal stats for the given topic
+     * @param logger logger to use
+     * @param baseUrl Pulsar service URL
+     * @param topic topic name
+     */
+    public static void logTopicStats(Logger logger, String baseUrl, String 
topic) {
+        logTopicStats(logger, baseUrl, "public", "default", topic);
+    }
+
+    /**
+     * Logs the topic stats and internal stats for the given topic
+     * @param logger logger to use
+     * @param baseUrl Pulsar service URL
+     * @param tenant tenant name
+     * @param namespace namespace name
+     * @param topic topic name
+     */
+    public static void logTopicStats(Logger logger, String baseUrl, String 
tenant, String namespace, String topic) {
+        String topicStatsUri =
+                String.format("%s/admin/v2/persistent/%s/%s/%s/stats", 
baseUrl, tenant, namespace, topic);
+        logger.info("[{}] stats: {}", topic, 
jsonPrettyPrint(getJsonResourceAsString(topicStatsUri)));
+        String topicStatsInternalUri =
+                String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", 
baseUrl, tenant, namespace, topic);
+        logger.info("[{}] internalStats: {}", topic, 
jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri)));
+    }
+
+    /**
+     * Pretty print the given JSON string
+     * @param jsonString JSON string to pretty print
+     * @return pretty printed JSON string
+     */
+    public static String jsonPrettyPrint(String jsonString) {
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            Object json = mapper.readValue(jsonString, Object.class);
+            ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
+            return writer.writeValueAsString(json);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Get the resource as a string from the given URI
+     */
+    @SneakyThrows
+    public static String getJsonResourceAsString(String uri) {
+        URL url = new URL(uri);
+        HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
+        connection.setRequestMethod("GET");
+        connection.setRequestProperty("Accept", "application/json");
+        try {
+            int responseCode = connection.getResponseCode();
+            if (responseCode == 200) {
+                try (BufferedReader in = new BufferedReader(new 
InputStreamReader(connection.getInputStream()))) {
+                    String inputLine;
+                    StringBuilder content = new StringBuilder();
+                    while ((inputLine = in.readLine()) != null) {
+                        content.append(inputLine);
+                    }
+                    return content.toString();
+                }
+            } else {
+                throw new IOException("Failed to get resource: " + uri + ", 
status: " + responseCode);
+            }
+        } finally {
+            connection.disconnect();
+        }
+    }
+
     /**
      * Receive messages concurrently from multiple consumers and handles them 
using the provided message handler.
      * The message handler should return true if it wants to continue 
receiving more messages, false otherwise.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
index e8cadb72e1e..20c1c5498ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
@@ -18,11 +18,19 @@
  */
 package org.apache.pulsar.broker.stats;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -37,18 +45,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.NoSuchAlgorithmException;
-import java.security.PrivateKey;
-import java.time.Duration;
-import java.util.Base64;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
 public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{
     private final String ADMIN_TOKEN;
     private final String TOKEN_PUBLIC_KEY;
@@ -115,32 +111,6 @@ public class AuthenticatedConsumerStatsTest extends 
ConsumerStatsTest{
 
     @Test
     public void testConsumerStatsOutput() throws Exception {
-        Set<String> allowedFields = Sets.newHashSet(
-                "msgRateOut",
-                "msgThroughputOut",
-                "bytesOutCounter",
-                "msgOutCounter",
-                "messageAckRate",
-                "msgRateRedeliver",
-                "chunkedMessageRate",
-                "consumerName",
-                "availablePermits",
-                "unackedMessages",
-                "avgMessagesPerEntry",
-                "blockedConsumerOnUnackedMsgs",
-                "readPositionWhenJoining",
-                "lastAckedTime",
-                "lastAckedTimestamp",
-                "lastConsumedTime",
-                "lastConsumedTimestamp",
-                "lastConsumedFlowTimestamp",
-                "keyHashRanges",
-                "metadata",
-                "address",
-                "connectedSince",
-                "clientVersion",
-                "appId");
-
         final String topicName = 
"persistent://public/default/testConsumerStatsOutput";
         final String subName = "my-subscription";
 
@@ -154,13 +124,6 @@ public class AuthenticatedConsumerStatsTest extends 
ConsumerStatsTest{
         ObjectMapper mapper = ObjectMapperFactory.create();
         ConsumerStats consumerStats = stats.getSubscriptions()
                 .get(subName).getConsumers().get(0);
-        Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
-        JsonNode node = 
mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
-        Iterator<String> itr = node.fieldNames();
-        while (itr.hasNext()) {
-            String field = itr.next();
-            Assert.assertTrue(allowedFields.contains(field), field + " should 
not be exposed");
-        }
         // assert that role is exposed
         Assert.assertEquals(consumerStats.getAppId(), "admin");
         consumer.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 5b2998216e8..59a911500e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertEquals;
@@ -31,22 +34,29 @@ import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.PendingAcksMap;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyDispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -67,13 +77,19 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.assertj.core.groups.Tuple;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.SkipException;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -218,9 +234,24 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
         Assert.assertEquals(updatedStats.getBytesOutCounter(), 1280);
     }
 
-    @Test
-    public void testConsumerStatsOutput() throws Exception {
-        Set<String> allowedFields = Sets.newHashSet(
+    @DataProvider(name = "classicAndSubscriptionType")
+    public Object[][] classicAndSubscriptionType() {
+        return new Object[][]{
+                {false, SubscriptionType.Shared},
+                {true, SubscriptionType.Key_Shared},
+                {false, SubscriptionType.Key_Shared}
+        };
+    }
+
+    @Test(dataProvider = "classicAndSubscriptionType")
+    public void testConsumerStatsOutput(boolean classicDispatchers, 
SubscriptionType subscriptionType)
+            throws Exception {
+        if (this instanceof AuthenticatedConsumerStatsTest) {
+            throw new SkipException("Skip test for 
AuthenticatedConsumerStatsTest");
+        }
+        
conf.setSubscriptionSharedUseClassicPersistentImplementation(classicDispatchers);
+        
conf.setSubscriptionKeySharedUseClassicPersistentImplementation(classicDispatchers);
+        Set<String> expectedFields = Sets.newHashSet(
                 "msgRateOut",
                 "msgThroughputOut",
                 "bytesOutCounter",
@@ -233,21 +264,56 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
                 "unackedMessages",
                 "avgMessagesPerEntry",
                 "blockedConsumerOnUnackedMsgs",
-                "readPositionWhenJoining",
                 "lastAckedTime",
                 "lastAckedTimestamp",
                 "lastConsumedTime",
                 "lastConsumedTimestamp",
                 "lastConsumedFlowTimestamp",
-                "keyHashRanges",
                 "metadata",
                 "address",
                 "connectedSince",
-                "clientVersion");
+                "clientVersion",
+                "drainingHashesCount",
+                "drainingHashesClearedTotal",
+                "drainingHashesUnackedMessages"
+        );
+        if (subscriptionType == SubscriptionType.Key_Shared) {
+            if (classicDispatchers) {
+                expectedFields.addAll(List.of(
+                        "readPositionWhenJoining",
+                        "keyHashRanges"
+                ));
+            } else {
+                expectedFields.addAll(List.of(
+                        "drainingHashes",
+                        "keyHashRangeArrays"
+                ));
+            }
+        }
+        final String topicName = 
newUniqueName("persistent://my-property/my-ns/testConsumerStatsOutput");
+        final String subName = "my-subscription";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(subscriptionType)
+                .subscriptionName(subName)
+                .subscribe();
+
+        String topicStatsUri =
+                String.format("%s/admin/v2/%s/stats", 
pulsar.getWebServiceAddress(), topicName.replace("://", "/"));
+        String topicStatsJson = 
BrokerTestUtil.getJsonResourceAsString(topicStatsUri);
+        ObjectMapper mapper = ObjectMapperFactory.create();
+        JsonNode node = 
mapper.readTree(topicStatsJson).get("subscriptions").get(subName).get("consumers").get(0);
+        
assertThat(node.fieldNames()).toIterable().containsExactlyInAnyOrderElementsOf(expectedFields);
+    }
 
-        final String topicName = 
"persistent://prop/use/ns-abc/testConsumerStatsOutput";
+    @Test
+    public void testLastConsumerFlowTimestamp() throws PulsarClientException, 
PulsarAdminException {
+        final String topicName = 
newUniqueName("persistent://my-property/my-ns/testLastConsumerFlowTimestamp");
         final String subName = "my-subscription";
 
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
@@ -255,18 +321,9 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         TopicStats stats = admin.topics().getStats(topicName);
-        ObjectMapper mapper = ObjectMapperFactory.create();
         ConsumerStats consumerStats = stats.getSubscriptions()
                 .get(subName).getConsumers().get(0);
         Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
-        JsonNode node = 
mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
-        Iterator<String> itr = node.fieldNames();
-        while (itr.hasNext()) {
-            String field = itr.next();
-            Assert.assertTrue(allowedFields.contains(field), field + " should 
not be exposed");
-        }
-
-        consumer.close();
     }
 
 
@@ -481,4 +538,189 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
         assertEquals(0, consumers.get(0).getUnackedMessages());
     }
 
+    @Test
+    public void testKeySharedDrainingHashesConsumerStats() throws Exception {
+        String topic = newUniqueName("testKeySharedDrainingHashesConsumerStats");
+        String subscriptionName = "sub";
+        int numberOfKeys = 10;
+
+        // Create a producer for the topic
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        // Create the first consumer (c1) for the topic
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c1")
+                .receiverQueueSize(100)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        // Get the dispatcher and selector for the topic
+        StickyKeyDispatcher dispatcher = getDispatcher(topic, subscriptionName);
+        StickyKeyConsumerSelector selector = dispatcher.getSelector();
+
+        // Send 20 messages with keys cycling from 0 to numberOfKeys-1
+        for (int i = 0; i < 20; i++) {
+            String key = String.valueOf(i % numberOfKeys);
+            int stickyKeyHash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8));
+            log.info("Sending message with value {} key {} hash {}", i, key, stickyKeyHash);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+        }
+
+        // Wait until all the already published messages have been pre-fetched by c1
+        PendingAcksMap c1PendingAcks = dispatcher.getConsumers().get(0).getPendingAcks();
+        Awaitility.await().ignoreExceptions().until(() -> c1PendingAcks.size() == 20);
+
+        // Add a new consumer (c2) for the topic
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        // Get the subscription stats and consumer stats
+        SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+        ConsumerStats c1Stats = subscriptionStats.getConsumers().get(0);
+        ConsumerStats c2Stats = subscriptionStats.getConsumers().get(1);
+
+        Set<Integer> c2HashesByStats = new HashSet<>();
+        Set<Integer> c2HashesByDispatcher = new HashSet<>();
+        Map<Integer, Integer> c1DrainingHashesExpected = new HashMap<>();
+
+        int expectedDrainingHashesUnackMessages = 0;
+        // Determine which hashes are assigned to c2 and which are draining from c1
+        // run for the same keys as the sent messages
+        for (int i = 0; i < 20; i++) {
+            // use the same key as in the sent messages
+            String key = String.valueOf(i % numberOfKeys);
+            int hash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8));
+            // Validate that the hash is assigned to c2 in stats
+            if ("c2".equals(findConsumerNameForHash(subscriptionStats, hash))) {
+                c2HashesByStats.add(hash);
+            }
+            // use the selector to determine the expected draining hashes for c1
+            org.apache.pulsar.broker.service.Consumer selected = selector.select(hash);
+            if ("c2".equals(selected.consumerName())) {
+                c2HashesByDispatcher.add(hash);
+                c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1 : v + 1);
+                expectedDrainingHashesUnackMessages++;
+            }
+        }
+
+        // Validate that the hashes assigned to c2 match between stats and dispatcher
+        assertThat(c2HashesByStats).containsExactlyInAnyOrderElementsOf(c2HashesByDispatcher);
+
+        // Validate the draining hashes for c1
+        assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash)
+                .containsExactlyInAnyOrderElementsOf(c2HashesByStats);
+        assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash, DrainingHash::getUnackMsgs)
+                .containsExactlyInAnyOrderElementsOf(c1DrainingHashesExpected.entrySet().stream()
+                        .map(e -> Tuple.tuple(e.getKey(), e.getValue())).toList());
+
+        // Validate that c2 has no draining hashes
+        assertThat(c2Stats.getDrainingHashes()).isEmpty();
+
+        // Validate counters
+        assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(c2HashesByStats.size());
+        assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+        assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(expectedDrainingHashesUnackMessages);
+        assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0);
+        assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+        assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+
+        // Send another 20 messages
+        for (int i = 0; i < 20; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(i % numberOfKeys))
+                    .value(i)
+                    .send();
+        }
+
+        // Validate blocked attempts for c1
+        Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+            SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+            assertThat(stats.getConsumers().get(0).getDrainingHashes()).isNotEmpty().allSatisfy(dh -> {
+                assertThat(dh).extracting(DrainingHash::getBlockedAttempts)
+                        .asInstanceOf(INTEGER)
+                        .isGreaterThan(0);
+            });
+        });
+
+        // Acknowledge messages that were sent before c2 joined, to clear all draining hashes
+        for (int i = 0; i < 20; i++) {
+            Message<Integer> message = c1.receive(1, TimeUnit.SECONDS);
+            assertThat(message).describedAs("c1 should receive message %d before the timeout", i).isNotNull();
+            log.info("Acking message with value {} key {}", message.getValue(), message.getKey());
+            c1.acknowledge(message);
+
+            if (i == 18) {
+                // Validate that there is one draining hash left
+                Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
+                        .untilAsserted(() -> {
+                            SubscriptionStats stats =
+                                    admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+                            assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> {
+                                assertThat(consumerStats)
+                                        .describedAs("Consumer stats should have one draining hash %s", consumerStats)
+                                        .extracting(ConsumerStats::getDrainingHashes)
+                                        .asList().hasSize(1);
+                            });
+                        });
+            }
+
+            if (i == 19) {
+                // Validate that there are no draining hashes left
+                Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3))
+                        .untilAsserted(() -> {
+                            SubscriptionStats stats =
+                                    admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+                            assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> {
+                                assertThat(consumerStats).extracting(ConsumerStats::getDrainingHashes)
+                                        .asList().isEmpty();
+                            });
+                        });
+            }
+        }
+
+        // Get the subscription stats and consumer stats
+        subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
+        c1Stats = subscriptionStats.getConsumers().get(0);
+        c2Stats = subscriptionStats.getConsumers().get(1);
+
+        // Validate counters
+        assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(0);
+        assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(c2HashesByStats.size());
+        assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+        assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0);
+        assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0);
+        assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0);
+    }
+
+    private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) {
+        return findConsumerForHash(subscriptionStats, hash).map(owner -> owner.getConsumerName()).orElse(null);
+    }
+
+    private Optional<? extends ConsumerStats> findConsumerForHash(SubscriptionStats subscriptionStats, int hash) {
+        return subscriptionStats.getConsumers().stream().filter(candidate -> {
+            // a consumer matches when one of its inclusive [start, end] ranges contains the hash
+            return candidate.getKeyHashRangeArrays().stream().anyMatch(r -> r[0] <= hash && r[1] >= hash);
+        }).findFirst();
+    }
+
+    @SneakyThrows
+    private StickyKeyDispatcher getDispatcher(String topic, String subscription) {
+        var resolvedTopic = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        return (StickyKeyDispatcher) resolvedTopic.getSubscription(subscription).getDispatcher();
+    }
 }
diff --git a/pulsar-broker/src/test/resources/log4j2.xml 
b/pulsar-broker/src/test/resources/log4j2.xml
index 09a89702ee2..a0732096f28 100644
--- a/pulsar-broker/src/test/resources/log4j2.xml
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -36,5 +36,16 @@
     <Root level="INFO">
       <AppenderRef ref="CONSOLE"/>
     </Root>
+    <!-- Uncomment the following logger for debugging Key_Shared / PIP-379
+    <Logger 
name="org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers"
 level="DEBUG" additivity="false">
+      <AppenderRef ref="CONSOLE"/>
+    </Logger>
+    <Logger name="org.apache.pulsar.broker.service.DrainingHashesTracker" 
level="DEBUG" additivity="false">
+      <AppenderRef ref="CONSOLE"/>
+    </Logger>
+    <Logger 
name="org.apache.pulsar.broker.service.persistent.RescheduleReadHandler" 
level="DEBUG" additivity="false">
+      <AppenderRef ref="CONSOLE"/>
+    </Logger>
+    -->
   </Loggers>
 </Configuration>
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index d2d3600df96..16dce5903f4 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -73,8 +73,41 @@ public interface ConsumerStats {
     boolean isBlockedConsumerOnUnackedMsgs();
 
     /** The read position of the cursor when the consumer joining. */
+    @Deprecated
     String getReadPositionWhenJoining();
 
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the current number of hashes in the draining state for this 
consumer.
+     *
+     * @return the current number of hashes in the draining state for this 
consumer
+     */
+    int getDrainingHashesCount();
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the total number of hashes cleared from the draining state 
since the consumer connected.
+     *
+     * @return the total number of hashes cleared from the draining state 
since the consumer connected
+     */
+    long getDrainingHashesClearedTotal();
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the total number of unacked messages for all draining hashes 
for this consumer.
+     *
+     * @return the total number of unacked messages for all draining hashes 
for this consumer
+     */
+    int getDrainingHashesUnackedMessages();
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the draining hashes for this consumer.
+     *
+     * @return a list of draining hashes for this consumer
+     */
+    List<DrainingHash> getDrainingHashes();
+
     /** Address of this consumer. */
     String getAddress();
 
@@ -88,9 +121,20 @@ public interface ConsumerStats {
     long getLastConsumedTimestamp();
     long getLastConsumedFlowTimestamp();
 
-    /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
+    /**
+     * Hash ranges assigned to this consumer if in Key_Shared subscription 
mode.
+     * This format and field is used when 
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `false`
+     * (default).
+     */
+    List<int[]> getKeyHashRangeArrays();
+
+    /**
+     * Hash ranges assigned to this consumer if in Key_Shared subscription 
mode.
+     * This format and field is used when 
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`.
+     */
+    @Deprecated
     List<String> getKeyHashRanges();
 
     /** Metadata (key/value strings) associated with this consumer. */
     Map<String, String> getMetadata();
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
new file mode 100644
index 00000000000..685b0b74e64
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Contains information about a draining hash in a Key_Shared subscription.
+ * @see ConsumerStats#getDrainingHashes()
+ */
+public interface DrainingHash {
+    /**
+     * Get the sticky key hash value of the draining hash.
+     * @return the sticky hash value
+     */
+    int getHash();
+    /**
+     * Get the number of unacknowledged messages for the draining hash.
+     * @return the number of unacknowledged messages
+     */
+    int getUnackMsgs();
+    /**
+     * Get the number of times the hash has blocked an attempted delivery of a message.
+     * @return the number of times the hash has blocked an attempted delivery of a message
+     */
+    int getBlockedAttempts();
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index ce3a080a855..95e7c65266b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -121,6 +121,30 @@ public interface SubscriptionStats {
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     Map<String, String> getConsumersAfterMarkDeletePosition();
 
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the current number of hashes in the draining state.
+     *
+     * @return the current number of hashes in the draining state
+     */
+    int getDrainingHashesCount();
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the total number of hashes cleared from the draining state 
for the connected consumers.
+     *
+     * @return the total number of hashes cleared from the draining state for 
the connected consumers
+     */
+    long getDrainingHashesClearedTotal();
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * Retrieves the total number of unacked messages for all draining hashes.
+     *
+     * @return the total number of unacked messages for all draining hashes
+     */
+    int getDrainingHashesUnackedMessages();
+
     /** SubscriptionProperties (key/value strings) associated with this 
subscribe. */
     Map<String, String> getSubscriptionProperties();
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index de36b330b7f..8811247cb2d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import lombok.Data;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
 import org.apache.pulsar.common.util.DateFormatter;
 
 /**
@@ -80,6 +81,30 @@ public class ConsumerStatsImpl implements ConsumerStats {
     /** The read position of the cursor when the consumer joining. */
     public String readPositionWhenJoining;
 
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of 
hashes in the draining state.
+     */
+    public int drainingHashesCount;
+
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of 
hashes cleared from the draining state for
+     * the consumer.
+     */
+    public long drainingHashesClearedTotal;
+
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of 
unacked messages for all draining hashes.
+     */
+    public int drainingHashesUnackedMessages;
+
+    /**
+     * For Key_Shared subscription in AUTO_SPLIT ordered mode:
+     * The draining hashes for this consumer.
+     *
+     * Each entry carries the information about one draining hash.
+     */
+    public List<DrainingHash> drainingHashes;
+
     /** Address of this consumer. */
     private String address;
     /** Timestamp of connection. */
@@ -96,7 +121,17 @@ public class ConsumerStatsImpl implements ConsumerStats {
 
     public long lastConsumedFlowTimestamp;
 
-    /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
+    /**
+     * Hash ranges assigned to this consumer if in Key_Shared subscription 
mode.
+     * This format and field is used when 
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `false`
+     * (default).
+     */
+    public List<int[]> keyHashRangeArrays;
+
+    /**
+     * Hash ranges assigned to this consumer if in Key_Shared subscription 
mode.
+     * This format and field is used when 
`subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`.
+     */
     public List<String> keyHashRanges;
 
     /** Metadata (key/value strings) associated with this consumer. */
@@ -114,6 +149,12 @@ public class ConsumerStatsImpl implements ConsumerStats {
         this.unackedMessages += stats.unackedMessages;
         this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
         this.readPositionWhenJoining = stats.readPositionWhenJoining;
+        this.drainingHashesCount = stats.drainingHashesCount;
+        this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal;
+        this.drainingHashesUnackedMessages = 
stats.drainingHashesUnackedMessages;
+        this.drainingHashes = stats.drainingHashes;
+        this.keyHashRanges = stats.keyHashRanges;
+        this.keyHashRangeArrays = stats.keyHashRangeArrays;
         return this;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
new file mode 100644
index 00000000000..134bdac597b
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.DrainingHash;
+
+/**
+ * Contains information about a draining hash in a Key_Shared subscription.
+ * @see ConsumerStats
+ */
+@Data
+public class DrainingHashImpl implements DrainingHash {
+    /**
+     * The sticky key hash value of the draining hash.
+     * Backs {@link DrainingHash#getHash()}.
+     */
+    public int hash;
+    /**
+     * The number of unacknowledged messages for the draining hash.
+     * Backs {@link DrainingHash#getUnackMsgs()}.
+     */
+    public int unackMsgs;
+    /**
+     * The number of times the hash has blocked an attempted delivery of a message.
+     * Backs {@link DrainingHash#getBlockedAttempts()}.
+     */
+    public int blockedAttempts;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 12734a5586c..02df9b78700 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -129,6 +129,22 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
     /** This is for Key_Shared subscription to get the recentJoinedConsumers 
in the Key_Shared subscription. */
     public Map<String, String> consumersAfterMarkDeletePosition;
 
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of 
hashes in the draining state.
+     */
+    public int drainingHashesCount;
+
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of 
hashes cleared from the draining state
+     * for the connected consumers.
+     */
+    public long drainingHashesClearedTotal;
+
+    /**
+     * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of 
unacked messages for all draining hashes.
+     */
+    public int drainingHashesUnackedMessages;
+
     /** The number of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRanges;
 
@@ -180,6 +196,9 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         lastMarkDeleteAdvancedTimestamp = 0L;
         consumers.clear();
         consumersAfterMarkDeletePosition.clear();
+        drainingHashesCount = 0;
+        drainingHashesClearedTotal = 0L;
+        drainingHashesUnackedMessages = 0;
         nonContiguousDeletedMessagesRanges = 0;
         nonContiguousDeletedMessagesRangesSerializedSize = 0;
         earliestMsgPublishTimeInBacklog = 0L;
@@ -226,6 +245,9 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         }
         this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery;
         
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
+        this.drainingHashesCount += stats.drainingHashesCount;
+        this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal;
+        this.drainingHashesUnackedMessages += 
stats.drainingHashesUnackedMessages;
         this.nonContiguousDeletedMessagesRanges += 
stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += 
stats.nonContiguousDeletedMessagesRangesSerializedSize;
         if (this.earliestMsgPublishTimeInBacklog != 0 && 
stats.earliestMsgPublishTimeInBacklog != 0) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 7b235cfa341..b737d68d5ea 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.DrainingHash;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.FunctionInstanceStats;
@@ -96,6 +97,7 @@ import 
org.apache.pulsar.common.policies.data.impl.BundlesDataImpl;
 import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
@@ -243,6 +245,7 @@ public class ObjectMapperFactory {
         resolver.addMapping(DispatchRate.class, DispatchRateImpl.class);
         resolver.addMapping(TopicStats.class, TopicStatsImpl.class);
         resolver.addMapping(ConsumerStats.class, ConsumerStatsImpl.class);
+        resolver.addMapping(DrainingHash.class, DrainingHashImpl.class);
         resolver.addMapping(NonPersistentPublisherStats.class, 
NonPersistentPublisherStatsImpl.class);
         resolver.addMapping(NonPersistentReplicatorStats.class, 
NonPersistentReplicatorStatsImpl.class);
         resolver.addMapping(NonPersistentSubscriptionStats.class, 
NonPersistentSubscriptionStatsImpl.class);

Reply via email to