This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b955c6520d8 [fix] [broker] Internal reader of __change_events can not started after metadata store session rebuilt (#23018)
b955c6520d8 is described below

commit b955c6520d8db948048a1b2dc548a001ee396076
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 23:06:45 2024 +0800

    [fix] [broker] Internal reader of __change_events can not started after metadata store session rebuilt (#23018)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |   3 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   3 +-
 .../apache/pulsar/broker/service/Ipv4Proxy.java    | 197 ++++++++++++++
 .../broker/service/NetworkErrorTestBase.java       | 297 +++++++++++++++++++++
 .../pulsar/broker/service/ZkSessionExpireTest.java | 184 +++++++++++++
 .../metadata/impl/AbstractMetadataStore.java       |  10 +
 6 files changed, 692 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index a737a94b998..9450c2a9cc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -127,7 +127,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
     private static final Set<String> INTERNAL_TOPICS =
             Set.of(BROKER_LOAD_DATA_STORE_TOPIC, 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);
 
-    private PulsarService pulsar;
+    @VisibleForTesting
+    protected PulsarService pulsar;
 
     private ServiceConfiguration conf;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 8f095b7d84d..ada1ab665b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -158,8 +158,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     // Policies used to determine which brokers are available for particular 
namespaces.
     private SimpleResourceAllocationPolicies policies;
 
+    @VisibleForTesting
     // Pulsar service used to initialize this.
-    private PulsarService pulsar;
+    protected PulsarService pulsar;
 
     private PulsarResources pulsarResources;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java
new file mode 100644
index 00000000000..a84dab4d17d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.ReferenceCountUtil;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
+
+/**
+ * A minimal TCP forwarding proxy for tests.
+ *
+ * <p>Every connection accepted on {@code localPort} is forwarded to
+ * {@code backendServerHost:backendServerPort}. Tests use it to inject network faults: while
+ * {@link #rejectAllConnections()} is in effect, newly accepted connections are closed immediately,
+ * and {@link #disconnectFrontChannels()} closes the client-side connections currently proxied.
+ */
+@Slf4j
+public class Ipv4Proxy {
+    @Getter
+    private final int localPort;
+    private final String backendServerHost;
+    private final int backendServerPort;
+    private final EventLoopGroup serverGroup = new NioEventLoopGroup(1);
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+    // Client-side channels currently proxied. Entries are added/removed on Netty event-loop threads
+    // and read from the caller's thread in disconnectFrontChannels().
+    private final List<Channel> frontChannels = Collections.synchronizedList(new ArrayList<>());
+    // While true, newly accepted connections are closed without being forwarded.
+    private final AtomicBoolean rejectAllConnections = new AtomicBoolean();
+    private ChannelFuture localServerChannel;
+
+    /**
+     * Creates a proxy; call {@link #startup()} to begin accepting connections.
+     *
+     * @param localPort the local port to listen on
+     * @param backendServerHost the host every accepted connection is forwarded to
+     * @param backendServerPort the port every accepted connection is forwarded to
+     */
+    public Ipv4Proxy(int localPort, String backendServerHost, int backendServerPort) {
+        this.localPort = localPort;
+        this.backendServerHost = backendServerHost;
+        this.backendServerPort = backendServerPort;
+    }
+
+    /**
+     * Binds the proxy to {@code localPort} and blocks until the bind completes.
+     *
+     * @throws InterruptedException if interrupted while waiting for the bind
+     */
+    public synchronized void startup() throws InterruptedException {
+        localServerChannel = serverBootstrap.group(serverGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .handler(new LoggingHandler(LogLevel.INFO))
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(new FrontendHandler());
+                }
+            })
+            // Reads are requested explicitly once the previous write succeeded (see the handlers),
+            // so the proxy never reads faster than the other side accepts.
+            .childOption(ChannelOption.AUTO_READ, false)
+            .bind(localPort).sync();
+    }
+
+    /**
+     * Closes the listening channel (if {@link #startup()} bound one) and initiates shutdown of the
+     * event loops. The event-loop shutdown is not awaited.
+     *
+     * @throws InterruptedException if interrupted while waiting for the listening channel to close
+     */
+    public synchronized void stop() throws InterruptedException {
+        try {
+            // startup() may never have completed, e.g. when a test setup aborted early.
+            if (localServerChannel != null) {
+                localServerChannel.channel().close().sync();
+            }
+        } finally {
+            serverGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    /**
+     * Flushes any pending writes on {@code ch} and then closes it; does nothing if it is inactive.
+     */
+    private static void closeOnFlush(Channel ch) {
+        if (ch.isActive()) {
+            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    /**
+     * Closes every client-side connection currently proxied. Closing is asynchronous; the
+     * corresponding backend connections are closed when each front channel becomes inactive.
+     */
+    public void disconnectFrontChannels() throws InterruptedException {
+        // Iterate over a snapshot: close() makes channelInactive run on event-loop threads, which
+        // removes entries from frontChannels while we would otherwise still be iterating it.
+        List<Channel> snapshot;
+        synchronized (frontChannels) {
+            snapshot = new ArrayList<>(frontChannels);
+        }
+        for (Channel channel : snapshot) {
+            channel.close();
+        }
+    }
+
+    /**
+     * Makes the proxy close every newly accepted connection immediately. Existing connections are
+     * not affected; use {@link #disconnectFrontChannels()} for those.
+     */
+    public void rejectAllConnections() throws InterruptedException {
+        rejectAllConnections.set(true);
+    }
+
+    /**
+     * Undoes {@link #rejectAllConnections()}: newly accepted connections are forwarded again.
+     */
+    public void unRejectAllConnections() throws InterruptedException {
+        rejectAllConnections.set(false);
+    }
+
+    /**
+     * Handles a client-side connection: opens the matching backend connection and forwards
+     * client bytes to it.
+     */
+    private class FrontendHandler extends ChannelInboundHandlerAdapter {
+
+        private Channel backendChannel;
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) {
+            if (rejectAllConnections.get()) {
+                ctx.close();
+                return;
+            }
+            final Channel frontendChannel = ctx.channel();
+            frontChannels.add(frontendChannel);
+            // The backend connection shares the front channel's event loop.
+            Bootstrap backendBootstrap = new Bootstrap();
+            backendBootstrap.group(frontendChannel.eventLoop())
+                    .channel(ctx.channel().getClass())
+                    .handler(new BackendHandler(frontendChannel))
+                    .option(ChannelOption.AUTO_READ, false);
+            ChannelFuture backendChannelFuture =
+                    backendBootstrap.connect(Ipv4Proxy.this.backendServerHost, Ipv4Proxy.this.backendServerPort);
+            backendChannel = backendChannelFuture.channel();
+            backendChannelFuture.addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    // Start reading from the client only once the backend is reachable.
+                    frontendChannel.read();
+                } else {
+                    frontChannels.remove(frontendChannel);
+                    frontendChannel.close();
+                }
+            });
+        }
+
+        @Override
+        public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+            if (backendChannel != null && backendChannel.isActive()) {
+                backendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
+                    if (future.isSuccess()) {
+                        ctx.channel().read();
+                    } else {
+                        future.channel().close();
+                    }
+                });
+            } else {
+                // The message is not forwarded anywhere, so release it here to avoid leaking it.
+                ReferenceCountUtil.release(msg);
+            }
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) {
+            frontChannels.remove(ctx.channel());
+            if (backendChannel != null) {
+                closeOnFlush(backendChannel);
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            log.warn("[{}] Error on proxy frontend channel", ctx.channel(), cause);
+            closeOnFlush(ctx.channel());
+        }
+    }
+
+    /**
+     * Handles a backend connection: forwards backend bytes to the client-side channel it serves.
+     */
+    private static class BackendHandler extends ChannelInboundHandlerAdapter {
+
+        private final Channel frontendChannel;
+
+        public BackendHandler(Channel frontendChannel) {
+            this.frontendChannel = frontendChannel;
+        }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) {
+            if (!frontendChannel.isActive()) {
+                closeOnFlush(ctx.channel());
+            } else {
+                ctx.read();
+            }
+        }
+
+        @Override
+        public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+            frontendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    ctx.channel().read();
+                } else {
+                    future.channel().close();
+                }
+            });
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) {
+            closeOnFlush(frontendChannel);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            log.warn("[{}] Error on proxy backend channel", ctx.channel(), cause);
+            closeOnFlush(ctx.channel());
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
new file mode 100644
index 00000000000..36f8cb47612
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.reflect.WhiteboxImpl;
+
+/**
+ * Base class for tests that inject network errors between a broker and the metadata store.
+ *
+ * <p>It starts a local ZooKeeper/BookKeeper ensemble and two brokers in cluster {@code r1}.
+ * Broker-1 reaches ZooKeeper through {@link #metadataZKProxy}, so a test can break only broker-1's
+ * metadata-store connection; broker-2 connects to ZooKeeper directly.
+ */
+@Slf4j
+public abstract class NetworkErrorTestBase extends TestRetrySupport {
+
+    protected final String defaultTenant = "public";
+    protected final String defaultNamespace = defaultTenant + "/default";
+    protected final String cluster1 = "r1";
+    protected URL url1;
+    protected URL urlTls1;
+    protected URL url2;
+    protected URL urlTls2;
+    protected ServiceConfiguration config1 = new ServiceConfiguration();
+    protected ServiceConfiguration config2 = new ServiceConfiguration();
+    protected ZookeeperServerTest brokerConfigZk1;
+    protected Ipv4Proxy metadataZKProxy;
+    protected LocalBookkeeperEnsemble bkEnsemble1;
+    protected PulsarService pulsar1;
+    protected PulsarService pulsar2;
+    protected BrokerService broker1;
+    protected BrokerService broker2;
+    protected PulsarAdmin admin1;
+    protected PulsarAdmin admin2;
+    protected PulsarClient client1;
+    protected PulsarClient client2;
+
+    // Broker id the Prefer* load managers below should choose; null means "no preference".
+    private static final AtomicReference<String> preferBroker = new AtomicReference<>();
+
+    /**
+     * Starts the ZooKeeper/BookKeeper ensemble and the proxy that fronts its ZooKeeper port.
+     */
+    protected void startZKAndBK() throws Exception {
+        // Start ZK & BK.
+        bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble1.start();
+
+        metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1", bkEnsemble1.getZookeeperPort());
+        metadataZKProxy.startup();
+    }
+
+    /**
+     * Starts both brokers. Only broker-1 goes through {@link #metadataZKProxy}.
+     */
+    protected void startBrokers() throws Exception {
+        // Broker-1: metadata store reached via the proxy, so its session can be broken by tests.
+        setConfigDefaults(config1, cluster1, metadataZKProxy.getLocalPort());
+        pulsar1 = new PulsarService(config1);
+        pulsar1.start();
+        broker1 = pulsar1.getBrokerService();
+        url1 = new URL(pulsar1.getWebServiceAddress());
+        urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
+
+        // Broker-2: metadata store reached directly.
+        setConfigDefaults(config2, cluster1, bkEnsemble1.getZookeeperPort());
+        pulsar2 = new PulsarService(config2);
+        pulsar2.start();
+        broker2 = pulsar2.getBrokerService();
+        url2 = new URL(pulsar2.getWebServiceAddress());
+        urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+
+        log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort());
+    }
+
+    /**
+     * Returns a local port that was free at the time of the call. The port is released before
+     * returning, so another process could in principle take it before the caller binds it.
+     */
+    protected int getOneFreePort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
+
+    protected void startAdminClient() throws Exception {
+        admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+        admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+    }
+
+    protected void startPulsarClient() throws Exception {
+        ClientBuilder clientBuilder1 = PulsarClient.builder().serviceUrl(url1.toString());
+        client1 = initClient(clientBuilder1);
+        ClientBuilder clientBuilder2 = PulsarClient.builder().serviceUrl(url2.toString());
+        client2 = initClient(clientBuilder2);
+    }
+
+    /**
+     * Registers cluster {@code r1} (pointing at broker-1), the {@code public} tenant and the
+     * {@code public/default} namespace.
+     */
+    protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
+        admin1.clusters().createCluster(cluster1, ClusterData.builder()
+                .serviceUrl(url1.toString())
+                .serviceUrlTls(urlTls1.toString())
+                .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+                .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+                .brokerClientTlsEnabled(false)
+                .build());
+        admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
+                Sets.newHashSet(cluster1)));
+        admin1.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(cluster1));
+    }
+
+    @Override
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+
+        log.info("--- Starting NetworkErrorTestBase::setup ---");
+
+        startZKAndBK();
+
+        startBrokers();
+
+        startAdminClient();
+
+        createDefaultTenantsAndClustersAndNamespace();
+
+        startPulsarClient();
+
+        Thread.sleep(100);
+        log.info("--- NetworkErrorTestBase::setup completed ---");
+    }
+
+    /**
+     * Applies the broker configuration shared by all tests; subclasses may override this to
+     * customize it further.
+     *
+     * @param config the configuration to populate
+     * @param clusterName the cluster the broker belongs to
+     * @param zkPort the port used for both the metadata store and the configuration metadata store
+     */
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName, int zkPort) {
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setWebServicePort(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+        config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + zkPort + "/config_meta");
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setBacklogQuotaCheckIntervalInSeconds(5);
+        config.setDefaultNumberOfNamespaceBundles(1);
+        config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        config.setEnableReplicatedSubscriptions(true);
+        config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setLoadBalancerSheddingEnabled(false);
+        config.setForceDeleteNamespaceAllowed(true);
+        config.setLoadManagerClassName(PreferBrokerModularLoadManager.class.getName());
+        config.setMetadataStoreSessionTimeoutMillis(5000);
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        // shutdown.
+        markCurrentSetupNumberCleaned();
+        log.info("--- Shutting down ---");
+
+        // Stop clients and brokers.
+        if (client1 != null) {
+            client1.close();
+            client1 = null;
+        }
+        if (admin1 != null) {
+            admin1.close();
+            admin1 = null;
+        }
+        if (client2 != null) {
+            client2.close();
+            client2 = null;
+        }
+        if (admin2 != null) {
+            admin2.close();
+            admin2 = null;
+        }
+        if (pulsar1 != null) {
+            pulsar1.close();
+            pulsar1 = null;
+        }
+        if (pulsar2 != null) {
+            pulsar2.close();
+            pulsar2 = null;
+        }
+
+        // Stop ZK and BK.
+        if (bkEnsemble1 != null) {
+            bkEnsemble1.stop();
+            bkEnsemble1 = null;
+        }
+        if (metadataZKProxy != null) {
+            metadataZKProxy.stop();
+            metadataZKProxy = null;
+        }
+        if (brokerConfigZk1 != null) {
+            brokerConfigZk1.stop();
+            brokerConfigZk1 = null;
+        }
+
+        // Reset configs so that nothing leaks into the next setup (both brokers reuse these objects).
+        config1 = new ServiceConfiguration();
+        config2 = new ServiceConfiguration();
+        preferBroker.set(null);
+    }
+
+    protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception {
+        return clientBuilder.build();
+    }
+
+    /**
+     * Modular load manager that assigns bundles to the preferred broker (see
+     * {@link #setPreferBroker(PulsarService)}) while that broker is available.
+     */
+    protected static class PreferBrokerModularLoadManager extends ModularLoadManagerImpl {
+
+        @Override
+        public String setNamespaceBundleAffinity(String bundle, String broker) {
+            if (StringUtils.isNotBlank(broker)) {
+                return broker;
+            }
+            String prefer = preferBroker.get();
+            if (prefer == null) {
+                return null;
+            }
+            Set<String> availableBrokers = NetworkErrorTestBase.getAvailableBrokers(super.pulsar);
+            return availableBrokers.contains(prefer) ? prefer : null;
+        }
+    }
+
+    /**
+     * Extensible load manager that selects the preferred broker (see
+     * {@link #setPreferBroker(PulsarService)}) while it is available, and otherwise falls back to
+     * the default selection.
+     */
+    protected static class PreferExtensibleLoadManager extends ExtensibleLoadManagerImpl {
+
+        @Override
+        public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
+                                                               Set<String> excludeBrokerSet,
+                                                               LookupOptions options) {
+            String prefer = preferBroker.get();
+            if (prefer != null
+                    && NetworkErrorTestBase.getAvailableBrokers(super.pulsar).contains(prefer)) {
+                return CompletableFuture.completedFuture(Optional.of(prefer));
+            }
+            return super.selectAsync(bundle, excludeBrokerSet, options);
+        }
+    }
+
+    /**
+     * Marks {@code target} as the preferred broker. The broker id is looked up among the available
+     * brokers by matching its suffix against any of {@code target}'s listen ports.
+     */
+    public void setPreferBroker(PulsarService target) {
+        String brokerPort = String.valueOf(target.getBrokerListenPort().orElse(-1));
+        String httpsPort = String.valueOf(target.getListenPortHTTPS().orElse(-1));
+        String httpPort = String.valueOf(target.getListenPortHTTP().orElse(-1));
+        String brokerTlsPort = String.valueOf(target.getBrokerListenPortTls().orElse(-1));
+        for (PulsarService pulsar : Arrays.asList(pulsar1, pulsar2)) {
+            for (String broker : getAvailableBrokers(pulsar)) {
+                if (broker.endsWith(brokerPort)
+                        || broker.endsWith(httpsPort)
+                        || broker.endsWith(httpPort)
+                        || broker.endsWith(brokerTlsPort)) {
+                    preferBroker.set(broker);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the brokers that {@code pulsar}'s load manager currently sees as available.
+     *
+     * @throws RuntimeException if the load manager is neither modular nor extensible
+     */
+    public static Set<String> getAvailableBrokers(PulsarService pulsar) {
+        // getLoadManager() holds a wrapper; the implementation is read from its "loadManager" field.
+        Object loadManagerWrapper = pulsar.getLoadManager().get();
+        Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper, "loadManager");
+        if (loadManager instanceof ModularLoadManagerImpl) {
+            return ((ModularLoadManagerImpl) loadManager).getAvailableBrokers();
+        } else if (loadManager instanceof ExtensibleLoadManagerImpl) {
+            return new HashSet<>(((ExtensibleLoadManagerImpl) loadManager).getBrokerRegistry()
+                    .getAvailableBrokersAsync().join());
+        } else {
+            throw new RuntimeException("Not support for the load manager: " + loadManager.getClass().getName());
+        }
+    }
+
+    /**
+     * Removes any broker preference set by {@link #setPreferBroker(PulsarService)}.
+     */
+    public void clearPreferBroker() {
+        preferBroker.set(null);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
new file mode 100644
index 00000000000..143557b008b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a broker whose metadata-store (ZooKeeper) session expired gives up the topics it
+ * owned once the session is rebuilt, and that producers can keep publishing through the other broker.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class ZkSessionExpireTest extends NetworkErrorTestBase {
+
+    // Per-test customizations applied on top of the base defaults to every broker's config;
+    // set by setupWithSettings() before the brokers are started.
+    private java.util.function.Consumer<ServiceConfiguration> settings;
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Records {@code settings} and then runs the base setup, so every broker it starts gets them.
+     */
+    private void setupWithSettings(java.util.function.Consumer<ServiceConfiguration> settings) throws Exception {
+        this.settings = settings;
+        super.setup();
+    }
+
+    @Override
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName, int zkPort) {
+        super.setConfigDefaults(config, clusterName, zkPort);
+        // The base setup may run without setupWithSettings() having been called first.
+        if (settings != null) {
+            settings.accept(config);
+        }
+    }
+
+    /**
+     * Rows of {enableSystemTopic, loadManagerClass}.
+     */
+    @DataProvider(name = "settings")
+    public Object[][] settings() {
+        return new Object[][]{
+            {false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class},
+            {true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}
+            // Create a separate PR to add this test case.
+            // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class}.
+        };
+    }
+
+    @Test(timeOut = 600 * 1000, dataProvider = "settings")
+    public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class<?> loadManager) throws Exception {
+        // Setup.
+        setupWithSettings(config -> {
+            config.setManagedLedgerMaxEntriesPerLedger(1);
+            config.setSystemTopicEnabled(enableSystemTopic);
+            config.setTopicLevelPoliciesEnabled(enableSystemTopic);
+            if (loadManager != null) {
+                config.setLoadManagerClassName(loadManager.getName());
+            }
+        });
+
+        // Init topic.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
+
+        // Prefer broker1 so that all topics are assigned to it; broker1 is the broker whose ZK
+        // session can be broken through the proxy.
+        setPreferBroker(pulsar1);
+        admin1.namespaces().unload(defaultNamespace);
+        admin2.namespaces().unload(defaultNamespace);
+
+        // Confirm all brokers registered.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(getAvailableBrokers(pulsar1).size(), 2);
+            assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+        });
+
+        // Load up a topic; it will be assigned to broker1.
+        ProducerImpl<String> p1 = (ProducerImpl<String>) client1.newProducer(Schema.STRING).topic(topicName)
+                .sendTimeout(10, TimeUnit.SECONDS).create();
+        Topic broker1Topic1 = pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        assertNotNull(broker1Topic1);
+        clearPreferBroker();
+
+        // Inject a ZK session expire error, and wait for broker1 to go offline.
+        metadataZKProxy.rejectAllConnections();
+        metadataZKProxy.disconnectFrontChannels();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(getAvailableBrokers(pulsar2).size(), 1);
+        });
+
+        // Send messages continuously.
+        // Verify: the topic was transferred to broker2.
+        CompletableFuture<MessageId> broker1Send1 = p1.sendAsync("broker1_msg1");
+        Producer<String> p2 = client2.newProducer(Schema.STRING).topic(topicName)
+                .sendTimeout(10, TimeUnit.SECONDS).create();
+        CompletableFuture<MessageId> broker2Send1 = p2.sendAsync("broker2_msg1");
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Optional<Topic>> future = pulsar2.getBrokerService().getTopic(topicName, false);
+            assertNotNull(future);
+            assertTrue(future.isDone() && !future.isCompletedExceptionally());
+            Optional<Topic> optional = future.join();
+            assertTrue(optional != null && !optional.isEmpty());
+        });
+
+        // Both brokers now assume they own the topic.
+        Topic broker1Topic2 = pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        Topic broker2Topic2 = pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        assertNotNull(broker1Topic2);
+        assertNotNull(broker2Topic2);
+
+        // Send messages continuously.
+        // Publishing to broker-1 will fail.
+        // Publishing to broker-2 will succeed.
+        CompletableFuture<MessageId> broker1Send2 = p1.sendAsync("broker1_msg2");
+        CompletableFuture<MessageId> broker2Send2 = p2.sendAsync("broker2_msg2");
+        try {
+            broker1Send1.join();
+            broker1Send2.join();
+            // fail() throws an AssertionError, which the catch below does not intercept.
+            fail("expected a publish error");
+        } catch (Exception ex) {
+            // Expected.
+        }
+        broker2Send1.join();
+        broker2Send2.join();
+
+        // Broker rebuilds its ZK session.
+        metadataZKProxy.unRejectAllConnections();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(getAvailableBrokers(pulsar1).size(), 2);
+            assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+        });
+
+        // Verify: the topic on broker-1 will be unloaded.
+        // Verify: the topic on broker-2 is fine.
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService().getTopic(topicName, false);
+            assertTrue(future == null || future.isCompletedExceptionally());
+        });
+        Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        assertNotNull(broker2Topic3);
+
+        // Send messages continuously.
+        // Verify: p1.send will succeed (it will connect to broker-2).
+        // Verify: p2.send will succeed.
+        CompletableFuture<MessageId> broker1Send3 = p1.sendAsync("broker1_msg3");
+        CompletableFuture<MessageId> broker2Send3 = p2.sendAsync("broker2_msg3");
+        broker1Send3.join();
+        broker2Send3.join();
+
+        long msgBacklog = admin2.topics().getStats(topicName).getSubscriptions().get("s1").getMsgBacklog();
+        log.info("msgBacklog: {}", msgBacklog);
+
+        // cleanup.
+        p1.close();
+        p2.close();
+        admin2.topics().delete(topicName, false);
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f35f1974632..c458d0da214 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -496,6 +496,16 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected void receivedSessionEvent(SessionEvent event) {
         isConnected = event.isConnected();
+
+        // Clear cache after session expired.
+        if (event == SessionEvent.SessionReestablished || event == 
SessionEvent.Reconnected) {
+            for (MetadataCacheImpl metadataCache : metadataCaches) {
+                metadataCache.invalidateAll();
+            }
+            invalidateAll();
+        }
+
+        // Notice listeners.
         try {
             executor.execute(() -> {
                 sessionListeners.forEach(l -> {

Reply via email to