This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5f999251ed4 [fix] [bk] Correct the bookie info after ZK client is
reconnected (#21035)
5f999251ed4 is described below
commit 5f999251ed44bc31caf0cedc6511a790cbfe83ba
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 25 19:21:21 2023 +0800
[fix] [bk] Correct the bookie info after ZK client is reconnected (#21035)
Motivation: After [PIP-118: reconnect broker when ZooKeeper session
expires](https://github.com/apache/pulsar/pull/13341), the Broker no longer
shuts down when it loses the connection to the local metadata store in the
default configuration. However, the BK online and offline events that occur
before the ZK client is reconnected are lost, resulting in incorrect BK info in
memory. You can reproduce the issue with the test
`BkEnsemblesChaosTest.testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect`.
Modifications: Refresh BK info in memory after the ZK client is reconnected.
(cherry picked from commit db20035bba9eb21a6b3e4e6752ab716b9df35d80)
---
.../broker/service/BkEnsemblesChaosTest.java | 71 +++++++
.../CanReconnectZKClientPulsarServiceBaseTest.java | 215 +++++++++++++++++++++
.../apache/pulsar/metadata/api/MetadataCache.java | 5 +
.../bookkeeper/PulsarRegistrationClient.java | 23 ++-
.../metadata/impl/AbstractMetadataStore.java | 8 +
5 files changed, 320 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
new file mode 100644
index 00000000000..d49489d8a84
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Chaos test: restarts bookies while the broker's local metadata store ZK client is forced to keep
+ * reconnecting, then checks that the broker can still unload a topic and publish to it.
+ */
+@Test(groups = "broker")
+public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {
+
+    /** Delegates to the base-class setup; runs once per class with a 300000 ms timeout. */
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Delegates to the base-class cleanup; runs once per class with a 300000 ms timeout. */
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Stops and restarts all bookies but the last one while the local metadata store connection is
+     * repeatedly closed, then verifies that a producer can still be created and can send a message.
+     */
+    @Test
+    public void testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final byte[] msgValue = "test".getBytes();
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Ensure broker works.
+        Producer<byte[]> producer1 = client.newProducer().topic(topicName).create();
+        try {
+            producer1.send(msgValue);
+        } finally {
+            // Close even if send fails so the producer does not leak into later steps.
+            producer1.close();
+        }
+        admin.topics().unload(topicName);
+
+        // Restart some bookies, which triggers the ZK node of Bookie deleted and created.
+        // And make the local metadata store reconnect to lose some notification of the ZK node change.
+        for (int i = 0; i < numberOfBookies - 1; i++) {
+            bkEnsemble.stopBK(i);
+        }
+        makeLocalMetadataStoreKeepReconnect();
+        for (int i = 0; i < numberOfBookies - 1; i++) {
+            bkEnsemble.startBK(i);
+        }
+        // Sleep 100ms to lose the notifications of ZK node create.
+        Thread.sleep(100);
+        stopLocalMetadataStoreAlwaysReconnect();
+
+        // Ensure broker still works.
+        admin.topics().unload(topicName);
+        Producer<byte[]> producer2 = client.newProducer().topic(topicName).create();
+        try {
+            producer2.send(msgValue);
+        } finally {
+            // The original never closed this producer.
+            producer2.close();
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
new file mode 100644
index 00000000000..bc6df685ffc
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import java.net.URL;
+import java.nio.channels.SelectionKey;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.reflect.WhiteboxImpl;
+
+@Slf4j
+public abstract class CanReconnectZKClientPulsarServiceBaseTest extends
TestRetrySupport {
+
+ protected final String defaultTenant = "public";
+ protected final String defaultNamespace = defaultTenant + "/default";
+ protected int numberOfBookies = 3;
+ protected final String clusterName = "r1";
+ protected URL url;
+ protected URL urlTls;
+ protected ServiceConfiguration config = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk;
+ protected LocalBookkeeperEnsemble bkEnsemble;
+ protected PulsarService pulsar;
+ protected BrokerService broker;
+ protected PulsarAdmin admin;
+ protected PulsarClient client;
+ protected ZooKeeper localZkOfBroker;
+ protected Object localMetaDataStoreClientCnx;
+ protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal =
new AtomicBoolean();
+ /**
+  * Creates and starts the ZooKeeper server stored in {@code brokerConfigZk}, then creates and starts the
+  * bookkeeper ensemble stored in {@code bkEnsemble} with {@code numberOfBookies} bookies.
+  */
+ protected void startZKAndBK() throws Exception {
+     // ZooKeeper first; it is later used for the configuration metadata store URL.
+     this.brokerConfigZk = new ZookeeperServerTest(0);
+     this.brokerConfigZk.start();
+
+     // Then the bookkeeper ensemble (constructor arguments kept exactly as before).
+     this.bkEnsemble = new LocalBookkeeperEnsemble(this.numberOfBookies, 0, () -> 0);
+     this.bkEnsemble.start();
+ }
+
+ protected void startBrokers() throws Exception {
+ // Start brokers.
+ setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
+ pulsar = new PulsarService(config);
+ pulsar.start();
+ broker = pulsar.getBrokerService();
+ ZKMetadataStore zkMetadataStore = (ZKMetadataStore)
pulsar.getLocalMetadataStore();
+ localZkOfBroker = zkMetadataStore.getZkClient();
+ ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker,
"cnxn");
+ Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
+ localMetaDataStoreClientCnx =
WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");
+
+ url = new URL(pulsar.getWebServiceAddress());
+ urlTls = new URL(pulsar.getWebServiceAddressTls());
+ admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build();
+ client = PulsarClient.builder().serviceUrl(url.toString()).build();
+ }
+
+ protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
+ if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false,
true)) {
+ throw new RuntimeException("Local metadata store is already
keeping reconnect");
+ }
+ if
(localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO"))
{
+ makeLocalMetadataStoreKeepReconnectNIO();
+ } else {
+ // ClientCnxnSocketNetty.
+ makeLocalMetadataStoreKeepReconnectNetty();
+ }
+ }
+
+ protected void makeLocalMetadataStoreKeepReconnectNIO() {
+ new Thread(() -> {
+ while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+ try {
+ SelectionKey sockKey =
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
+ if (sockKey != null) {
+ sockKey.channel().close();
+ }
+ // Prevents high cpu usage.
+ Thread.sleep(5);
+ } catch (Exception e) {
+ log.error("Try close the ZK connection of local metadata
store failed: {}", e.toString());
+ }
+ }
+ }).start();
+ }
+
+ protected void makeLocalMetadataStoreKeepReconnectNetty() {
+ new Thread(() -> {
+ while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+ try {
+ Channel channel =
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
+ if (channel != null) {
+ channel.close();
+ }
+ // Prevents high cpu usage.
+ Thread.sleep(5);
+ } catch (Exception e) {
+ log.error("Try close the ZK connection of local metadata
store failed: {}", e.toString());
+ }
+ }
+ }).start();
+ }
+
+ /**
+  * Clears the reconnect signal so the background loops started by
+  * {@code makeLocalMetadataStoreKeepReconnect} stop at their next check.
+  */
+ protected void stopLocalMetadataStoreAlwaysReconnect() {
+     this.LocalMetadataStoreInReconnectFinishSignal.set(false);
+ }
+
+ protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
+ admin.clusters().createCluster(clusterName, ClusterData.builder()
+ .serviceUrl(url.toString())
+ .serviceUrlTls(urlTls.toString())
+ .brokerServiceUrl(pulsar.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+
+ admin.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(clusterName)));
+
+ admin.namespaces().createNamespace(defaultNamespace,
Sets.newHashSet(clusterName));
+ }
+
+ /**
+  * Brings up the test environment: ZooKeeper and bookies, then the broker, then the default
+  * cluster, tenant and namespace. Waits 100 ms before reporting completion.
+  */
+ @Override
+ protected void setup() throws Exception {
+     incrementSetupNumber();
+
+     // The log lines previously named OneWayReplicatorTestBase, a copy-paste from another class.
+     log.info("--- Starting CanReconnectZKClientPulsarServiceBaseTest::setup ---");
+
+     startZKAndBK();
+
+     startBrokers();
+
+     createDefaultTenantsAndClustersAndNamespace();
+
+     Thread.sleep(100);
+     log.info("--- CanReconnectZKClientPulsarServiceBaseTest::setup completed ---");
+ }
+
+ private void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble bookkeeperEnsemble,
ZookeeperServerTest brokerConfigZk) {
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setWebServicePort(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" +
bookkeeperEnsemble.getZookeeperPort());
+ config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" +
brokerConfigZk.getZookeeperPort() + "/foo");
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setBacklogQuotaCheckIntervalInSeconds(5);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ config.setEnableReplicatedSubscriptions(true);
+ config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+ }
+
+ /**
+  * Tears down the test environment: stops the reconnect loop, closes the client, admin and broker,
+  * stops the bookies and ZooKeeper, and resets {@code config}.
+  *
+  * <p>Every resource is null-checked, so a cleanup that follows a partially failed setup does not
+  * throw a {@link NullPointerException}.
+  */
+ @Override
+ protected void cleanup() throws Exception {
+     markCurrentSetupNumberCleaned();
+     log.info("--- Shutting down ---");
+
+     stopLocalMetadataStoreAlwaysReconnect();
+
+     // Stop brokers.
+     if (client != null) {
+         client.close();
+     }
+     if (admin != null) {
+         admin.close();
+     }
+     if (pulsar != null) {
+         pulsar.close();
+     }
+
+     // Stop ZK and BK.
+     if (bkEnsemble != null) {
+         bkEnsemble.stop();
+     }
+     if (brokerConfigZk != null) {
+         brokerConfigZk.stop();
+     }
+
+     // Reset configs.
+     config = new ServiceConfiguration();
+     // setConfigDefaults reads ports from both servers, so skip it if either was never created.
+     if (bkEnsemble != null && brokerConfigZk != null) {
+         setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
+     }
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 94da382b74d..6d558e70971 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -148,6 +148,11 @@ public interface MetadataCache<T> {
*/
void invalidate(String path);
+ /**
+ * Force the invalidation of all objects in the metadata cache.
+ */
+ void invalidateAll();
+
/**
* Invalidate and reload an object in the metadata cache.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 306b6398b5c..be945d988fb 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -51,11 +51,13 @@ import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {
- private final MetadataStore store;
+ private final AbstractMetadataStore store;
private final String ledgersRootPath;
// registration paths
private final String bookieRegistrationPath;
@@ -68,10 +70,11 @@ public class PulsarRegistrationClient implements
RegistrationClient {
private final Map<BookieId, Versioned<BookieServiceInfo>>
writableBookieInfo;
private final Map<BookieId, Versioned<BookieServiceInfo>>
readOnlyBookieInfo;
private final FutureUtil.Sequencer<Void> sequencer;
+ private SessionEvent lastMetadataSessionEvent;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
- this.store = store;
+ this.store = (AbstractMetadataStore) store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache =
store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
this.sequencer = Sequencer.create();
@@ -88,6 +91,7 @@ public class PulsarRegistrationClient implements
RegistrationClient {
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-registration-client"));
store.registerListener(this::updatedBookies);
+ this.store.registerSessionListener(this::refreshBookies);
}
@Override
@@ -95,6 +99,21 @@ public class PulsarRegistrationClient implements
RegistrationClient {
executor.shutdownNow();
}
+ /**
+  * Session listener: records the latest session event and, on {@code Reconnected} or
+  * {@code SessionReestablished}, invalidates the cached bookie data and re-reads the read-only and
+  * writable bookie sets, handing the fresh sets to the registered watchers on {@code executor}.
+  *
+  * @param sessionEvent the metadata session event that was just delivered
+  */
+ private void refreshBookies(SessionEvent sessionEvent) {
+     lastMetadataSessionEvent = sessionEvent;
+     if (!SessionEvent.Reconnected.equals(sessionEvent)
+             && !SessionEvent.SessionReestablished.equals(sessionEvent)) {
+         return;
+     }
+     // Clean caches.
+     store.invalidateCaches(bookieRegistrationPath, bookieAllRegistrationPath, bookieReadonlyRegistrationPath);
+     bookieServiceInfoMetadataCache.invalidateAll();
+     // Refresh caches of the listeners. Failures are logged; previously they were dropped silently.
+     getReadOnlyBookies()
+             .thenAccept(bookies -> readOnlyBookiesWatchers.forEach(
+                     w -> executor.execute(() -> w.onBookiesChanged(bookies))))
+             .exceptionally(ex -> {
+                 log.warn("Failed to refresh read-only bookies after session event {}", sessionEvent, ex);
+                 return null;
+             });
+     getWritableBookies()
+             .thenAccept(bookies -> writableBookiesWatchers.forEach(
+                     w -> executor.execute(() -> w.onBookiesChanged(bookies))))
+             .exceptionally(ex -> {
+                 log.warn("Failed to refresh writable bookies after session event {}", sessionEvent, ex);
+                 return null;
+             });
+ }
+
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
return getBookiesThenFreshCache(bookieRegistrationPath);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 4cadf2397a7..cc148c2a311 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Instant;
@@ -523,6 +524,13 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
existsCache.synchronous().invalidateAll();
}
+ /**
+  * Invalidates the children-cache entry of each given path.
+  *
+  * @param paths the paths whose cached children lists should be dropped
+  */
+ public void invalidateCaches(String... paths) {
+     final LoadingCache<String, List<String>> children = childrenCache.synchronous();
+     for (int i = 0; i < paths.length; i++) {
+         children.invalidate(paths[i]);
+     }
+ }
+
/**
* Run the task in the executor thread and fail the future if the executor
is shutting down.
*/