poorbarcode commented on code in PR #25170:
URL: https://github.com/apache/pulsar/pull/25170#discussion_r2736855626


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -867,12 +869,331 @@ protected CompletableFuture<Void> 
internalSetNamespaceReplicationClusters(List<S
                                     }).collect(Collectors.toList());
                             return FutureUtil.waitForAll(futures).thenApply(__ 
-> replicationClusterSet);
                         }))
+                .thenCompose(replicationClusterSet ->
+                    getNamespacePoliciesAsync(namespaceName)
+                        .thenCompose(policies ->
+                            
validateReplicationClusterCompatibility(replicationClusterSet,
+                            policies.replication_clusters))
+                        .thenApply(__ -> replicationClusterSet))
                 .thenCompose(replicationClusterSet -> 
updatePoliciesAsync(namespaceName, policies -> {
                     policies.replication_clusters = replicationClusterSet;
                     return policies;
                 }));
     }
 
+    /**
+     * Validates compatibility between clusters when enabling namespace-level 
replication.
+     * This validation is only performed for newly added clusters.
+     * This includes:
+     * <ul>
+     *   <li>All topics' partitions that have been created should be the same 
across clusters,
+     *       including the __change_events system topic</li>
+     *   <li>Auto-creation policies between clusters should be the same, 
including broker-level
+     *       and namespace-level settings</li>
+     * </ul>
+     *
+     * @param replicationClusterSet the new set of clusters to be configured
+     * @param existingClusters the existing set of replication clusters
+     * @return a CompletableFuture that completes when validation passes, or 
fails with RestException
+     */
+    private CompletableFuture<Void> 
validateReplicationClusterCompatibility(Set<String> replicationClusterSet,
+                                                                             
Set<String> existingClusters) {
+        String localCluster = pulsar().getConfiguration().getClusterName();
+
+        // Skip validation if local cluster is not in the replication set
+        if (!replicationClusterSet.contains(localCluster)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Find newly added clusters
+        Set<String> newlyAddedClusters = new HashSet<>(replicationClusterSet);
+        if (existingClusters != null) {
+            newlyAddedClusters.removeAll(existingClusters);
+        }
+
+        if (newlyAddedClusters.isEmpty()) {
+            // No new clusters added, skip validation
+            return CompletableFuture.completedFuture(null);
+        }
+
+        List<String> newRemoteClusters = newlyAddedClusters.stream()
+                .filter(cluster -> !cluster.equals(localCluster))
+                .collect(Collectors.toList());
+
+        if (newRemoteClusters.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Validate compatibility with each newly added remote cluster
+        List<CompletableFuture<Void>> validationFutures = 
newRemoteClusters.stream()
+                .map(remoteCluster -> 
validateClusterPairCompatibility(localCluster, remoteCluster))
+                .collect(Collectors.toList());
+
+        return FutureUtil.waitForAll(validationFutures);
+    }
+
+    /**
+     * Validates compatibility between the local cluster and a remote cluster.
+     */
+    private CompletableFuture<Void> validateClusterPairCompatibility(String 
localCluster, String remoteCluster) {
+        return clusterResources().getClusterAsync(remoteCluster)
+                .thenCompose(clusterDataOpt -> {
+                    if (clusterDataOpt.isEmpty()) {
+                        throw new RestException(Status.NOT_FOUND, "Cluster " + 
remoteCluster + " does not exist");
+                    }
+                    ClusterData clusterData = clusterDataOpt.get();
+                    PulsarAdmin remoteAdmin;
+                    try {
+                        remoteAdmin = pulsar().getBrokerService()
+                                .getClusterPulsarAdmin(remoteCluster, 
Optional.of(clusterData));
+                    } catch (Exception e) {
+                        throw new RestException(Status.INTERNAL_SERVER_ERROR,
+                            "Failed to update clusters because failed to 
create admin client for cluster "
+                            + remoteCluster + ": " + e.getMessage());
+                    }
+
+                    // Validate both partition compatibility and auto-creation 
policy compatibility
+                    return validatePartitionCompatibility(remoteAdmin, 
remoteCluster)
+                            .thenCompose(__ -> 
validateAutoTopicCreationCompatibility(remoteAdmin, remoteCluster));
+                });
+    }
+
+    /**
+     * Validates partition compatibility between local and remote clusters.
+     * <ul>
+     *   <li>All partitioned topics (including __change_events) that exist in 
the local cluster
+     *       must have the same partition count in the remote cluster (if they 
exist there).</li>
+     *   <li>Non-partitioned topics in the local cluster must not exist as 
partitioned topics
+     *       in the remote cluster.</li>
+     * </ul>
+     */
+    private CompletableFuture<Void> validatePartitionCompatibility(PulsarAdmin 
remoteAdmin, String remoteCluster) {
+        // Get local partitioned topics
+        CompletableFuture<List<String>> localPartitionedTopicsFuture =
+                
pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName);
+
+        // Get persistent topics only (non-persistent topics don't have 
persistent state
+        // and getListOfNonPersistentTopics triggers global namespace 
ownership validation
+        // which fails when namespace has no clusters configured yet)
+        CompletableFuture<List<String>> localAllTopicsFuture =
+                
pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+
+        return localPartitionedTopicsFuture.thenCombine(localAllTopicsFuture,
+                (localPartitionedTopics, localAllTopics) -> {
+                    // Find non-partitioned topics (topics that are not in the 
partitioned list
+                    // and are not partition suffixes of partitioned topics)
+                    Set<String> partitionedTopicSet = new 
HashSet<>(localPartitionedTopics);
+                    List<String> localNonPartitionedTopics = 
localAllTopics.stream()
+                            .map(TopicName::get)
+                            .filter(topicName -> !partitionedTopicSet.contains(
+                                    topicName.getPartitionedTopicName()))
+                            .map(TopicName::toString)
+                            .distinct()
+                            .collect(Collectors.toList());
+
+                    return new TopicLists(localPartitionedTopics, 
localNonPartitionedTopics);
+                })
+                .thenCompose(topicLists -> {
+                    List<CompletableFuture<Void>> validations = new 
ArrayList<>();
+
+                    // Validate partitioned topics have same partition count
+                    for (String topic : topicLists.partitionedTopics) {
+                        validations.add(compareTopicPartitions(topic, 
remoteAdmin, remoteCluster));
+                    }
+
+                    // Validate non-partitioned topics don't exist as 
partitioned on remote
+                    for (String topic : topicLists.nonPartitionedTopics) {
+                        
validations.add(validateNonPartitionedTopicCompatibility(topic, remoteAdmin, 
remoteCluster));
+                    }
+
+                    return FutureUtil.waitForAll(validations);
+                });
+    }
+
+    /**
+     * Helper class to hold partitioned and non-partitioned topic lists.
+     */
+    private static class TopicLists {
+        final List<String> partitionedTopics;
+        final List<String> nonPartitionedTopics;
+
+        TopicLists(List<String> partitionedTopics, List<String> 
nonPartitionedTopics) {
+            this.partitionedTopics = partitionedTopics;
+            this.nonPartitionedTopics = nonPartitionedTopics;
+        }
+    }
+
+    /**
+     * Validates that a non-partitioned topic on local does not exist as a 
partitioned topic on remote.
+     */
+    private CompletableFuture<Void> 
validateNonPartitionedTopicCompatibility(String topic, PulsarAdmin remoteAdmin,
+                                                                              
String remoteCluster) {
+        return remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+                .thenAccept(remoteMetadata -> {
+                    // If remote has partitions > 0, it's a partitioned topic, 
which is incompatible
+                    if (remoteMetadata.partitions > 0) {
+                        throw new RestException(Status.CONFLICT,
+                                String.format("Topic type mismatch for topic 
'%s': local cluster has a "
+                                                + "non-partitioned topic, but 
remote cluster '%s' has a partitioned "
+                                                + "topic with %d partitions. "
+                                                + "Please ensure topic types 
are the same before enabling replication.",
+                                        topic, remoteCluster, 
remoteMetadata.partitions));
+                    }
+                })
+                .exceptionally(ex -> {
+                    // If topic doesn't exist on remote, that's fine
+                    if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException
+                            || ex instanceof 
PulsarAdminException.NotFoundException) {
+                        return null;
+                    }
+                    throw new CompletionException(ex);
+                });
+    }
+
+    /**
+     * Compares the partition count of a topic between local and remote 
clusters.
+     * If the topic exists on local but not on remote, validation passes.
+     * If the topic exists on both clusters, partition counts must match.
+     */
+    private CompletableFuture<Void> compareTopicPartitions(String topic, 
PulsarAdmin remoteAdmin,
+                                                           String 
remoteCluster) {
+        TopicName topicName = TopicName.get(topic);
+
+        // Get local partition metadata
+        CompletableFuture<Optional<PartitionedTopicMetadata>> 
localMetadataFuture =
+                pulsar().getPulsarResources().getNamespaceResources()
+                        
.getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName);
+
+        // Get remote partition metadata, return Optional.empty() if topic 
doesn't exist
+        CompletableFuture<Optional<PartitionedTopicMetadata>> 
remoteMetadataFuture =
+                remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+                        .thenApply(Optional::of)
+                        .exceptionally(ex -> {
+                            // If topic doesn't exist on remote, return empty
+                            if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException
+                                    || ex instanceof 
PulsarAdminException.NotFoundException) {
+                                return Optional.empty();
+                            }
+                            throw new CompletionException(ex);
+                        });
+
+        return localMetadataFuture.thenCombine(remoteMetadataFuture, 
(localMetadataOpt, remoteMetadataOpt) -> {
+            // If topic doesn't exist on remote, validation passes
+            if (remoteMetadataOpt.isEmpty()) {
+                return null;
+            }
+
+            int localPartitions = localMetadataOpt.map(m -> 
m.partitions).orElse(0);
+            int remotePartitions = remoteMetadataOpt.get().partitions;
+
+            if (localPartitions != remotePartitions) {
+                String topicType = 
SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+                        ? "__change_events system topic" : "topic";
+                throw new RestException(Status.CONFLICT,
+                        String.format("Partition count mismatch for %s '%s': 
local cluster has %d partitions, "
+                                        + "remote cluster '%s' has %d 
partitions. "
+                                        + "Please ensure partition counts are 
the same before enabling replication.",
+                                topicType, topic, localPartitions, 
remoteCluster, remotePartitions));
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Validates that the effective auto-topic creation policies are the same 
between local and remote clusters.
+     * The effective policy is computed by: namespace-level policy overrides 
broker-level if it exists.
+     */
+    private CompletableFuture<Void> 
validateAutoTopicCreationCompatibility(PulsarAdmin remoteAdmin,
+                                                                            
String remoteCluster) {
+        String namespaceStr = namespaceName.toString();
+
+        // Get local broker config
+        ServiceConfiguration localConfig = pulsar().getConfiguration();
+        TopicType localBrokerAutoCreationType = 
localConfig.getAllowAutoTopicCreationType();
+        int localBrokerDefaultPartitions = 
localConfig.getDefaultNumPartitions();

Review Comment:
   > When both sides are effective non-partitioned, but defaultNumPartitions is 
different, why are they treated as incompatible?
   
   Pulsar does not allow setting a custom `defaultNumPartitions` when the 
default topic type is non-partitioned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to