lhotari commented on code in PR #24136:
URL: https://github.com/apache/pulsar/pull/24136#discussion_r2018978470
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java:
##########
@@ -56,7 +63,112 @@ protected String getProducerName() {
     @Override
     protected CompletableFuture<Void> prepareCreateProducer() {
         if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
-            return CompletableFuture.completedFuture(null);
+            TopicName completeTopicName = TopicName.get(localTopicName);
+            TopicName baseTopicName;
+            if (completeTopicName.isPartitioned()) {
+                baseTopicName = TopicName.get(completeTopicName.getPartitionedTopicName());
+            } else {
+                baseTopicName = completeTopicName;
+            }
+            // Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it
+            // can still work.
+            return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true)
+                    .thenCompose((localMetadata) -> replicationAdmin.topics()
+                            // https://github.com/apache/pulsar/pull/4963
+                            // Use the admin API instead of the client to fetch partitioned metadata
+                            // to prevent automatic topic creation on the remote cluster.
+                            // PIP-344 introduced an option to disable auto-creation when fetching partitioned
+                            // topic metadata via the client, but this requires Pulsar 3.0.x.
+                            // This change is a workaround to support Pulsar 2.4.2.
+                            .getPartitionedTopicMetadataAsync(baseTopicName.toString())
+                            .exceptionally(ex -> {
+                                Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                if (throwable instanceof NotFoundException) {
+                                    // Topic does not exist on the remote cluster.
+                                    return new PartitionedTopicMetadata(0);
+                                }
+                                throw new CompletionException("Failed to get partitioned topic metadata", throwable);
+                            }).thenCompose(remoteMetadata -> {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Local metadata partitions: {} Remote metadata partitions: {}",
+                                            replicatorId, localMetadata.partitions, remoteMetadata.partitions);
+                                }
+
+                                // Non-partitioned topic
+                                if (localMetadata.partitions == 0) {
+                                    if (localMetadata.partitions == remoteMetadata.partitions) {
+                                        return replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof ConflictException) {
+                                                        // Topic already exists on the remote cluster.
+                                                        return null;
+                                                    } else {
+                                                        throw new CompletionException(
+                                                                "Failed to create non-partitioned topic", throwable);
+                                                    }
+                                                });
+                                    } else {
+                                        return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
+                                                "Topic type is not matched between local and remote cluster: local "
+                                                        + "partitions: " + localMetadata.partitions
+                                                        + ", remote partitions: " + remoteMetadata.partitions));
+                                    }
+                                } else {
+                                    if (remoteMetadata.partitions == 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] Creating partitioned topic {} with {} partitions",
+                                                    replicatorId, baseTopicName, localMetadata.partitions);
+                                        }
+                                        // We maybe need to create a partitioned topic on remote cluster.
+                                        return replicationAdmin.topics()
+                                                .createPartitionedTopicAsync(baseTopicName.toString(),
+                                                        localMetadata.partitions)
+                                                .exceptionally(ex -> {
+                                                    Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+                                                    if (throwable instanceof ConflictException) {
+                                                        // Topic already exists on the remote cluster.
+                                                        // This can happen if the topic was created, or the topic is
+                                                        // non-partitioned.
+                                                        return null;
+                                                    } else {
+                                                        throw new CompletionException(
+                                                                "Failed to create partitioned topic", throwable);
+                                                    }
+                                                })
+                                                .thenCompose((__) -> replicationAdmin.topics()
+                                                        .getPartitionedTopicMetadataAsync(baseTopicName.toString()))
+                                                .thenCompose(metadata -> {
+                                                    // Double check if the partitioned topic is created
+                                                    // successfully.
+                                                    // When partitions is equals to 0, it means this topic is
+                                                    // non-partitioned, we should throw an exception.
+                                                    if (completeTopicName.getPartitionIndex() >= metadata.partitions) {
+                                                        return FutureUtil.failedFuture(
+                                                                new PulsarClientException.NotAllowedException(
+                                                                        "Topic type is not matched between "
+                                                                                + "local and "
+                                                                                + "remote cluster: local "
+                                                                                + "partitions: "
+                                                                                + localMetadata.partitions
+                                                                                + ", remote partitions: "
+                                                                                + remoteMetadata.partitions));
+                                                    }
+                                                    return CompletableFuture.completedFuture(null);
+                                                });
+                                    }
+                                    // Update partitioned topic partitions if needed.
+                                    if (completeTopicName.getPartitionIndex() > remoteMetadata.partitions
+                                            && localMetadata.partitions > remoteMetadata.partitions) {
+                                        log.info("[{}] Updating partitioned topic {} to {} partitions", replicatorId,
+                                                baseTopicName, localMetadata.partitions);
+                                        return replicationAdmin.topics()
+                                                .updatePartitionedTopicAsync(baseTopicName.toString(),

Review Comment:
> Before this PR, Pulsar would never modify customers' partition count without users' permission, which I have described in the previous comments, see [#24136 (comment)](https://github.com/apache/pulsar/pull/24136#discussion_r2018451234)

@poorbarcode I'd say that it's unexpected behavior when `createTopicToRemoteClusterForReplication=true` since it breaks replication if the partitions aren't created.
However, I think that there's already existing code elsewhere in the replication implementation that modifies the partitions and creates them if they don't exist. @nodece did you check if that exists already?

--
This is an automated message from the Apache Git Service.