This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be87cd9bfe3 [fix][broker] system topic was created with different
partition counts across clusters after enabling namespace-level replication (#25312)
be87cd9bfe3 is described below
commit be87cd9bfe39c35f99689691ac96b50cade84c54
Author: fengyubiao <[email protected]>
AuthorDate: Tue Mar 17 17:43:41 2026 +0800
[fix][broker] system topic was created with different partition counts across
clusters after enabling namespace-level replication (#25312)
---
.../pulsar/broker/service/BrokerService.java | 123 +++++++++++++++---
.../pulsar/broker/admin/PersistentTopicsTest.java | 6 +
.../service/OneWayReplicatorUsingGlobalZKTest.java | 137 +++++++++++++++++++++
3 files changed, 251 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79a26ec56ae..d6226254f3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -144,8 +145,10 @@ import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
import org.apache.pulsar.broker.validator.BindAddressValidator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -3386,20 +3389,32 @@ public class BrokerService implements Closeable {
return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0));
}
- // Allow auto create non-partitioned topic.
- boolean autoCreatePartitionedTopic =
pulsar.getBrokerService()
- .isDefaultTopicTypePartitioned(topicName,
policies);
- if (!autoCreatePartitionedTopic ||
topicName.isPartitioned()) {
- return CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0));
- }
+ return
getRemotePartitionedTopicMetadataForAutoCreation(topicName, policies)
+ .thenCompose(remoteTopicExistsInfo -> {
+ // If remote topic exists, prioritize topic
shape from remote clusters.
+ if (remoteTopicExistsInfo.isExists()) {
+ if (remoteTopicExistsInfo.getTopicType()
== TopicType.PARTITIONED) {
+ return
createPartitionedTopicMetadataAsync(topicName,
+
remoteTopicExistsInfo.getPartitions());
+ }
+ return
CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+ }
- // Create partitioned metadata.
- return
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName,
policies)
- .exceptionallyCompose(ex -> {
+ // Allow auto create non-partitioned topic.
+ boolean autoCreatePartitionedTopic =
pulsar.getBrokerService()
+
.isDefaultTopicTypePartitioned(topicName, policies);
+ if (!autoCreatePartitionedTopic ||
topicName.isPartitioned()) {
+ return
CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+ }
+
+ // Create partitioned metadata.
+ return
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName,
+ policies);
+ }).exceptionallyCompose(ex -> {
// The partitioned topic might be created
concurrently.
if (ex.getCause() instanceof
MetadataStoreException.AlreadyExistsException) {
- log.info("[{}] The partitioned topic is
already created, try to refresh the cache"
- + " and read again.", topicName);
+ log.info("[{}] The partitioned topic is
already created, try to refresh "
+ + "the cache and read again.",
topicName);
CompletableFuture<PartitionedTopicMetadata> recheckFuture =
fetchPartitionedTopicMetadataAsync(topicName, true);
recheckFuture.exceptionally(ex2 -> {
@@ -3428,20 +3443,25 @@ public class BrokerService implements Closeable {
private CompletableFuture<PartitionedTopicMetadata>
createDefaultPartitionedTopicAsync(TopicName topicName,
Optional<Policies> policies) {
+        // Resolves the partition count via getDefaultNumPartitions(topicName, policies) and delegates; the delegate
+        // rejects counts <= 0 or above maxNumPartitionsPerPartitionedTopic before writing the metadata.
final int defaultNumPartitions =
pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
+ return createPartitionedTopicMetadataAsync(topicName,
defaultNumPartitions);
+ }
+
+ private CompletableFuture<PartitionedTopicMetadata>
createPartitionedTopicMetadataAsync(TopicName topicName,
+
int numPartitions) {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- if (defaultNumPartitions <= 0) {
+ if (numPartitions <= 0) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Default number of partitions
should be more than 0"));
}
- if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
+ if (maxPartitions > 0 && numPartitions > maxPartitions) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Number of partitions should
be less than or equal to "
+ maxPartitions));
}
- PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(defaultNumPartitions);
+ PartitionedTopicMetadata configMetadata = new
PartitionedTopicMetadata(numPartitions);
- return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions,
true)
+ return checkMaxTopicsPerNamespace(topicName, numPartitions, true)
.thenCompose(__ -> {
PartitionedTopicResources partitionResources =
pulsar.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources();
@@ -3453,6 +3473,79 @@ public class BrokerService implements Closeable {
});
}
+    /**
+     * Determines whether {@code topicName} already exists on the other replication clusters of its namespace, so
+     * that auto-creation can follow the remote topic shape instead of the local default rule.
+     *
+     * <p>No remote lookup is made, and "topic not exists" is returned, when
+     * {@code createTopicToRemoteClusterForReplication} is disabled, when the topic is a partition or is not
+     * persistent, when no policies are given, or when the replication clusters do not contain the local cluster
+     * plus at least one other cluster.
+     *
+     * @param topicName the topic that is about to be auto-created
+     * @param policies  the namespace policies; when empty, no remote lookup is done
+     * @return the result of {@code findRemoteTopicMetadataForAutoCreation} over the remote clusters in sorted
+     *         order, or a completed "topic not exists" future when the lookup is skipped
+     */
+    private CompletableFuture<TopicExistsInfo> getRemotePartitionedTopicMetadataForAutoCreation(
+            TopicName topicName, Optional<Policies> policies) {
+        if (!pulsar.getConfig().isCreateTopicToRemoteClusterForReplication()
+                || topicName.isPartitioned()
+                || !topicName.isPersistent()
+                || !policies.isPresent()) {
+            return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        final Set<String> replicationClusters = policies.get().replication_clusters;
+        final String localCluster = pulsar.getConfiguration().getClusterName();
+        // A remote lookup only makes sense when the local cluster replicates to at least one other cluster.
+        final boolean replicatesToOtherClusters = replicationClusters != null
+                && replicationClusters.size() > 1
+                && replicationClusters.contains(localCluster);
+        if (!replicatesToOtherClusters) {
+            return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        // Sorted so that every broker checks the remote clusters in the same, deterministic order.
+        final List<String> remoteClusters = replicationClusters.stream()
+                .filter(cluster -> !cluster.equals(localCluster))
+                .sorted()
+                .toList();
+        return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, 0, null);
+    }
+
+    /**
+     * Searches the remote replication clusters, starting at {@code remoteClusters.get(index)}, for an existing copy
+     * of {@code topicName} and reports its shape.
+     *
+     * <p>The search moves on to the next cluster when a cluster is not registered, when it reports the topic as
+     * missing (a not-found error or an empty partition list), or when the lookup fails. This includes failures
+     * while loading the cluster data or creating its replication client. Continuing past a cluster that lacks the
+     * topic matters once a namespace replicates to three or more clusters. The topic may already exist on a later
+     * cluster, and stopping early would let the local cluster auto-create it with a different partition count.
+     * The last non-not-found failure is logged when the search ends without finding the topic.
+     *
+     * @param topicName      the topic that is about to be auto-created
+     * @param remoteClusters the remote replication clusters, in the order they are checked
+     * @param index          the position in {@code remoteClusters} of the next cluster to check
+     * @param errOccurred    the most recent lookup failure seen so far, or {@code null}
+     * @return a future with the topic's existence and shape on the first remote cluster that has it, or
+     *         {@link TopicExistsInfo#newTopicNotExists()} when none does
+     */
+    private CompletableFuture<TopicExistsInfo> findRemoteTopicMetadataForAutoCreation(
+            TopicName topicName, List<String> remoteClusters, int index, Throwable errOccurred) {
+        if (index >= remoteClusters.size()) {
+            if (errOccurred != null) {
+                log.error("[{}] Failed to check remote topic partitioned metadata on some of the clusters {}."
+                        + " Fallback to default auto topic creation policy.",
+                        topicName, remoteClusters, errOccurred);
+            }
+            return CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        final String remoteCluster = remoteClusters.get(index);
+        return pulsar.getPulsarResources().getClusterResources().getClusterAsync(remoteCluster)
+                .thenCompose(clusterData -> {
+                    if (clusterData.isEmpty()) {
+                        log.warn("[{}] Skip checking remote cluster {} because cluster data is missing",
+                                topicName, remoteCluster);
+                        // Carry the earlier error forward so it is still reported if no cluster has the topic.
+                        return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, index + 1,
+                                errOccurred);
+                    }
+                    PulsarClient client = getReplicationClient(remoteCluster, clusterData);
+                    return client.getPartitionsForTopic(topicName.toString(), false)
+                            .thenCompose(topics -> {
+                                if (topics.isEmpty()) {
+                                    // Not present on this cluster; a later remote cluster may still have it.
+                                    return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters,
+                                            index + 1, errOccurred);
+                                }
+                                if (topics.size() == 1 && !TopicName.get(topics.get(0)).isPartitioned()) {
+                                    return CompletableFuture.completedFuture(
+                                            TopicExistsInfo.newNonPartitionedTopicExists());
+                                }
+                                // The partition count is taken as the highest partition index + 1.
+                                int maxPartitionIndex = 0;
+                                for (String topic : topics) {
+                                    maxPartitionIndex = Math.max(maxPartitionIndex,
+                                            TopicName.get(topic).getPartitionIndex());
+                                }
+                                return CompletableFuture.completedFuture(
+                                        TopicExistsInfo.newPartitionedTopicExists(maxPartitionIndex + 1));
+                            });
+                })
+                // Every recursive call above also ends in this handler, so it only ever sees failures that
+                // belong to the current cluster and never re-checks a later one.
+                .exceptionallyCompose(ex -> {
+                    Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+                    boolean topicNotFound = actEx instanceof PulsarClientException.NotFoundException
+                            || actEx instanceof PulsarClientException.TopicDoesNotExistException
+                            || actEx instanceof PulsarAdminException.NotFoundException;
+                    // A missing topic is not an error; any other failure is remembered and the search moves on.
+                    return findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, index + 1,
+                            topicNotFound ? errOccurred : actEx);
+                });
+    }
+
public CompletableFuture<PartitionedTopicMetadata>
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+        // Convenience overload that passes false for the second argument. The concurrent-creation recheck in the
+        // auto-creation path passes true after logging "try to refresh the cache", so presumably false means
+        // "no forced cache refresh"; confirm against the two-argument overload.
return fetchPartitionedTopicMetadataAsync(topicName, false);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 7ed05939045..1f80d9a1925 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -138,6 +138,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
uriInfo = mock(UriInfo.class);
}
+ @Override
+ protected void doInitConf() throws Exception {
+ configureInitialConfig(conf);
+        // With createTopicToRemoteClusterForReplication enabled, BrokerService queries the other replication
+        // clusters before auto-creating a topic. Presumably these admin tests have no reachable remote clusters,
+        // so the flag is turned off to keep the local-only auto-creation behavior.
+        // NOTE(review): confirm the base doInitConf() does nothing beyond configureInitialConfig(conf); otherwise
+        // call super.doInitConf() here instead.
+ conf.setCreateTopicToRemoteClusterForReplication(false);
+ }
+
@Override
@BeforeMethod
protected void setup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 0ae33d247f1..aa8d0aabcb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -37,10 +37,13 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
@@ -50,14 +53,18 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
+import
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -669,6 +676,136 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
}
}
+    /**
+     * Supplies the shape of the system topic that cluster1 creates before replication is enabled:
+     * {@code 0} for a non-partitioned topic, otherwise the number of partitions.
+     */
+    @DataProvider
+    public Object[][] localSystemTopicPartitions() {
+        return new Object[][] {{0}, {3}};
+    }
+
+    /**
+     * Verifies that after namespace-level replication is enabled, both clusters load the same set of topics in
+     * the namespace, including the {@code __change_events} system topic.
+     *
+     * <p>The system topic is first created on cluster1 alone, either non-partitioned or with
+     * {@code localSystemTopicPartitions} partitions. Replication to cluster2 is then enabled and the auto-creation
+     * rule of both clusters is changed to two partitions, so cluster2's local rule differs from the shape that
+     * cluster1 already has.
+     */
+    @Test(dataProvider = "localSystemTopicPartitions")
+    public void testSystemTopicCreationWithDifferentTopicCreationRule(int localSystemTopicPartitions)
+            throws Exception {
+        String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
+        Predicate<String> topicNameFilter = t -> TopicName.get(t).getNamespace().equals(ns);
+        String systemTopic = "persistent://" + ns + "/__change_events";
+        admin1.namespaces().createNamespace(ns);
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)), false);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin1.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+            assertEquals(admin2.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+        });
+
+        // Trigger system topic creation on cluster1 following the localSystemTopicPartitions parameter:
+        // 0 means non-partitioned, any other value is the partition count.
+        boolean nonPartitioned = localSystemTopicPartitions == 0;
+        AutoTopicCreationOverride autoTopicCreation1 = nonPartitioned
+                ? AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                        .topicType("non-partitioned").build()
+                : AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                        .topicType("partitioned").defaultNumPartitions(localSystemTopicPartitions).build();
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation1);
+        Awaitility.await().untilAsserted(() -> {
+            AutoTopicCreationOverride autoTopicCreationOverride =
+                    admin1.namespaces().getAutoTopicCreationAsync(ns).get(3, TimeUnit.SECONDS);
+            assertNotNull(autoTopicCreationOverride);
+            if (nonPartitioned) {
+                assertTrue("non-partitioned".equalsIgnoreCase(autoTopicCreationOverride.getTopicType()));
+            } else {
+                assertTrue("partitioned".equalsIgnoreCase(autoTopicCreationOverride.getTopicType()));
+                assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), localSystemTopicPartitions);
+            }
+        });
+        // Use a topic loading to trigger system topic creation.
+        String topicUsedToTriggerSystemTopic = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
+        admin1.topics().createNonPartitionedTopic(topicUsedToTriggerSystemTopic);
+        admin1.topics().delete(topicUsedToTriggerSystemTopic, false);
+        // Verify: the system topic was created with the shape requested above.
+        Awaitility.await().untilAsserted(() -> {
+            TopicExistsInfo existsInfo = pulsar1.getNamespaceService()
+                    .checkTopicExistsAsync(TopicName.get(systemTopic)).get(3, TimeUnit.SECONDS);
+            assertTrue(existsInfo.isExists());
+            if (nonPartitioned) {
+                assertEquals(existsInfo.getTopicType(), TopicType.NON_PARTITIONED);
+            } else {
+                assertEquals(existsInfo.getTopicType(), TopicType.PARTITIONED);
+                assertEquals(existsInfo.getPartitions(), localSystemTopicPartitions);
+            }
+        });
+
+        // Enable replication and set the topic auto-creation rule of both clusters to "partitions: 2".
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
+        final Set<String> clusters = new HashSet<>(Arrays.asList(cluster1, cluster2));
+        admin1.namespaces().setNamespaceReplicationClusters(ns, clusters, true);
+        AutoTopicCreationOverride autoTopicCreation2 = AutoTopicCreationOverrideImpl.builder()
+                .allowAutoTopicCreation(true).topicType("partitioned").defaultNumPartitions(2).build();
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin1.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
+            assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
+        });
+
+        // Produce on both clusters so that both brokers load the topic and start its replicators.
+        admin2.topics().createNonPartitionedTopic(tp);
+        try (Producer<String> p2 = client2.newProducer(Schema.STRING).topic(tp).create()) {
+            p2.send("msg-1");
+        }
+        try (Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create()) {
+            p1.send("msg-1");
+        }
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic2.getReplicators().isEmpty());
+        });
+
+        // Verify: both clusters loaded the system topic and the same set of topics in the namespace.
+        Awaitility.await().untilAsserted(() -> {
+            List<String> topics1 = pulsar1.getBrokerService().getTopics().keySet()
+                    .stream().filter(topicNameFilter).sorted().collect(Collectors.toList());
+            List<String> topics2 = pulsar2.getBrokerService().getTopics().keySet()
+                    .stream().filter(topicNameFilter).sorted().collect(Collectors.toList());
+            log.info("topics1: {}", topics1);
+            log.info("topics2: {}", topics2);
+            assertTrue(topics1.stream().anyMatch(t -> t.contains("__change_events")));
+            assertTrue(topics2.stream().anyMatch(t -> t.contains("__change_events")));
+            assertEquals(topics1, topics2);
+        });
+
+        // Cleanup: stop replicating the topic on both clusters before deleting it.
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
+        admin2.topics().setReplicationClusters(tp, Arrays.asList(cluster2));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) broker2.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic2.getReplicators().isEmpty());
+        });
+        admin1.topics().delete(tp, false);
+        admin2.topics().delete(tp, false);
+    }
+
@Test
public void testUpdateNamespacePolicies() throws Exception {
// Create a namespace and allow both clusters to access.