This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b955c6520d8 [fix] [broker] Internal reader of __change_events can not
started after metadata store session rebuilt (#23018)
b955c6520d8 is described below
commit b955c6520d8db948048a1b2dc548a001ee396076
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 23:06:45 2024 +0800
[fix] [broker] Internal reader of __change_events can not started after
metadata store session rebuilt (#23018)
---
.../extensions/ExtensibleLoadManagerImpl.java | 3 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 3 +-
.../apache/pulsar/broker/service/Ipv4Proxy.java | 197 ++++++++++++++
.../broker/service/NetworkErrorTestBase.java | 297 +++++++++++++++++++++
.../pulsar/broker/service/ZkSessionExpireTest.java | 184 +++++++++++++
.../metadata/impl/AbstractMetadataStore.java | 10 +
6 files changed, 692 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index a737a94b998..9450c2a9cc4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -127,7 +127,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC,
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);
- private PulsarService pulsar;
+ @VisibleForTesting
+ protected PulsarService pulsar;
private ServiceConfiguration conf;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 8f095b7d84d..ada1ab665b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -158,8 +158,9 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// Policies used to determine which brokers are available for particular
namespaces.
private SimpleResourceAllocationPolicies policies;
// Pulsar service used to initialize this.
- private PulsarService pulsar;
+ @VisibleForTesting
+ protected PulsarService pulsar;
private PulsarResources pulsarResources;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java
new file mode 100644
index 00000000000..a84dab4d17d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+
+public class Ipv4Proxy {
+ @Getter
+ private final int localPort;
+ private final String backendServerHost;
+ private final int backendServerPort;
+ private final EventLoopGroup serverGroup = new NioEventLoopGroup(1);
+ private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private ChannelFuture localServerChannel;
+ private ServerBootstrap serverBootstrap = new ServerBootstrap();
+ private List<Channel> frontChannels = Collections.synchronizedList(new
ArrayList<>());
+ private AtomicBoolean rejectAllConnections = new AtomicBoolean();
+
+ public Ipv4Proxy(int localPort, String backendServerHost, int
backendServerPort) {
+ this.localPort = localPort;
+ this.backendServerHost = backendServerHost;
+ this.backendServerPort = backendServerPort;
+ }
+
+ public synchronized void startup() throws InterruptedException {
+ localServerChannel = serverBootstrap.group(serverGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline().addLast(new FrontendHandler());
+ }
+ }).childOption(ChannelOption.AUTO_READ, false)
+ .bind(localPort).sync();
+ }
+
+    /**
+     * Closes the listening server channel (if one was ever bound) and shuts down both event
+     * loop groups.
+     *
+     * @throws InterruptedException if interrupted while waiting for the server channel to close
+     */
+    public synchronized void stop() throws InterruptedException {
+        try {
+            // startup() may never have run, or its bind may have failed, in which case there is
+            // no server channel to close.
+            if (localServerChannel != null) {
+                localServerChannel.channel().close().sync();
+            }
+        } finally {
+            // Always release the event loops, even when closing the server channel failed.
+            serverGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+ private static void closeOnFlush(Channel ch) {
+ if (ch.isActive()) {
+
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+    /**
+     * Closes every frontend (client-side) channel that is currently tracked by this proxy.
+     *
+     * <p>The channels are copied into a snapshot first: {@code frontChannels} is a
+     * {@link Collections#synchronizedList}, which must be locked manually while it is traversed,
+     * and {@code FrontendHandler#channelInactive} removes entries from it when a channel closes.
+     * Iterating the live list could therefore fail with a ConcurrentModificationException.
+     */
+    public void disconnectFrontChannels() throws InterruptedException {
+        final List<Channel> snapshot;
+        synchronized (frontChannels) {
+            snapshot = new ArrayList<>(frontChannels);
+        }
+        for (Channel channel : snapshot) {
+            channel.close();
+        }
+    }
+
+ public void rejectAllConnections() throws InterruptedException {
+ rejectAllConnections.set(true);
+ }
+
+ public void unRejectAllConnections() throws InterruptedException {
+ rejectAllConnections.set(false);
+ }
+
+ private class FrontendHandler extends ChannelInboundHandlerAdapter {
+
+ private Channel backendChannel;
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ if (rejectAllConnections.get()) {
+ ctx.close();
+ return;
+ }
+ final Channel frontendChannel = ctx.channel();
+ frontChannels.add(frontendChannel);
+ Bootstrap backendBootstrap = new Bootstrap();
+ backendBootstrap.group(frontendChannel.eventLoop())
+ .channel(ctx.channel().getClass())
+ .handler(new BackendHandler(frontendChannel))
+ .option(ChannelOption.AUTO_READ, false);
+ ChannelFuture backendChannelFuture =
+ backendBootstrap.connect(Ipv4Proxy.this.backendServerHost,
Ipv4Proxy.this.backendServerPort);
+ backendChannel = backendChannelFuture.channel();
+ backendChannelFuture.addListener((ChannelFutureListener) future ->
{
+ if (future.isSuccess()) {
+ frontendChannel.read();
+ } else {
+ frontChannels.remove(frontendChannel);
+ frontendChannel.close();
+ }
+ });
+ }
+
+        /**
+         * Forwards a message read from the frontend channel to the backend channel and, once the
+         * write succeeded, requests the next read from the frontend (auto-read is disabled).
+         * If the write fails, the backend channel is closed.
+         */
+        @Override
+        public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+            if (backendChannel == null || !backendChannel.isActive()) {
+                // Nobody will consume this message, so release it here instead of leaking a
+                // reference-counted buffer.
+                io.netty.util.ReferenceCountUtil.release(msg);
+                return;
+            }
+            backendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    ctx.channel().read();
+                } else {
+                    future.channel().close();
+                }
+            });
+        }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ frontChannels.remove(ctx.channel());
+ if (backendChannel != null) {
+ closeOnFlush(backendChannel);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
+ cause.printStackTrace();
+ closeOnFlush(ctx.channel());
+ }
+ }
+
+ private class BackendHandler extends ChannelInboundHandlerAdapter {
+
+ private final Channel frontendChannel;
+
+ public BackendHandler(Channel inboundChannel) {
+ this.frontendChannel = inboundChannel;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ if (!frontendChannel.isActive()) {
+ closeOnFlush(ctx.channel());
+ } else {
+ ctx.read();
+ }
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+
frontendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future
-> {
+ if (future.isSuccess()) {
+ ctx.channel().read();
+ } else {
+ future.channel().close();
+ }
+ });
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ closeOnFlush(frontendChannel);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
+ cause.printStackTrace();
+ closeOnFlush(ctx.channel());
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
new file mode 100644
index 00000000000..36f8cb47612
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.tests.TestRetrySupport;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.reflect.WhiteboxImpl;
+
+@Slf4j
+public abstract class NetworkErrorTestBase extends TestRetrySupport {
+
+ protected final String defaultTenant = "public";
+ protected final String defaultNamespace = defaultTenant + "/default";
+ protected final String cluster1 = "r1";
+ protected URL url1;
+ protected URL urlTls1;
+ protected URL url2;
+ protected URL urlTls2;
+ protected ServiceConfiguration config1 = new ServiceConfiguration();
+ protected ServiceConfiguration config2 = new ServiceConfiguration();
+ protected ZookeeperServerTest brokerConfigZk1;
+ protected Ipv4Proxy metadataZKProxy;
+ protected LocalBookkeeperEnsemble bkEnsemble1;
+ protected PulsarService pulsar1;
+ protected PulsarService pulsar2;
+ protected BrokerService broker1;
+ protected BrokerService broker2;
+ protected PulsarAdmin admin1;
+ protected PulsarAdmin admin2;
+ protected PulsarClient client1;
+ protected PulsarClient client2;
+
+ private final static AtomicReference<String> preferBroker = new
AtomicReference<>();
+
+ protected void startZKAndBK() throws Exception {
+ // Start ZK & BK.
+ bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble1.start();
+
+ metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1",
bkEnsemble1.getZookeeperPort());
+ metadataZKProxy.startup();
+ }
+
+ protected void startBrokers() throws Exception {
+ // Start brokers.
+ setConfigDefaults(config1, cluster1, metadataZKProxy.getLocalPort());
+ pulsar1 = new PulsarService(config1);
+ pulsar1.start();
+ broker1 = pulsar1.getBrokerService();
+ url1 = new URL(pulsar1.getWebServiceAddress());
+ urlTls1 = new URL(pulsar1.getWebServiceAddressTls());
+
+ setConfigDefaults(config2, cluster1, bkEnsemble1.getZookeeperPort());
+ pulsar2 = new PulsarService(config2);
+ pulsar2.start();
+ broker2 = pulsar2.getBrokerService();
+ url2 = new URL(pulsar2.getWebServiceAddress());
+ urlTls2 = new URL(pulsar2.getWebServiceAddressTls());
+
+ log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(),
broker2.getListenPort());
+ }
+
+    /**
+     * Asks the OS for an ephemeral port by binding a server socket to port 0, then closes the
+     * socket and returns the port number.
+     *
+     * <p>The socket is already closed when this method returns, so the port is not reserved for
+     * the caller.
+     *
+     * @return the port the temporary socket was bound to
+     * @throws IOException if the socket cannot be opened or closed
+     */
+    protected int getOneFreePort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
+
+ protected void startAdminClient() throws Exception {
+ admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
+ admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
+ }
+
+ protected void startPulsarClient() throws Exception{
+ ClientBuilder clientBuilder1 =
PulsarClient.builder().serviceUrl(url1.toString());
+ client1 = initClient(clientBuilder1);
+ ClientBuilder clientBuilder2 =
PulsarClient.builder().serviceUrl(url2.toString());
+ client2 = initClient(clientBuilder2);
+ }
+
+ protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {
+ admin1.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin1.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1)));
+ admin1.namespaces().createNamespace(defaultNamespace,
Sets.newHashSet(cluster1));
+ }
+
+    /**
+     * Brings up the test environment in order: ZK/BK together with the ZK proxy, both brokers,
+     * the admin clients, the default cluster/tenant/namespace, and finally the Pulsar clients.
+     */
+    @Override
+    protected void setup() throws Exception {
+        incrementSetupNumber();
+
+        log.info("--- Starting NetworkErrorTestBase::setup ---");
+
+        startZKAndBK();
+
+        startBrokers();
+
+        startAdminClient();
+
+        createDefaultTenantsAndClustersAndNamespace();
+
+        startPulsarClient();
+
+        Thread.sleep(100);
+        log.info("--- NetworkErrorTestBase::setup completed ---");
+    }
+
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName, int zkPort) {
+ config.setClusterName(clusterName);
+ config.setAdvertisedAddress("localhost");
+ config.setWebServicePort(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+ config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + zkPort +
"/config_meta");
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setBacklogQuotaCheckIntervalInSeconds(5);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ config.setEnableReplicatedSubscriptions(true);
+ config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+ config.setLoadBalancerSheddingEnabled(false);
+ config.setForceDeleteNamespaceAllowed(true);
+
config.setLoadManagerClassName(PreferBrokerModularLoadManager.class.getName());
+ config.setMetadataStoreSessionTimeoutMillis(5000);
+ }
+
+    /**
+     * Tears down everything created by {@link #setup()}: clients and admins first, then the
+     * brokers, then ZK/BK and the ZK proxy. Finally resets the per-test state (both broker
+     * configurations and the preferred broker) so that the next setup starts from defaults.
+     */
+    @Override
+    protected void cleanup() throws Exception {
+        // shutdown.
+        markCurrentSetupNumberCleaned();
+        log.info("--- Shutting down ---");
+
+        // Close clients and admins, then stop brokers.
+        if (client1 != null) {
+            client1.close();
+            client1 = null;
+        }
+        if (admin1 != null) {
+            admin1.close();
+            admin1 = null;
+        }
+        if (client2 != null) {
+            client2.close();
+            client2 = null;
+        }
+        if (admin2 != null) {
+            admin2.close();
+            admin2 = null;
+        }
+        if (pulsar1 != null) {
+            pulsar1.close();
+            pulsar1 = null;
+        }
+        if (pulsar2 != null) {
+            pulsar2.close();
+            pulsar2 = null;
+        }
+
+        // Stop ZK and BK.
+        if (bkEnsemble1 != null) {
+            bkEnsemble1.stop();
+            bkEnsemble1 = null;
+        }
+        if (metadataZKProxy != null) {
+            metadataZKProxy.stop();
+            // Drop the reference so a later cleanup does not stop the same proxy twice.
+            metadataZKProxy = null;
+        }
+        if (brokerConfigZk1 != null) {
+            brokerConfigZk1.stop();
+            brokerConfigZk1 = null;
+        }
+
+        // Reset configs of both brokers; setConfigDefaults() mutates them on every setup.
+        config1 = new ServiceConfiguration();
+        config2 = new ServiceConfiguration();
+        preferBroker.set(null);
+    }
+
+ protected PulsarClient initClient(ClientBuilder clientBuilder) throws
Exception {
+ return clientBuilder.build();
+ }
+
+ protected static class PreferBrokerModularLoadManager extends
ModularLoadManagerImpl {
+
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker)
{
+ if (StringUtils.isNotBlank(broker)) {
+ return broker;
+ }
+ Set<String> availableBrokers =
NetworkErrorTestBase.getAvailableBrokers(super.pulsar);
+ String prefer = preferBroker.get();
+ if (availableBrokers.contains(prefer)) {
+ return prefer;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ protected static class PreferExtensibleLoadManager extends
ExtensibleLoadManagerImpl {
+
+ @Override
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle,
+ Set<String>
excludeBrokerSet,
+ LookupOptions
options) {
+ Set<String> availableBrokers =
NetworkErrorTestBase.getAvailableBrokers(super.pulsar);
+ String prefer = preferBroker.get();
+ if (availableBrokers.contains(prefer)) {
+ return CompletableFuture.completedFuture(Optional.of(prefer));
+ } else {
+ return super.selectAsync(bundle, excludeBrokerSet, options);
+ }
+ }
+ }
+
+ public void setPreferBroker(PulsarService target) {
+ for (PulsarService pulsar : Arrays.asList(pulsar1, pulsar2)) {
+ for (String broker : getAvailableBrokers(pulsar)) {
+ if (broker.endsWith(target.getBrokerListenPort().orElse(-1) +
"")
+ ||
broker.endsWith(target.getListenPortHTTPS().orElse(-1) + "")
+ ||
broker.endsWith(target.getListenPortHTTP().orElse(-1) + "")
+ ||
broker.endsWith(target.getBrokerListenPortTls().orElse(-1) + "")) {
+ preferBroker.set(broker);
+ }
+ }
+ }
+ }
+
+ public static Set<String> getAvailableBrokers(PulsarService pulsar) {
+ Object loadManagerWrapper = pulsar.getLoadManager().get();
+ Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper,
"loadManager");
+ if (loadManager instanceof ModularLoadManagerImpl) {
+ return ((ModularLoadManagerImpl)
loadManager).getAvailableBrokers();
+ } else if (loadManager instanceof ExtensibleLoadManagerImpl) {
+ return new HashSet<>(((ExtensibleLoadManagerImpl)
loadManager).getBrokerRegistry()
+ .getAvailableBrokersAsync().join());
+ } else {
+ throw new RuntimeException("Not support for the load manager: " +
loadManager.getClass().getName());
+ }
+ }
+
+ public void clearPreferBroker() {
+ preferBroker.set(null);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
new file mode 100644
index 00000000000..143557b008b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ZkSessionExpireTest extends NetworkErrorTestBase {
+
+ private java.util.function.Consumer<ServiceConfiguration> settings;
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ private void
setupWithSettings(java.util.function.Consumer<ServiceConfiguration> settings)
throws Exception {
+ this.settings = settings;
+ super.setup();
+ }
+
+    /**
+     * Applies the base defaults and then the per-test customization passed to
+     * {@code setupWithSettings}, if any.
+     */
+    @Override
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName, int zkPort) {
+        super.setConfigDefaults(config, clusterName, zkPort);
+        // "settings" is only assigned by setupWithSettings(); tolerate a plain setup() call.
+        if (settings != null) {
+            settings.accept(config);
+        }
+    }
+
+ @DataProvider(name = "settings")
+ public Object[][] settings() {
+ return new Object[][]{
+ {false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class},
+ {true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}
+ // Create a separate PR to add this test case.
+ // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class}.
+ };
+ }
+
+ @Test(timeOut = 600 * 1000, dataProvider = "settings")
+ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic,
Class loadManager) throws Exception {
+ // Setup.
+ setupWithSettings(config -> {
+ config.setManagedLedgerMaxEntriesPerLedger(1);
+ config.setSystemTopicEnabled(enableSystemTopic);
+ config.setTopicLevelPoliciesEnabled(enableSystemTopic);
+ if (loadManager != null) {
+ config.setLoadManagerClassName(loadManager.getName());
+ }
+ });
+
+ // Init topic.
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp");
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, "s1",
MessageId.earliest);
+
+ // Inject a preference mechanism so that all topics are assigned to broker1, whose ZK
+ // session can then be forced to expire.
+ setPreferBroker(pulsar1);
+ admin1.namespaces().unload(defaultNamespace);
+ admin2.namespaces().unload(defaultNamespace);
+
+ // Confirm all brokers registered.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getAvailableBrokers(pulsar1).size(), 2);
+ assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+ });
+
+ // Load up a topic, and it will be assigned to broker1.
+ ProducerImpl<String> p1 = (ProducerImpl<String>)
client1.newProducer(Schema.STRING).topic(topicName)
+ .sendTimeout(10, TimeUnit.SECONDS).create();
+ Topic broker1Topic1 = pulsar1.getBrokerService().getTopic(topicName,
false).join().get();
+ assertNotNull(broker1Topic1);
+ clearPreferBroker();
+
+ // Inject a ZK session expiry error, and wait for broker1 to go offline.
+ metadataZKProxy.rejectAllConnections();
+ metadataZKProxy.disconnectFrontChannels();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getAvailableBrokers(pulsar2).size(), 1);
+ });
+
+ // Send messages continuously.
+ // Verify: the topic was transferred to broker2.
+ CompletableFuture<MessageId> broker1Send1 =
p1.sendAsync("broker1_msg1");
+ Producer<String> p2 =
client2.newProducer(Schema.STRING).topic(topicName)
+ .sendTimeout(10, TimeUnit.SECONDS).create();
+ CompletableFuture<MessageId> broker2Send1 =
p2.sendAsync("broker2_msg1");
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Optional<Topic>> future =
pulsar2.getBrokerService().getTopic(topicName, false);
+ assertNotNull(future);
+ assertTrue(future.isDone() && !future.isCompletedExceptionally());
+ Optional<Topic> optional = future.join();
+ assertTrue(optional != null && !optional.isEmpty());
+ });
+
+ // Both brokers now assume that they are the owner of the topic.
+ Topic broker1Topic2 = pulsar1.getBrokerService().getTopic(topicName,
false).join().get();
+ Topic broker2Topic2 = pulsar2.getBrokerService().getTopic(topicName,
false).join().get();
+ assertNotNull(broker1Topic2);
+ assertNotNull(broker2Topic2);
+
+ // Send messages continuously.
+ // Publishing to broker-1 will fail.
+ // Publishing to broker-2 will succeed.
+ CompletableFuture<MessageId> broker1Send2 =
p1.sendAsync("broker1_msg2");
+ CompletableFuture<MessageId> broker2Send2 =
p2.sendAsync("broker2_msg2");
+ try {
+ broker1Send1.join();
+ broker1Send2.join();
+ p1.getClientCnx();
+ fail("expected a publish error");
+ } catch (Exception ex) {
+ // Expected.
+ }
+ broker2Send1.join();
+ broker2Send2.join();
+
+ // Let the broker rebuild its ZK session.
+ metadataZKProxy.unRejectAllConnections();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(getAvailableBrokers(pulsar1).size(), 2);
+ assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+ });
+
+ // Verify: the topic on broker-1 will be unloaded.
+ // Verify: the topic on broker-2 is fine.
+ Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+ CompletableFuture<Optional<Topic>> future =
pulsar1.getBrokerService().getTopic(topicName, false);
+ assertTrue(future == null || future.isCompletedExceptionally());
+ });
+ Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName,
false).join().get();
+ assertNotNull(broker2Topic3);
+
+ // Send messages continuously.
+ // Verify: p1.send will succeed (it will connect to broker-2).
+ // Verify: p2.send will succeed.
+ CompletableFuture<MessageId> broker1Send3 =
p1.sendAsync("broker1_msg3");
+ CompletableFuture<MessageId> broker2Send3 =
p2.sendAsync("broker2_msg3");
+ broker1Send3.join();
+ broker2Send3.join();
+
+ long msgBacklog =
admin2.topics().getStats(topicName).getSubscriptions().get("s1").getMsgBacklog();
+ log.info("msgBacklog: {}", msgBacklog);
+
+ // cleanup.
+ p1.close();
+ p2.close();
+ admin2.topics().delete(topicName, false);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f35f1974632..c458d0da214 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -496,6 +496,16 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected void receivedSessionEvent(SessionEvent event) {
isConnected = event.isConnected();
+
+ // Clear the caches once the session has been re-established or reconnected.
+ if (event == SessionEvent.SessionReestablished || event ==
SessionEvent.Reconnected) {
+ for (MetadataCacheImpl metadataCache : metadataCaches) {
+ metadataCache.invalidateAll();
+ }
+ invalidateAll();
+ }
+
+ // Notify listeners.
try {
executor.execute(() -> {
sessionListeners.forEach(l -> {