This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af6c2fa5659185193fa4af413aa5cea0007b81cd Author: lipenghui <[email protected]> AuthorDate: Thu Apr 22 01:04:52 2021 +0800 Add underReplicate state in the topic internal stats (#10013) * Add underReplicate state in the topic internal stats * Apply comments. (cherry picked from commit bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++ .../org/apache/pulsar/broker/PulsarService.java | 7 +- .../broker/cache/LocalZooKeeperCacheService.java | 10 ++- .../broker/service/persistent/PersistentTopic.java | 43 +++++++++---- .../java/org/apache/pulsar/schema/SchemaTest.java | 2 +- .../stats/client/PulsarBrokerStatsClientTest.java | 22 ++++--- .../data/PersistentTopicInternalStats.java | 1 + .../pulsar/zookeeper/ZooKeeperChildrenCache.java | 4 ++ .../pulsar/tests/integration/admin/AdminTest.java | 75 ++++++++++++++++++++++ .../src/test/resources/pulsar-messaging.xml | 1 + 10 files changed, 154 insertions(+), 26 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 66d4c71..c0a7dd4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -39,6 +39,7 @@ import io.netty.util.Recycler.Handle; import java.time.Clock; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -47,6 +48,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -119,6 +121,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.bookkeeper.net.BookieId; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; @@ -3532,4 +3535,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); + public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) { + LedgerInfo ledgerInfo = ledgers.get(ledgerId); + if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) { + return CompletableFuture.completedFuture(Collections.emptySet()); + } + + return getLedgerHandle(ledgerId).thenCompose(lh -> { + Set<BookieId> ensembles = new HashSet<>(); + lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll); + return CompletableFuture.completedFuture(ensembles); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ef8df6f..5c79909 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -43,8 +43,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -1366,4 +1367,8 @@ public class PulsarService implements AutoCloseable { public Optional<Integer> 
getBrokerListenPortTls() { return brokerService.getListenPortTls(); } + + public CompletableFuture<Set<String>> getAvailableBookiesAsync() { + return this.localZkCacheService.availableBookiesCache().getAsync(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java index 48b3f88..cf21f02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -34,8 +34,9 @@ import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache; +import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; @@ -50,6 +51,7 @@ public class LocalZooKeeperCacheService { private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers"; public static final String OWNER_INFO_ROOT = "/namespace"; public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies"; + public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available"; private final ZooKeeperCache cache; @@ -57,6 +59,7 @@ public class LocalZooKeeperCacheService { private ZooKeeperManagedLedgerCache managedLedgerListCache; private ResourceQuotaCache resourceQuotaCache; private ZooKeeperDataCache<LocalPolicies> policiesCache; + private ZooKeeperChildrenCache availableBookiesCache; private ConfigurationCacheService configurationCacheService; @@ -121,6 +124,7 @@ public 
class LocalZooKeeperCacheService { this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT); this.resourceQuotaCache = new ResourceQuotaCache(cache); this.resourceQuotaCache.initZK(); + this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT); } private void initZK() throws PulsarServerException { @@ -248,6 +252,10 @@ public class LocalZooKeeperCacheService { return this.managedLedgerListCache; } + public ZooKeeperChildrenCache availableBookiesCache() { + return this.availableBookiesCache; + } + public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) { return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 85ddafb..d301443 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.BiFunction; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.net.BookieId; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import 
org.apache.pulsar.broker.ServiceConfiguration; @@ -1673,20 +1675,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal stats.ledgers = Lists.newArrayList(); List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null; - ml.getLedgersInfo().forEach((id, li) -> { - LedgerInfo info = new LedgerInfo(); - info.ledgerId = li.getLedgerId(); - info.entries = li.getEntries(); - info.size = li.getSize(); - info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); - stats.ledgers.add(info); - if (futures != null) { - futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { - if (ex == null) { - info.metadata = lMetadata; + CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync(); + availableBookiesFuture.whenComplete((bookies, e) -> { + if (e != null) { + log.error("[{}] Failed to fetch available bookies.", topic, e); + statFuture.completeExceptionally(e); + } else { + ml.getLedgersInfo().forEach((id, li) -> { + LedgerInfo info = new LedgerInfo(); + info.ledgerId = li.getLedgerId(); + info.entries = li.getEntries(); + info.size = li.getSize(); + info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + stats.ledgers.add(info); + if (futures != null) { + futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { + if (ex == null) { + info.metadata = lMetadata; + } + return null; + })); + futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { + if (ex == null) { + info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString) + .collect(Collectors.toList())); + } + return null; + })); } - return null; - })); + }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d72cbf1..4ffdcad 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -260,7 +260,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { .getSchemaLedgerList(TopicName.get(topic).getSchemaName()); assertEquals(ledgers.size(), 2); admin.topics().delete(topic, true, true); - assertEquals(this.pulsar.getSchemaRegistryService() + assertEquals(this.pulsar.getSchemaRegistryService()PulsarService.java .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0); for (Long ledger : ledgers) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index 0bdb2fe..6f3c261 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -18,6 +18,14 @@ */ package org.apache.pulsar.stats.client; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.net.URL; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.ServerErrorException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -39,16 +47,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import javax.ws.rs.ClientErrorException; -import javax.ws.rs.ServerErrorException; -import java.net.URL; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.spy; -import static 
org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { @BeforeMethod @@ -124,6 +122,10 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get(); assertNotNull(internalStats.ledgers.get(0).metadata); + // For the mock test, the default ensemble is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"] + // The registered bookie ID is 192.168.1.1:5000 + assertTrue(internalStats.ledgers.get(0).underReplicated); + CursorStats cursor = internalStats.cursors.get(subscriptionName); assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs); assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java index 4bd61c0..f1306be 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java @@ -57,6 +57,7 @@ public class PersistentTopicInternalStats { public long size; public boolean offloaded; public String metadata; + public boolean underReplicated; } /** diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java index 3cc381f..6b05f41 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java @@ -66,6 +66,10 @@ public
class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String> return children; } + public CompletableFuture<Set<String>> getAsync() { + return getAsync(this.path); + } + public CompletableFuture<Set<String>> getAsync(String path) { return cache.getChildrenAsync(path, this); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java new file mode 100644 index 0000000..3400b69 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration.admin; + +import static org.testng.Assert.assertNotNull; + +import java.util.function.Supplier; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.tests.integration.messaging.MessagingBase; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Integration tests for Pulsar Admin. + */ +@Slf4j +public class AdminTest extends MessagingBase { + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception { + + String topicName = getNonPartitionedTopic("replicated-state", true); + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl.get()) + .build(); + + @Cleanup + final PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl.get()) + .build(); + + @Cleanup + final Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send(); + assertNotNull(messageId); + } + + log.info("Successfully to publish 10 messages to {}", topicName); + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName); + Assert.assertTrue(stats.ledgers.size() > 0); + for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) { + Assert.assertFalse(ledger.underReplicated); + } + } +} diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index 
9045413..feb1cce 100644 --- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -26,6 +26,7 @@ <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" /> <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" /> <class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" /> + <class name="org.apache.pulsar.tests.integration.admin.AdminTest" /> </classes> </test> </suite>
