sijie commented on a change in pull request #9284:
URL: https://github.com/apache/pulsar/pull/9284#discussion_r563014756
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
##########
@@ -213,4 +228,113 @@ public void createKeyValueSchema() throws Exception {
assertEquals(schemaInfo, keyValueSchema.getSchemaInfo());
}
+
+ @Test void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
Review comment:
```suggestion
@Test
void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1774,14 +1776,71 @@ public TopicStats getStats(boolean getPreciseBacklog) {
cs.properties = cursor.getProperties();
stats.cursors.put(cursor.getName(), cs);
});
- if (futures != null) {
- FutureUtil.waitForAll(futures).handle((res, ex) -> {
- statFuture.complete(stats);
+
+ //Schema store ledgers
+ String schemaId;
+ try {
+ schemaId = TopicName.get(topic).getSchemaName();
+ } catch (Throwable t) {
+ statFuture.completeExceptionally(t);
+ return statFuture;
+ }
+
+
+ CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
+ stats.schemaLedgers = new ArrayList<>();
+ if (brokerService.getPulsar().getSchemaStorage() != null
+ && brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
+ ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
+ .getStoreLedgerIdsBySchemaId(schemaId)
+ .thenAccept(ledgers -> {
+ List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
+ ledgers.forEach(ledgerId -> {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ getLedgerMetadataFutures.add(completableFuture);
+ brokerService.getPulsar().getBookKeeperClient()
+ .getLedgerMetadata(ledgerId)
+ .thenAccept(metadata -> {
+ LedgerInfo schemaLedgerInfo = new LedgerInfo();
+ schemaLedgerInfo.ledgerId = metadata.getLedgerId();
+ schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
+ schemaLedgerInfo.size = metadata.getLength();
+ if (includeLedgerMetadata) {
+ info.metadata = metadata.toSafeString();
+ }
+ stats.schemaLedgers.add(schemaLedgerInfo);
Review comment:
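You need to synchronize access to `stats.schemaLedgers`. It is a plain `ArrayList`, and the `getLedgerMetadata` callbacks that call `add` on it may complete on different threads.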
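A minimal sketch of one way to do this, not the PR's actual code: wrap the list with `Collections.synchronizedList` so concurrent `add` calls are safe, and read it only after all per-ledger futures complete. The class `SchemaLedgerCollector`, the trimmed-down `LedgerInfo`, and the `supplyAsync` call standing in for the BookKeeper metadata lookup are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class; not part of Pulsar.
final class SchemaLedgerCollector {

    // Hypothetical, trimmed-down stand-in for the stats LedgerInfo type.
    static final class LedgerInfo {
        long ledgerId;
    }

    static CompletableFuture<List<LedgerInfo>> collect(List<Long> ledgerIds) {
        // Synchronized wrapper: add() is safe when callbacks run on different threads.
        List<LedgerInfo> schemaLedgers = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (long id : ledgerIds) {
            // supplyAsync is an assumption standing in for an async metadata lookup.
            futures.add(CompletableFuture.supplyAsync(() -> id)
                    .thenAccept(ledgerId -> {
                        LedgerInfo info = new LedgerInfo();
                        info.ledgerId = ledgerId;
                        schemaLedgers.add(info);
                    }));
        }

        // Copy the list only after every callback has finished.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> new ArrayList<>(schemaLedgers));
    }
}
```

Calling `SchemaLedgerCollector.collect(ids).join()` returns one entry per ledger id, in completion order rather than input order. An alternative would be to have each future return its `LedgerInfo` and build the list after `allOf`, which avoids shared mutable state altogether.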