This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9cb6890591a6dbebf67f7cf0dfd3d337735a6a9c Author: congbo <[email protected]> AuthorDate: Thu Feb 18 21:33:32 2021 +0800 [Pulsar Admin] Expose schema ledger in `topic stats-internal` (#9284) ## Motivation Expose schema ledger in `topic stats-internal` (cherry picked from commit c754f9e90c4ec08a1be744d497ce81fa3dcfd568) --- .../broker/service/persistent/PersistentTopic.java | 67 ++++++++++- .../service/schema/BookkeeperSchemaStorage.java | 24 ++++ .../pulsar/broker/admin/AdminApiSchemaTest.java | 125 +++++++++++++++++++++ .../client/impl/BatchMessageIndexAckTest.java | 99 ++++++++++++++++ .../data/PersistentTopicInternalStats.java | 1 + site2/docs/admin-api-topics.md | 96 ++++++++++------ 6 files changed, 375 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a27147e..89075df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -31,6 +31,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -93,6 +94,7 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPolicyListener; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; @@ -1721,14 +1723,71 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal cs.properties = cursor.getProperties(); stats.cursors.put(cursor.getName(), cs); }); - if (futures != null) { - FutureUtil.waitForAll(futures).handle((res, ex) -> { - statFuture.complete(stats); + + //Schema store ledgers + String schemaId; + try { + schemaId = TopicName.get(topic).getSchemaName(); + } catch (Throwable t) { + statFuture.completeExceptionally(t); + return statFuture; + } + + + CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>(); + stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>()); + if (brokerService.getPulsar().getSchemaStorage() != null + && brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) { + ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage()) + .getStoreLedgerIdsBySchemaId(schemaId) + .thenAccept(ledgers -> { + List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>(); + ledgers.forEach(ledgerId -> { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + getLedgerMetadataFutures.add(completableFuture); + brokerService.getPulsar().getBookKeeperClient() + .getLedgerMetadata(ledgerId) + .thenAccept(metadata -> { + LedgerInfo schemaLedgerInfo = new LedgerInfo(); + schemaLedgerInfo.ledgerId = metadata.getLedgerId(); + schemaLedgerInfo.entries = metadata.getLastEntryId() + 1; + schemaLedgerInfo.size = metadata.getLength(); + if (includeLedgerMetadata) { + info.metadata = metadata.toSafeString(); + } + stats.schemaLedgers.add(schemaLedgerInfo); + completableFuture.complete(null); + }).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + }); + }); + FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> { + schemaStoreLedgersFuture.complete(null); + }).exceptionally(e -> { + schemaStoreLedgersFuture.completeExceptionally(e); + return null; + }); + }).exceptionally(e -> { + schemaStoreLedgersFuture.completeExceptionally(e); return null; }); } else { - statFuture.complete(stats); + schemaStoreLedgersFuture.complete(null); } + schemaStoreLedgersFuture.thenRun(() -> { + if (futures != null) { + FutureUtil.waitForAll(futures).handle((res, ex) -> { + statFuture.complete(stats); + return null; + }); + } else { + statFuture.complete(stats); + } + }).exceptionally(e -> { + statFuture.completeExceptionally(e); + return null; + }); return statFuture; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index db948c5..0911ac6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -33,9 +33,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -626,6 +628,28 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return future; } + public CompletableFuture<List<Long>> getStoreLedgerIdsBySchemaId(String schemaId) { + CompletableFuture<List<Long>> ledgerIdsFuture = new CompletableFuture<>(); + getSchemaLocator(getSchemaPath(schemaId)).thenAccept(locator -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Get all store schema ledgerIds - locator: {}", schemaId, locator); + } + + if (!locator.isPresent()) { + ledgerIdsFuture.complete(Collections.emptyList()); + return; + } + Set<Long> ledgerIds = new HashSet<>(); + SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator; + schemaLocator.getIndexList().forEach(indexEntry -> ledgerIds.add(indexEntry.getPosition().getLedgerId())); + ledgerIdsFuture.complete(new ArrayList<>(ledgerIds)); + }).exceptionally(e -> { + ledgerIdsFuture.completeExceptionally(e); + return null; + }); + return ledgerIdsFuture; + } + interface Functions { static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle ledger, long entry) { final CompletableFuture<LedgerEntry> future = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index 140cd46..d4163b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -20,23 +20,38 @@ package org.apache.pulsar.broker.admin; import static java.nio.charset.StandardCharsets.US_ASCII; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.net.BookieId; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.StringSchema; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -213,4 +228,114 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { assertEquals(schemaInfo, keyValueSchema.getSchemaInfo()); } + + @Test + void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException { + String topicName = "persistent://schematest/test/get-schema-ledger-info"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, "test", MessageId.earliest); + Schema<Foo> schema = Schema.AVRO(Foo.class); + admin.schemas().createSchema(topicName, schema.getSchemaInfo()); + long ledgerId = 1; + long entryId = 10; + long length = 10; + doReturn(CompletableFuture.completedFuture(new LedgerMetadata() { + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public int getEnsembleSize() { + return 0; + } + + @Override + public int getWriteQuorumSize() { + return 0; + } + + @Override + public int getAckQuorumSize() { + return 0; + } + + @Override + public long getLastEntryId() { + return entryId; + } + + @Override + public long getLength() { + return length; + } + + @Override + public boolean hasPassword() { + return false; + } + + @Override + public byte[] getPassword() { + return new byte[0]; + } + + @Override + public DigestType getDigestType() { + return null; + } + + @Override + public long getCtime() { + return 0; + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public Map<String, byte[]> getCustomMetadata() { + return null; + } + + @Override + public List<BookieId> getEnsembleAt(long entryId) { + return null; + } + + @Override + public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() { + return null; + } + + @Override + public State getState() { + return null; + } + + @Override + public String toSafeString() { + return "test"; + } + + @Override + public int getMetadataFormatVersion() { + return 0; + } + + @Override + public long getCToken() { + return 0; + } + })).when(mockBookKeeper).getLedgerMetadata(anyLong()); + PersistentTopicInternalStats persistentTopicInternalStats = admin.topics().getInternalStats(topicName); + List<PersistentTopicInternalStats.LedgerInfo> list = persistentTopicInternalStats.schemaLedgers; + assertEquals(1, list.size()); + PersistentTopicInternalStats.LedgerInfo ledgerInfo = list.get(0); + assertEquals(ledgerId, ledgerInfo.ledgerId); + assertEquals(entryId + 1, ledgerInfo.entries); + assertEquals(length, ledgerInfo.size); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 6ff7d23..da76e73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -20,6 +20,9 @@ package org.apache.pulsar.client.impl; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieId; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -37,10 +40,15 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; + @Slf4j public class BatchMessageIndexAckTest extends ProducerConsumerBase { @@ -50,6 +58,97 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); super.internalSetup(); super.producerBaseSetup(); + doReturn(CompletableFuture.completedFuture(new LedgerMetadata() { + @Override + public long getLedgerId() { + return 0; + } + + @Override + public int getEnsembleSize() { + return 0; + } + + @Override + public int getWriteQuorumSize() { + return 0; + } + + @Override + public int getAckQuorumSize() { + return 0; + } + + @Override + public long getLastEntryId() { + return 0; + } + + @Override + public long getLength() { + return 0; + } + + @Override + public boolean hasPassword() { + return false; + } + + @Override + public byte[] getPassword() { + return new byte[0]; + } + + @Override + public DigestType getDigestType() { + return null; + } + + @Override + public long getCtime() { + return 0; + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public Map<String, byte[]> getCustomMetadata() { + return null; + } + + @Override + public List<BookieId> getEnsembleAt(long entryId) { + return null; + } + + @Override + public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() { + return null; + } + + @Override + public State getState() { + return null; + } + + @Override + public String toSafeString() { + return null; + } + + @Override + public int getMetadataFormatVersion() { + return 0; + } + + @Override + public long getCToken() { + return 0; + } + })).when(mockBookKeeper).getLedgerMetadata(anyLong()); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java index e527980..4bd61c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java @@ -43,6 +43,7 @@ public class PersistentTopicInternalStats { public List<LedgerInfo> ledgers; public Map<String, CursorStats> cursors; + public List<LedgerInfo> schemaLedgers; // LedgerInfo for compacted topic if exist. public LedgerInfo compactedLedger; diff --git a/site2/docs/admin-api-topics.md b/site2/docs/admin-api-topics.md index 5af61ad..6038a42 100644 --- a/site2/docs/admin-api-topics.md +++ b/site2/docs/admin-api-topics.md @@ -358,6 +358,20 @@ You can get the detailed statistics of a topic. - **size**: The size of messages written to this ledger (in bytes). - **offloaded**: Whether this ledger is offloaded. + + - **metadata**: The ledger metadata. + + - **schemaLedgers**: The ordered list of all ledgers for this topic schema. + + - **ledgerId**: The ID of this ledger. + + - **entries**: The total number of entries belong to this ledger. + + - **size**: The size of messages written to this ledger (in bytes). + + - **offloaded**: Whether this ledger is offloaded. + + - **metadata**: The ledger metadata. - **compactedLedger**: The ledgers holding un-acked messages after topic compaction. @@ -395,44 +409,60 @@ The following is an example of the detailed statistics of a topic. ```json { - "entriesAddedCounter": 20449518, - "numberOfEntries": 3233, - "totalSize": 331482, - "currentLedgerEntries": 3233, - "currentLedgerSize": 331482, - "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825", - "lastLedgerCreationFailureTimestamp": null, - "waitingCursorsCount": 1, - "pendingAddEntriesCount": 0, - "lastConfirmedEntry": "324711539:3232", - "state": "LedgerOpened", - "ledgers": [ + "entriesAddedCounter":0, + "numberOfEntries":0, + "totalSize":0, + "currentLedgerEntries":0, + "currentLedgerSize":0, + "lastLedgerCreatedTimestamp":"2021-01-22T21:12:14.868+08:00", + "lastLedgerCreationFailureTimestamp":null, + "waitingCursorsCount":0, + "pendingAddEntriesCount":0, + "lastConfirmedEntry":"3:-1", + "state":"LedgerOpened", + "ledgers":[ { - "ledgerId": 324711539, - "entries": 0, - "size": 0, - "offloaded": true + "ledgerId":3, + "entries":0, + "size":0, + "offloaded":false, + "metadata":null } ], - "compactedLedger": { - "ledgerId": 324711540, - "entries": 10, - "size": 100, - "offloaded": false + "cursors":{ + "test":{ + "markDeletePosition":"3:-1", + "readPosition":"3:-1", + "waitingReadOp":false, + "pendingReadOps":0, + "messagesConsumedCounter":0, + "cursorLedger":4, + "cursorLedgerLastEntry":1, + "individuallyDeletedMessages":"[]", + "lastLedgerSwitchTimestamp":"2021-01-22T21:12:14.966+08:00", + "state":"Open", + "numberOfEntriesSinceFirstNotAckedMessage":0, + "totalNonContiguousDeletedMessagesRange":0, + "properties":{ + + } + } }, - "cursors": { - "my-subscription": { - "markDeletePosition": "324711539:3133", - "readPosition": "324711539:3233", - "waitingReadOp": true, - "pendingReadOps": 0, - "messagesConsumedCounter": 20449501, - "cursorLedger": 324702104, - "cursorLedgerLastEntry": 21, - "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]", - "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313", - "state": "Open" + "schemaLedgers":[ + { + "ledgerId":1, + "entries":11, + "size":10, + "offloaded":false, + "metadata":null } + ], + "compactedLedger":{ + "ledgerId":-1, + "entries":-1, + "size":-1, + "offloaded":false, + "metadata":null } } ```
