poorbarcode commented on code in PR #24136:
URL: https://github.com/apache/pulsar/pull/24136#discussion_r2018339057


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java:
##########
@@ -56,7 +63,112 @@ protected String getProducerName() {
     @Override
     protected CompletableFuture<Void> prepareCreateProducer() {
         if 
(brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication())
 {
-            return CompletableFuture.completedFuture(null);
+            TopicName completeTopicName = TopicName.get(localTopicName);
+            TopicName baseTopicName;
+            if (completeTopicName.isPartitioned()) {
+                baseTopicName = 
TopicName.get(completeTopicName.getPartitionedTopicName());
+            } else {
+                baseTopicName = completeTopicName;
+            }
+            // Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 
and non-PIP-344 brokers are used, it
+            // can still work.
+            return 
client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true)
+                    .thenCompose((localMetadata) -> replicationAdmin.topics()
+                            // https://github.com/apache/pulsar/pull/4963
+                            // Use the admin API instead of the client to 
fetch partitioned metadata
+                            // to prevent automatic topic creation on the 
remote cluster.
+                            // PIP-344 introduced an option to disable 
auto-creation when fetching partitioned
+                            // topic metadata via the client, but this 
requires Pulsar 3.0.x.
+                            // This change is a workaround to support Pulsar 
2.4.2.
+                            
.getPartitionedTopicMetadataAsync(baseTopicName.toString())
+                            .exceptionally(ex -> {
+                                Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
+                                if (throwable instanceof NotFoundException) {
+                                    // Topic does not exist on the remote 
cluster.
+                                    return new PartitionedTopicMetadata(0);
+                                }
+                                throw new CompletionException("Failed to get 
partitioned topic metadata", throwable);
+                            }).thenCompose(remoteMetadata -> {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Local metadata partitions: 
{} Remote metadata partitions: {}",
+                                            replicatorId, 
localMetadata.partitions, remoteMetadata.partitions);
+                                }
+
+                                // Non-partitioned topic
+                                if (localMetadata.partitions == 0) {
+                                    if (localMetadata.partitions == 
remoteMetadata.partitions) {
+                                        return 
replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof 
ConflictException) {
+                                                        // Topic already 
exists on the remote cluster.
+                                                        return null;
+                                                    } else {
+                                                        throw new 
CompletionException(
+                                                                "Failed to 
create non-partitioned topic", throwable);
+                                                    }
+                                                });
+                                    } else {
+                                        return FutureUtil.failedFuture(new 
PulsarClientException.NotAllowedException(
+                                                "Topic type is not matched 
between local and remote cluster: local "
+                                                        + "partitions: " + 
localMetadata.partitions
+                                                        + ", remote 
partitions: " + remoteMetadata.partitions));
+                                    }
+                                } else {
+                                    if (remoteMetadata.partitions == 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] Creating 
partitioned topic {} with {} partitions",
+                                                    replicatorId, 
baseTopicName, localMetadata.partitions);
+                                        }
+                                        // We maybe need to create a 
partitioned topic on remote cluster.
+                                        return replicationAdmin.topics()
+                                                
.createPartitionedTopicAsync(baseTopicName.toString(),
+                                                        
localMetadata.partitions)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = 
FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof 
ConflictException) {
+                                                        // Topic already 
exists on the remote cluster.
+                                                        // This can happen if 
the topic was created, or the topic is
+                                                        // non-partitioned.
+                                                        return null;
+                                                    } else {
+                                                        throw new 
CompletionException(
+                                                                "Failed to 
create partitioned topic", throwable);
+                                                    }
+                                                })
+                                                .thenCompose((__) -> 
replicationAdmin.topics()
+                                                        
.getPartitionedTopicMetadataAsync(baseTopicName.toString()))
+                                                .thenCompose(metadata -> {
+                                                    // Double check if the 
partitioned topic is created
+                                                    // successfully.
+                                                    // When partitions is 
equals to 0, it means this topic is
+                                                    // non-partitioned, we 
should throw an exception.
+                                                    if 
(completeTopicName.getPartitionIndex() >= metadata.partitions) {
+                                                        return 
FutureUtil.failedFuture(
+                                                                new 
PulsarClientException.NotAllowedException(
+                                                                        "Topic 
type is not matched between "
+                                                                               
 + "local and "
+                                                                               
 + "remote cluster: local "
+                                                                               
 + "partitions: "
+                                                                               
 + localMetadata.partitions
+                                                                               
 + ", remote partitions: "
+                                                                               
 + remoteMetadata.partitions));
+                                                    }
+                                                    return 
CompletableFuture.completedFuture(null);
+                                                });
+                                    }
+                                    // Update partitioned topic partitions if 
needed.
+                                    if (completeTopicName.getPartitionIndex() 
> remoteMetadata.partitions
+                                            && localMetadata.partitions > 
remoteMetadata.partitions) {
+                                        log.info("[{}] Updating partitioned 
topic {} to {} partitions", replicatorId,
+                                                baseTopicName, 
localMetadata.partitions);
+                                        return replicationAdmin.topics()
+                                                
.updatePartitionedTopicAsync(baseTopicName.toString(),

Review Comment:
   Please do not edit users' partition count automatically, it will break their 
consumption ordering when they use `Producer -> MessageRouter` or `Key_Shared` 
subscription.
   
   Instead of modifying users' partition count, we'd better add some checks and 
print suitbale errors when users enable Geo-Replication by calling the admin API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to