poorbarcode commented on code in PR #24136: URL: https://github.com/apache/pulsar/pull/24136#discussion_r2018339057
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java: ########## @@ -56,7 +63,112 @@ protected String getProducerName() { @Override protected CompletableFuture<Void> prepareCreateProducer() { if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { - return CompletableFuture.completedFuture(null); + TopicName completeTopicName = TopicName.get(localTopicName); + TopicName baseTopicName; + if (completeTopicName.isPartitioned()) { + baseTopicName = TopicName.get(completeTopicName.getPartitionedTopicName()); + } else { + baseTopicName = completeTopicName; + } + // Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it + // can still work. + return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true) + .thenCompose((localMetadata) -> replicationAdmin.topics() + // https://github.com/apache/pulsar/pull/4963 + // Use the admin API instead of the client to fetch partitioned metadata + // to prevent automatic topic creation on the remote cluster. + // PIP-344 introduced an option to disable auto-creation when fetching partitioned + // topic metadata via the client, but this requires Pulsar 3.0.x. + // This change is a workaround to support Pulsar 2.4.2. + .getPartitionedTopicMetadataAsync(baseTopicName.toString()) + .exceptionally(ex -> { + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if (throwable instanceof NotFoundException) { + // Topic does not exist on the remote cluster. 
+ return new PartitionedTopicMetadata(0); + } + throw new CompletionException("Failed to get partitioned topic metadata", throwable); + }).thenCompose(remoteMetadata -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Local metadata partitions: {} Remote metadata partitions: {}", + replicatorId, localMetadata.partitions, remoteMetadata.partitions); + } + + // Non-partitioned topic + if (localMetadata.partitions == 0) { + if (localMetadata.partitions == remoteMetadata.partitions) { + return replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName) + .exceptionally(ex -> { + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if (throwable instanceof ConflictException) { + // Topic already exists on the remote cluster. + return null; + } else { + throw new CompletionException( + "Failed to create non-partitioned topic", throwable); + } + }); + } else { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "Topic type is not matched between local and remote cluster: local " + + "partitions: " + localMetadata.partitions + + ", remote partitions: " + remoteMetadata.partitions)); + } + } else { + if (remoteMetadata.partitions == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] Creating partitioned topic {} with {} partitions", + replicatorId, baseTopicName, localMetadata.partitions); + } + // We maybe need to create a partitioned topic on remote cluster. + return replicationAdmin.topics() + .createPartitionedTopicAsync(baseTopicName.toString(), + localMetadata.partitions) + .exceptionally(ex -> { + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if (throwable instanceof ConflictException) { + // Topic already exists on the remote cluster. + // This can happen if the topic was created, or the topic is + // non-partitioned. 
+ return null; + } else { + throw new CompletionException( + "Failed to create partitioned topic", throwable); + } + }) + .thenCompose((__) -> replicationAdmin.topics() + .getPartitionedTopicMetadataAsync(baseTopicName.toString())) + .thenCompose(metadata -> { + // Double check if the partitioned topic is created + // successfully. + // When partitions is equals to 0, it means this topic is + // non-partitioned, we should throw an exception. + if (completeTopicName.getPartitionIndex() >= metadata.partitions) { + return FutureUtil.failedFuture( + new PulsarClientException.NotAllowedException( + "Topic type is not matched between " + + "local and " + + "remote cluster: local " + + "partitions: " + + localMetadata.partitions + + ", remote partitions: " + + remoteMetadata.partitions)); + } + return CompletableFuture.completedFuture(null); + }); + } + // Update partitioned topic partitions if needed. + if (completeTopicName.getPartitionIndex() > remoteMetadata.partitions + && localMetadata.partitions > remoteMetadata.partitions) { + log.info("[{}] Updating partitioned topic {} to {} partitions", replicatorId, + baseTopicName, localMetadata.partitions); + return replicationAdmin.topics() + .updatePartitionedTopicAsync(baseTopicName.toString(), Review Comment: Please do not edit users' partition count automatically; it will break their consumption ordering when they use `Producer -> MessageRouter` or a `Key_Shared` subscription. Instead of modifying users' partition count, we should add some checks and print suitable errors when users enable Geo-Replication by calling the admin API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org