This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97ba09e Report compacted topic ledger info when calling get internal
stats. (#7988)
97ba09e is described below
commit 97ba09e63be256412ff013611dca1f000cff9b9a
Author: Marvin Cai <[email protected]>
AuthorDate: Mon Sep 7 17:13:25 2020 -0700
Report compacted topic ledger info when calling get internal stats. (#7988)
Fixes #7895
### Motivation
For get-internal-stats of persistent topic admin cli:
https://pulsar.apache.org/docs/en/2.6.0/admin-api-persistent-topics/#get-internal-stats,
we can also return the compacted topic ledger id if compaction is enabled. Then
we'll be able to read from the ledger without creating an additional subscription,
which benefits use cases like querying a compacted topic from Pulsar SQL.
### Modifications
Expose CompactedTopicContext from CompactedTopicImpl, and in PersistentTopic try
to get the ledger information of the compacted topic ledger if it exists.
### Verifying this change
This change added tests and can be verified as follows:
- Added a unit test to verify that the correct compacted ledger info is returned
after compaction.
* Report compacted topic ledger info when calling get internal stats.
* Update documentation to add information about returning the compacted topic
ledger info from get-internal-stats.
---
.../broker/service/persistent/PersistentTopic.java | 21 +++++++++++++++++++++
.../pulsar/compaction/CompactedTopicImpl.java | 14 +++++++++++++-
.../apache/pulsar/compaction/CompactionTest.java | 7 +++++++
.../policies/data/PersistentTopicInternalStats.java | 3 +++
site2/docs/admin-api-persistent-topics.md | 19 ++++++++++++++++++-
5 files changed, 62 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8ff51f6..becd094 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -1605,6 +1606,26 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
stats.ledgers.add(info);
});
+ // Add ledger info for the compacted topic ledger if it exists.
+ LedgerInfo info = new LedgerInfo();
+ info.ledgerId = -1;
+ info.entries = -1;
+ info.size = -1;
+
+ try {
+ Optional<CompactedTopicImpl.CompactedTopicContext>
compactedTopicContext =
+
((CompactedTopicImpl)compactedTopic).getCompactedTopicContext();
+ if (compactedTopicContext.isPresent()) {
+ CompactedTopicImpl.CompactedTopicContext ledgerContext =
compactedTopicContext.get();
+ info.ledgerId = ledgerContext.getLedger().getId();
+ info.entries = ledgerContext.getLedger().getLastAddConfirmed()
+ 1;
+ info.size = ledgerContext.getLedger().getLength();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ log.warn("[{}]Fail to get ledger information for compacted
topic.", topic);
+ }
+ stats.compactedLedger = info;
+
stats.cursors = Maps.newTreeMap();
ml.getCursors().forEach(c -> {
ManagedCursorImpl cursor = (ManagedCursorImpl) c;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2d430a7..74d24c6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -28,8 +28,11 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import lombok.Getter;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -250,7 +253,8 @@ public class CompactedTopicImpl implements CompactedTopic {
});
}
- static class CompactedTopicContext {
+ @Getter
+ public static class CompactedTopicContext {
final LedgerHandle ledger;
final AsyncLoadingCache<Long,MessageIdData> cache;
@@ -260,6 +264,14 @@ public class CompactedTopicImpl implements CompactedTopic {
}
}
+ /**
+ * Getter for CompactedTopicContext.
+ * @return CompactedTopicContext
+ */
+ public Optional<CompactedTopicContext> getCompactedTopicContext() throws
ExecutionException, InterruptedException {
+ return compactedTopicContext == null? Optional.empty() :
Optional.of(compactedTopicContext.get());
+ }
+
private static int comparePositionAndMessageId(PositionImpl p,
MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 2505a6e..4ef450d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
@@ -132,6 +133,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
compactor.compact(topic).get();
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic);
+ // Compacted topic ledger should have the same number of entries as the
number of unique keys.
+ Assert.assertEquals(expected.size(),
internalStats.compactedLedger.entries);
+ Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+ Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index aa9c595..63fd3cb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -44,6 +44,9 @@ public class PersistentTopicInternalStats {
public List<LedgerInfo> ledgers;
public Map<String, CursorStats> cursors;
+ // LedgerInfo for the compacted topic ledger, if it exists.
+ public LedgerInfo compactedLedger;
+
/**
* Ledger information.
*/
diff --git a/site2/docs/admin-api-persistent-topics.md
b/site2/docs/admin-api-persistent-topics.md
index 340e7c7..df950f9 100644
--- a/site2/docs/admin-api-persistent-topics.md
+++ b/site2/docs/admin-api-persistent-topics.md
@@ -364,6 +364,16 @@ It shows detailed statistics of a topic.
- **offloaded**: Whether this ledger is offloaded
+ - **compactedLedger**: The ledgers holding un-acked messages after topic
compaction.
+
+ - **ledgerId**: Id of this ledger
+
+ - **entries**: Total number of entries belonging to this ledger
+
+ - **size**: Size of messages written to this ledger (in bytes)
+
+ - **offloaded**: Will always be false for compacted topic ledger.
+
- **cursors**: The list of all cursors on this topic. There will be one
for every subscription you saw in the topic stats.
- **markDeletePosition**: All of messages before the
markDeletePosition are acknowledged by the subscriber.
@@ -403,9 +413,16 @@ It shows detailed statistics of a topic.
{
"ledgerId": 324711539,
"entries": 0,
- "size": 0
+ "size": 0,
+ "offloaded": true
}
],
+ "compactedLedger": {
+ "ledgerId": 324711540,
+ "entries": 10,
+ "size": 100,
+ "offloaded": false
+ },
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",