This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5f999251ed4 [fix] [bk] Correct the bookie info after ZK client is 
reconnected (#21035)
5f999251ed4 is described below

commit 5f999251ed44bc31caf0cedc6511a790cbfe83ba
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 25 19:21:21 2023 +0800

    [fix] [bk] Correct the bookie info after ZK client is reconnected (#21035)
    
    Motivation: After [PIP-118: reconnect broker when ZooKeeper session 
expires](https://github.com/apache/pulsar/pull/13341), the Broker no longer 
shuts down when it loses the connection to the local metadata store in the 
default configuration. However, the BK online and offline events that occur 
before the ZK client is reconnected are lost, resulting in incorrect BK info 
in memory. You can reproduce the issue with the test 
`BkEnsemblesChaosTest.testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect`.
    
    Modifications: Refresh BK info in memory after the ZK client is reconnected.
    (cherry picked from commit db20035bba9eb21a6b3e4e6752ab716b9df35d80)
---
 .../broker/service/BkEnsemblesChaosTest.java       |  71 +++++++
 .../CanReconnectZKClientPulsarServiceBaseTest.java | 215 +++++++++++++++++++++
 .../apache/pulsar/metadata/api/MetadataCache.java  |   5 +
 .../bookkeeper/PulsarRegistrationClient.java       |  23 ++-
 .../metadata/impl/AbstractMetadataStore.java       |   8 +
 5 files changed, 320 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
new file mode 100644
index 00000000000..d49489d8a84
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesChaosTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BkEnsemblesChaosTest extends 
CanReconnectZKClientPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Test
+    public void 
testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final byte[] msgValue = "test".getBytes();
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Ensure broker works.
+        Producer<byte[]> producer1 = 
client.newProducer().topic(topicName).create();
+        producer1.send(msgValue);
+        producer1.close();
+        admin.topics().unload(topicName);
+
+        // Restart some bookies, which triggers the ZK node of Bookie deleted 
and created.
+        // And make the local metadata store reconnect to lose some 
notification of the ZK node change.
+        for (int i = 0; i < numberOfBookies - 1; i++){
+            bkEnsemble.stopBK(i);
+        }
+        makeLocalMetadataStoreKeepReconnect();
+        for (int i = 0; i < numberOfBookies - 1; i++){
+            bkEnsemble.startBK(i);
+        }
+        // Sleep 100ms to lose the notifications of ZK node create.
+        Thread.sleep(100);
+        stopLocalMetadataStoreAlwaysReconnect();
+
+        // Ensure broker still works.
+        admin.topics().unload(topicName);
+        Producer<byte[]> producer2 = 
client.newProducer().topic(topicName).create();
+        producer2.send(msgValue);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
new file mode 100644
index 00000000000..bc6df685ffc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.Channel;
+import java.net.URL;
+import java.nio.channels.SelectionKey;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.reflect.WhiteboxImpl;
+
+@Slf4j
+public abstract class CanReconnectZKClientPulsarServiceBaseTest extends 
TestRetrySupport {
+
+    protected final String defaultTenant = "public";
+    protected final String defaultNamespace = defaultTenant + "/default";
+    protected int numberOfBookies = 3;
+    protected final String clusterName = "r1";
+    protected URL url;
+    protected URL urlTls;
+    protected ServiceConfiguration config = new ServiceConfiguration();
+    protected ZookeeperServerTest brokerConfigZk;
+    protected LocalBookkeeperEnsemble bkEnsemble;
+    protected PulsarService pulsar;
+    protected BrokerService broker;
+    protected PulsarAdmin admin;
+    protected PulsarClient client;
+    protected ZooKeeper localZkOfBroker;
+    protected Object localMetaDataStoreClientCnx;
+    protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = 
new AtomicBoolean();
+    protected void startZKAndBK() throws Exception {
+        // Start ZK.
+        brokerConfigZk = new ZookeeperServerTest(0);
+        brokerConfigZk.start();
+
+        // Start BK.
+        bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
+        bkEnsemble.start();
+    }
+
+    protected void startBrokers() throws Exception {
+        // Start brokers.
+        setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
+        pulsar = new PulsarService(config);
+        pulsar.start();
+        broker = pulsar.getBrokerService();
+        ZKMetadataStore zkMetadataStore = (ZKMetadataStore) 
pulsar.getLocalMetadataStore();
+        localZkOfBroker = zkMetadataStore.getZkClient();
+        ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker, 
"cnxn");
+        Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
+        localMetaDataStoreClientCnx = 
WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");
+
+        url = new URL(pulsar.getWebServiceAddress());
+        urlTls = new URL(pulsar.getWebServiceAddressTls());
+        admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build();
+        client = PulsarClient.builder().serviceUrl(url.toString()).build();
+    }
+
+    protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
+        if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, 
true)) {
+            throw new RuntimeException("Local metadata store is already 
keeping reconnect");
+        }
+        if 
(localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO"))
 {
+            makeLocalMetadataStoreKeepReconnectNIO();
+        } else {
+            // ClientCnxnSocketNetty.
+            makeLocalMetadataStoreKeepReconnectNetty();
+        }
+    }
+
+    protected void makeLocalMetadataStoreKeepReconnectNIO() {
+        new Thread(() -> {
+            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+                try {
+                    SelectionKey sockKey = 
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
+                    if (sockKey != null) {
+                        sockKey.channel().close();
+                    }
+                    // Prevents high cpu usage.
+                    Thread.sleep(5);
+                } catch (Exception e) {
+                    log.error("Try close the ZK connection of local metadata 
store failed: {}", e.toString());
+                }
+            }
+        }).start();
+    }
+
+    protected void makeLocalMetadataStoreKeepReconnectNetty() {
+        new Thread(() -> {
+            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+                try {
+                    Channel channel = 
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
+                    if (channel != null) {
+                        channel.close();
+                    }
+                    // Prevents high cpu usage.
+                    Thread.sleep(5);
+                } catch (Exception e) {
+                    log.error("Try close the ZK connection of local metadata 
store failed: {}", e.toString());
+                }
+            }
+        }).start();
+    }
+
+    protected void stopLocalMetadataStoreAlwaysReconnect() {
+        LocalMetadataStoreInReconnectFinishSignal.set(false);
+    }
+
+    protected void createDefaultTenantsAndClustersAndNamespace() throws 
Exception {
+        admin.clusters().createCluster(clusterName, ClusterData.builder()
+                .serviceUrl(url.toString())
+                .serviceUrlTls(urlTls.toString())
+                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+
+        admin.tenants().createTenant(defaultTenant, new 
TenantInfoImpl(Collections.emptySet(),
+                Sets.newHashSet(clusterName)));
+
+        admin.namespaces().createNamespace(defaultNamespace, 
Sets.newHashSet(clusterName));
+    }
+
+    @Override
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+
+        log.info("--- Starting OneWayReplicatorTestBase::setup ---");
+
+        startZKAndBK();
+
+        startBrokers();
+
+        createDefaultTenantsAndClustersAndNamespace();
+
+        Thread.sleep(100);
+        log.info("--- OneWayReplicatorTestBase::setup completed ---");
+    }
+
+    private void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+                                   LocalBookkeeperEnsemble bookkeeperEnsemble, 
ZookeeperServerTest brokerConfigZk) {
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setWebServicePort(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bookkeeperEnsemble.getZookeeperPort());
+        config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + 
brokerConfigZk.getZookeeperPort() + "/foo");
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setBacklogQuotaCheckIntervalInSeconds(5);
+        config.setDefaultNumberOfNamespaceBundles(1);
+        config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        config.setEnableReplicatedSubscriptions(true);
+        config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        markCurrentSetupNumberCleaned();
+        log.info("--- Shutting down ---");
+
+        stopLocalMetadataStoreAlwaysReconnect();
+
+        // Stop brokers.
+        client.close();
+        admin.close();
+        if (pulsar != null) {
+            pulsar.close();
+        }
+
+        // Stop ZK and BK.
+        bkEnsemble.stop();
+        brokerConfigZk.stop();
+
+        // Reset configs.
+        config = new ServiceConfiguration();
+        setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 94da382b74d..6d558e70971 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -148,6 +148,11 @@ public interface MetadataCache<T> {
      */
     void invalidate(String path);
 
+    /**
+     * Force the invalidation of all objects in the metadata cache.
+     */
+    void invalidateAll();
+
     /**
      * Invalidate and reload an object in the metadata cache.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 306b6398b5c..be945d988fb 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -51,11 +51,13 @@ import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 
 @Slf4j
 public class PulsarRegistrationClient implements RegistrationClient {
 
-    private final MetadataStore store;
+    private final AbstractMetadataStore store;
     private final String ledgersRootPath;
     // registration paths
     private final String bookieRegistrationPath;
@@ -68,10 +70,11 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
     private final Map<BookieId, Versioned<BookieServiceInfo>> 
writableBookieInfo;
     private final Map<BookieId, Versioned<BookieServiceInfo>> 
readOnlyBookieInfo;
     private final FutureUtil.Sequencer<Void> sequencer;
+    private SessionEvent lastMetadataSessionEvent;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
-        this.store = store;
+        this.store = (AbstractMetadataStore) store;
         this.ledgersRootPath = ledgersRootPath;
         this.bookieServiceInfoMetadataCache = 
store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
         this.sequencer = Sequencer.create();
@@ -88,6 +91,7 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-registration-client"));
 
         store.registerListener(this::updatedBookies);
+        this.store.registerSessionListener(this::refreshBookies);
     }
 
     @Override
@@ -95,6 +99,21 @@ public class PulsarRegistrationClient implements 
RegistrationClient {
         executor.shutdownNow();
     }
 
+    private void refreshBookies(SessionEvent sessionEvent) {
+        lastMetadataSessionEvent = sessionEvent;
+        if (!SessionEvent.Reconnected.equals(sessionEvent) && 
!SessionEvent.SessionReestablished.equals(sessionEvent)){
+            return;
+        }
+        // Clean caches.
+        store.invalidateCaches(bookieRegistrationPath, 
bookieAllRegistrationPath, bookieReadonlyRegistrationPath);
+        bookieServiceInfoMetadataCache.invalidateAll();
+        // Refresh caches of the listeners.
+        getReadOnlyBookies().thenAccept(bookies ->
+                readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> 
w.onBookiesChanged(bookies))));
+        getWritableBookies().thenAccept(bookies ->
+                writableBookiesWatchers.forEach(w -> executor.execute(() -> 
w.onBookiesChanged(bookies))));
+    }
+
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
         return getBookiesThenFreshCache(bookieRegistrationPath);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 4cadf2397a7..cc148c2a311 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Instant;
@@ -523,6 +524,13 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         existsCache.synchronous().invalidateAll();
     }
 
+    public void invalidateCaches(String...paths) {
+        LoadingCache<String, List<String>> loadingCache = 
childrenCache.synchronous();
+        for (String path : paths) {
+            loadingCache.invalidate(path);
+        }
+    }
+
     /**
      * Run the task in the executor thread and fail the future if the executor 
is shutting down.
      */

Reply via email to