lhotari commented on code in PR #24136:
URL: https://github.com/apache/pulsar/pull/24136#discussion_r2018978470


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java:
##########
@@ -56,7 +63,112 @@ protected String getProducerName() {
     @Override
     protected CompletableFuture<Void> prepareCreateProducer() {
         if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
-            return CompletableFuture.completedFuture(null);
+            TopicName completeTopicName = TopicName.get(localTopicName);
+            TopicName baseTopicName;
+            if (completeTopicName.isPartitioned()) {
+                baseTopicName = TopicName.get(completeTopicName.getPartitionedTopicName());
+            } else {
+                baseTopicName = completeTopicName;
+            }
+            // Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it
+            // can still work.
+            return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true)
+                    .thenCompose((localMetadata) -> replicationAdmin.topics()
+                            // https://github.com/apache/pulsar/pull/4963
+                            // Use the admin API instead of the client to fetch partitioned metadata
+                            // to prevent automatic topic creation on the remote cluster.
+                            // PIP-344 introduced an option to disable auto-creation when fetching partitioned
+                            // topic metadata via the client, but this requires Pulsar 3.0.x.
+                            // This change is a workaround to support Pulsar 2.4.2.
+                            .getPartitionedTopicMetadataAsync(baseTopicName.toString())
+                            .exceptionally(ex -> {
+                                Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                if (throwable instanceof NotFoundException) {
+                                    // Topic does not exist on the remote cluster.
+                                    return new PartitionedTopicMetadata(0);
+                                }
+                                throw new CompletionException("Failed to get partitioned topic metadata", throwable);
+                            }).thenCompose(remoteMetadata -> {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Local metadata partitions: {} Remote metadata partitions: {}",
+                                            replicatorId, localMetadata.partitions, remoteMetadata.partitions);
+                                }
+
+                                // Non-partitioned topic
+                                if (localMetadata.partitions == 0) {
+                                    if (localMetadata.partitions == remoteMetadata.partitions) {
+                                        return replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof ConflictException) {
+                                                        // Topic already exists on the remote cluster.
+                                                        return null;
+                                                    } else {
+                                                        throw new CompletionException(
+                                                                "Failed to create non-partitioned topic", throwable);
+                                                    }
+                                                });
+                                    } else {
+                                        return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
+                                                "Topic type is not matched between local and remote cluster: local "
+                                                        + "partitions: " + localMetadata.partitions
+                                                        + ", remote partitions: " + remoteMetadata.partitions));
+                                    }
+                                } else {
+                                    if (remoteMetadata.partitions == 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] Creating partitioned topic {} with {} partitions",
+                                                    replicatorId, baseTopicName, localMetadata.partitions);
+                                        }
+                                        // We maybe need to create a partitioned topic on remote cluster.
+                                        return replicationAdmin.topics()
+                                                .createPartitionedTopicAsync(baseTopicName.toString(),
+                                                        localMetadata.partitions)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof ConflictException) {
+                                                        // Topic already exists on the remote cluster.
+                                                        // This can happen if the topic was created, or the topic is
+                                                        // non-partitioned.
+                                                        return null;
+                                                    } else {
+                                                        throw new CompletionException(
+                                                                "Failed to create partitioned topic", throwable);
+                                                    }
+                                                })
+                                                .thenCompose((__) -> replicationAdmin.topics()
+                                                        .getPartitionedTopicMetadataAsync(baseTopicName.toString()))
+                                                .thenCompose(metadata -> {
+                                                    // Double check if the partitioned topic is created
+                                                    // successfully.
+                                                    // When partitions is equals to 0, it means this topic is
+                                                    // non-partitioned, we should throw an exception.
+                                                    if (completeTopicName.getPartitionIndex() >= metadata.partitions) {
+                                                        return FutureUtil.failedFuture(
+                                                                new PulsarClientException.NotAllowedException(
+                                                                        "Topic type is not matched between "
+                                                                                + "local and "
+                                                                                + "remote cluster: local "
+                                                                                + "partitions: "
+                                                                                + localMetadata.partitions
+                                                                                + ", remote partitions: "
+                                                                                + remoteMetadata.partitions));
+                                                    }
+                                                    return CompletableFuture.completedFuture(null);
+                                                });
+                                    }
+                                    // Update partitioned topic partitions if needed.
+                                    if (completeTopicName.getPartitionIndex() > remoteMetadata.partitions
+                                            && localMetadata.partitions > remoteMetadata.partitions) {
+                                        log.info("[{}] Updating partitioned topic {} to {} partitions", replicatorId,
+                                                baseTopicName, localMetadata.partitions);
+                                        return replicationAdmin.topics()
+                                                .updatePartitionedTopicAsync(baseTopicName.toString(),

Review Comment:
   > Before this PR, Pulsar would never modify customers' partition count without users' permission, which I have described in the previous comments, see [#24136 (comment)](https://github.com/apache/pulsar/pull/24136#discussion_r2018451234)
   
   @poorbarcode I'd say that leaving the remote partition count untouched is unexpected behavior when `createTopicToRemoteClusterForReplication=true`, since replication breaks if the partitions aren't created. However, I think there is already code elsewhere in the replication implementation that modifies the partition count and creates partitions if they don't exist. @nodece did you check whether that already exists?
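
To make the behavior under discussion easier to reason about, the branching in the quoted diff can be reduced to a pure decision function. This is only a minimal sketch, not the PR's code: `PartitionReconcileSketch`, `Action`, and `decide` are hypothetical names, and no Pulsar APIs are called. One assumption differs from the quoted diff: the sketch treats a partition as missing when `partitionIndex >= remotePartitions` (partition indexes are zero-based), while the diff's update branch compares with `>`.

```java
// Hypothetical sketch of the partition reconciliation decision; not Pulsar code.
final class PartitionReconcileSketch {

    // What the replicator would ask the remote cluster to do.
    enum Action {
        CREATE_NON_PARTITIONED,   // local and remote are both non-partitioned (or remote is absent)
        CREATE_PARTITIONED,       // remote has no partitioned metadata yet
        UPDATE_PARTITIONS,        // remote has fewer partitions than this partition needs
        REJECT_TYPE_MISMATCH,     // local is non-partitioned but remote is partitioned
        NONE                      // remote already covers this partition
    }

    // A partition count of 0 means "non-partitioned or not found", as in the quoted diff.
    static Action decide(int localPartitions, int remotePartitions, int partitionIndex) {
        if (localPartitions == 0) {
            return remotePartitions == 0
                    ? Action.CREATE_NON_PARTITIONED
                    : Action.REJECT_TYPE_MISMATCH;
        }
        if (remotePartitions == 0) {
            return Action.CREATE_PARTITIONED;
        }
        // Assumption: ">=" because indexes are zero-based; the quoted diff uses ">".
        if (partitionIndex >= remotePartitions && localPartitions > remotePartitions) {
            return Action.UPDATE_PARTITIONS;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Local topic has 4 partitions, remote has 2, and we replicate partition 3.
        System.out.println(decide(4, 2, 3));
    }
}
```

The `UPDATE_PARTITIONS` case is the one this thread is about: it is the only branch that changes the partition count of a partitioned topic that already exists on the remote cluster.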



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.