poorbarcode commented on code in PR #25170:
URL: https://github.com/apache/pulsar/pull/25170#discussion_r2734856966
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -867,12 +869,331 @@ protected CompletableFuture<Void>
internalSetNamespaceReplicationClusters(List<S
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__
-> replicationClusterSet);
}))
+ .thenCompose(replicationClusterSet ->
+ getNamespacePoliciesAsync(namespaceName)
+ .thenCompose(policies ->
+
validateReplicationClusterCompatibility(replicationClusterSet,
+ policies.replication_clusters))
+ .thenApply(__ -> replicationClusterSet))
.thenCompose(replicationClusterSet ->
updatePoliciesAsync(namespaceName, policies -> {
policies.replication_clusters = replicationClusterSet;
return policies;
}));
}
+ /**
+ * Validates compatibility between clusters when enabling namespace-level
replication.
+ * This validation is only performed for newly added clusters.
+ * This includes:
+ * <ul>
+ * <li>All topics' partitions that have been created should be the same
across clusters,
+ * including the __change_events system topic</li>
+ * <li>Auto-creation policies between clusters should be the same,
including broker-level
+ * and namespace-level settings</li>
+ * </ul>
+ *
+ * @param replicationClusterSet the new set of clusters to be configured
+ * @param existingClusters the existing set of replication clusters
+ * @return a CompletableFuture that completes when validation passes, or
fails with RestException
+ */
+ private CompletableFuture<Void>
validateReplicationClusterCompatibility(Set<String> replicationClusterSet,
+
Set<String> existingClusters) {
+ String localCluster = pulsar().getConfiguration().getClusterName();
+
+ // Skip validation if local cluster is not in the replication set
+ if (!replicationClusterSet.contains(localCluster)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Find newly added clusters
+ Set<String> newlyAddedClusters = new HashSet<>(replicationClusterSet);
+ if (existingClusters != null) {
+ newlyAddedClusters.removeAll(existingClusters);
+ }
+
+ if (newlyAddedClusters.isEmpty()) {
+ // No new clusters added, skip validation
+ return CompletableFuture.completedFuture(null);
+ }
+
+ List<String> newRemoteClusters = newlyAddedClusters.stream()
+ .filter(cluster -> !cluster.equals(localCluster))
+ .collect(Collectors.toList());
+
+ if (newRemoteClusters.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Validate compatibility with each newly added remote cluster
+ List<CompletableFuture<Void>> validationFutures =
newRemoteClusters.stream()
+ .map(remoteCluster ->
validateClusterPairCompatibility(localCluster, remoteCluster))
+ .collect(Collectors.toList());
+
+ return FutureUtil.waitForAll(validationFutures);
+ }
+
+ /**
+ * Validates compatibility between the local cluster and a remote cluster.
+ */
+ private CompletableFuture<Void> validateClusterPairCompatibility(String
localCluster, String remoteCluster) {
+ return clusterResources().getClusterAsync(remoteCluster)
+ .thenCompose(clusterDataOpt -> {
+ if (clusterDataOpt.isEmpty()) {
+ throw new RestException(Status.NOT_FOUND, "Cluster " +
remoteCluster + " does not exist");
+ }
+ ClusterData clusterData = clusterDataOpt.get();
+ PulsarAdmin remoteAdmin;
+ try {
+ remoteAdmin = pulsar().getBrokerService()
+ .getClusterPulsarAdmin(remoteCluster,
Optional.of(clusterData));
+ } catch (Exception e) {
+ throw new RestException(Status.INTERNAL_SERVER_ERROR,
+ "Failed to update clusters because failed to
create admin client for cluster "
+ + remoteCluster + ": " + e.getMessage());
+ }
+
+ // Validate both partition compatibility and auto-creation
policy compatibility
+ return validatePartitionCompatibility(remoteAdmin,
remoteCluster)
+ .thenCompose(__ ->
validateAutoTopicCreationCompatibility(remoteAdmin, remoteCluster));
+ });
+ }
+
+ /**
+ * Validates partition compatibility between local and remote clusters.
+ * <ul>
+ * <li>All partitioned topics (including __change_events) that exist in
the local cluster
+ * must have the same partition count in the remote cluster (if they
exist there).</li>
+ * <li>Non-partitioned topics in the local cluster must not exist as
partitioned topics
+ * in the remote cluster.</li>
+ * </ul>
+ */
+ private CompletableFuture<Void> validatePartitionCompatibility(PulsarAdmin
remoteAdmin, String remoteCluster) {
+ // Get local partitioned topics
+ CompletableFuture<List<String>> localPartitionedTopicsFuture =
+
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName);
+
+ // Get persistent topics only (non-persistent topics don't have
persistent state
+ // and getListOfNonPersistentTopics triggers global namespace
ownership validation
+ // which fails when namespace has no clusters configured yet)
+ CompletableFuture<List<String>> localAllTopicsFuture =
+
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+
+ return localPartitionedTopicsFuture.thenCombine(localAllTopicsFuture,
+ (localPartitionedTopics, localAllTopics) -> {
+ // Find non-partitioned topics (topics that are not in the
partitioned list
+ // and are not partition suffixes of partitioned topics)
+ Set<String> partitionedTopicSet = new
HashSet<>(localPartitionedTopics);
+ List<String> localNonPartitionedTopics =
localAllTopics.stream()
+ .map(TopicName::get)
+ .filter(topicName -> !partitionedTopicSet.contains(
+ topicName.getPartitionedTopicName()))
+ .map(TopicName::toString)
+ .distinct()
+ .collect(Collectors.toList());
+
+ return new TopicLists(localPartitionedTopics,
localNonPartitionedTopics);
+ })
+ .thenCompose(topicLists -> {
+ List<CompletableFuture<Void>> validations = new
ArrayList<>();
+
+ // Validate partitioned topics have same partition count
+ for (String topic : topicLists.partitionedTopics) {
+ validations.add(compareTopicPartitions(topic,
remoteAdmin, remoteCluster));
+ }
+
+ // Validate non-partitioned topics don't exist as
partitioned on remote
+ for (String topic : topicLists.nonPartitionedTopics) {
+
validations.add(validateNonPartitionedTopicCompatibility(topic, remoteAdmin,
remoteCluster));
+ }
+
+ return FutureUtil.waitForAll(validations);
+ });
+ }
+
+ /**
+ * Helper class to hold partitioned and non-partitioned topic lists.
+ */
+ private static class TopicLists {
+ final List<String> partitionedTopics;
+ final List<String> nonPartitionedTopics;
+
+ TopicLists(List<String> partitionedTopics, List<String>
nonPartitionedTopics) {
+ this.partitionedTopics = partitionedTopics;
+ this.nonPartitionedTopics = nonPartitionedTopics;
+ }
+ }
+
+ /**
+ * Validates that a non-partitioned topic on local does not exist as a
partitioned topic on remote.
+ */
+ private CompletableFuture<Void>
validateNonPartitionedTopicCompatibility(String topic, PulsarAdmin remoteAdmin,
+
String remoteCluster) {
+ return remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+ .thenAccept(remoteMetadata -> {
+ // If remote has partitions > 0, it's a partitioned topic,
which is incompatible
+ if (remoteMetadata.partitions > 0) {
+ throw new RestException(Status.CONFLICT,
+ String.format("Topic type mismatch for topic
'%s': local cluster has a "
+ + "non-partitioned topic, but
remote cluster '%s' has a partitioned "
+ + "topic with %d partitions. "
+ + "Please ensure topic types
are the same before enabling replication.",
+ topic, remoteCluster,
remoteMetadata.partitions));
+ }
+ })
+ .exceptionally(ex -> {
+ // If topic doesn't exist on remote, that's fine
+ if (ex.getCause() instanceof
PulsarAdminException.NotFoundException
+ || ex instanceof
PulsarAdminException.NotFoundException) {
+ return null;
+ }
+ throw new CompletionException(ex);
+ });
+ }
+
+ /**
+ * Compares the partition count of a topic between local and remote
clusters.
+ * If the topic exists on local but not on remote, validation passes.
+ * If the topic exists on both clusters, partition counts must match.
+ */
+ private CompletableFuture<Void> compareTopicPartitions(String topic,
PulsarAdmin remoteAdmin,
+ String
remoteCluster) {
+ TopicName topicName = TopicName.get(topic);
+
+ // Get local partition metadata
+ CompletableFuture<Optional<PartitionedTopicMetadata>>
localMetadataFuture =
+ pulsar().getPulsarResources().getNamespaceResources()
+
.getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName);
+
+ // Get remote partition metadata, return Optional.empty() if topic
doesn't exist
+ CompletableFuture<Optional<PartitionedTopicMetadata>>
remoteMetadataFuture =
+ remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+ .thenApply(Optional::of)
+ .exceptionally(ex -> {
+ // If topic doesn't exist on remote, return empty
+ if (ex.getCause() instanceof
PulsarAdminException.NotFoundException
+ || ex instanceof
PulsarAdminException.NotFoundException) {
+ return Optional.empty();
+ }
+ throw new CompletionException(ex);
+ });
+
+ return localMetadataFuture.thenCombine(remoteMetadataFuture,
(localMetadataOpt, remoteMetadataOpt) -> {
+ // If topic doesn't exist on remote, validation passes
+ if (remoteMetadataOpt.isEmpty()) {
+ return null;
+ }
+
+ int localPartitions = localMetadataOpt.map(m ->
m.partitions).orElse(0);
+ int remotePartitions = remoteMetadataOpt.get().partitions;
+
+ if (localPartitions != remotePartitions) {
+ String topicType =
SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+ ? "__change_events system topic" : "topic";
+ throw new RestException(Status.CONFLICT,
+ String.format("Partition count mismatch for %s '%s':
local cluster has %d partitions, "
+ + "remote cluster '%s' has %d
partitions. "
+ + "Please ensure partition counts are
the same before enabling replication.",
+ topicType, topic, localPartitions,
remoteCluster, remotePartitions));
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Validates that the effective auto-topic creation policies are the same
between local and remote clusters.
+ * The effective policy is computed by: namespace-level policy overrides
broker-level if it exists.
+ */
+ private CompletableFuture<Void>
validateAutoTopicCreationCompatibility(PulsarAdmin remoteAdmin,
Review Comment:
https://github.com/apache/pulsar/pull/24485 will always allow replicator to
create topics
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]