nodece commented on code in PR #22983:
URL: https://github.com/apache/pulsar/pull/22983#discussion_r1666970494
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -184,8 +187,42 @@ public void startProducer() {
}
log.info("[{}] Starting replicator", replicatorId);
- producerBuilder.createAsync().thenAccept(producer -> {
- setProducerAndTriggerReadEntries(producer);
+ CompletableFuture<Void> checkPartitionsSameFuture = new CompletableFuture<>();
+ replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> {
+ // If there is an exists partitioned topic on the remote cluster, report an error.
+ if (metadata.partitions != 0) {
+ log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {}),"
+ + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions,
+ state);
+ // This exception will be caught below, so it can be any typed.
+ checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId
+ + "Can not replicate data to a partitioned topic."));
+ } else {
+ checkPartitionsSameFuture.complete(null);
+ }
+ }).exceptionally(ex -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof PulsarClientException.NotFoundException
+ || actEx instanceof PulsarClientException.TopicDoesNotExistException
+ || actEx instanceof PulsarAdminException.NotFoundException) {
+ // These 3 error means the topic has not been created on the remote cluster yet, and the current
+ // replicator will trigger an event to create it. So it is okay.
+ checkPartitionsSameFuture.complete(null);
+ } else {
+ log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed",
+ replicatorId, ex);
+ checkPartitionsSameFuture.completeExceptionally(ex);
+ }
+ return null;
+ });
+ checkPartitionsSameFuture.thenCompose(metadata -> {
+ // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
+ // the remote cluster.
+ ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
+ builderImpl.getConf().setForceOnoPartitioned(true);
Review Comment:
This looks like hacky code.
Could the messages be sent to a single topic instead, skipping the metadata fetch?
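
For reference, here is a minimal sketch (not the PR's code) of the check in this hunk written as one chained stage instead of a separately completed future. It uses only `java.util.concurrent`. `PartitionCheckSketch`, `TopicNotFoundException`, and `checkRemoteNotPartitioned` are hypothetical names standing in for the Pulsar types and the metadata lookup, and the partition count is passed in as a plain `CompletableFuture<Integer>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class PartitionCheckSketch {

    /** Hypothetical stand-in for the "topic not found" exceptions in the diff. */
    static class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Completes normally when the remote topic is non-partitioned or does not
     * exist yet; completes exceptionally when it is partitioned or when the
     * lookup failed for any other reason.
     */
    static CompletableFuture<Void> checkRemoteNotPartitioned(
            CompletableFuture<Integer> remotePartitions) {
        return remotePartitions.<Void>handle((partitions, ex) -> {
            if (ex != null) {
                // Unwrap CompletionException so the real cause is inspected.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                if (cause instanceof TopicNotFoundException) {
                    // Assumption: a missing topic is acceptable, as in the diff.
                    return null;
                }
                throw new CompletionException(cause);
            }
            if (partitions != 0) {
                throw new IllegalStateException(
                        "Can not replicate data to a partitioned topic.");
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> missing = new CompletableFuture<>();
        missing.completeExceptionally(new TopicNotFoundException("no such topic"));

        System.out.println(checkRemoteNotPartitioned(
                CompletableFuture.completedFuture(0)).isCompletedExceptionally());
        System.out.println(checkRemoteNotPartitioned(
                CompletableFuture.completedFuture(4)).isCompletedExceptionally());
        System.out.println(checkRemoteNotPartitioned(missing).isCompletedExceptionally());
    }
}
```

The producer creation could then be chained with `thenCompose` on the returned future. Whether this reads better than the explicit `checkPartitionsSameFuture` is a judgment call; the behavior is intended to match the three branches in the hunk.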