This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fca90d14fc4 [improve][broker] PIP-433: Ensure topic creation before
starting GEO (#24652)
fca90d14fc4 is described below
commit fca90d14fc45dbe4b464be4ad66bac6703b8ded1
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jan 19 11:57:40 2026 +0800
[improve][broker] PIP-433: Ensure topic creation before starting GEO
(#24652)
Co-authored-by: fengyubiao <[email protected]>
---
.../pulsar/broker/service/AbstractReplicator.java | 8 +-
.../nonpersistent/NonPersistentReplicator.java | 6 +-
.../service/nonpersistent/NonPersistentTopic.java | 13 +-
.../persistent/GeoPersistentReplicator.java | 103 ++++++++---
.../service/persistent/PersistentReplicator.java | 8 +-
.../broker/service/persistent/PersistentTopic.java | 23 +--
.../service/persistent/ShadowReplicator.java | 6 +-
.../broker/service/AbstractReplicatorTest.java | 21 ++-
.../broker/service/OneWayReplicatorTest.java | 193 +++++++++++++++++++++
.../pulsar/broker/service/PersistentTopicTest.java | 47 ++++-
10 files changed, 374 insertions(+), 54 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index f996d328090..c7a36ad1b21 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -57,6 +58,8 @@ public abstract class AbstractReplicator implements
Replicator {
protected final String remoteCluster;
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
+ protected final PulsarAdmin replicationAdmin;
+ protected final PulsarAdmin admin;
protected String replicatorId;
@Getter
protected final Topic localTopic;
@@ -107,7 +110,8 @@ public abstract class AbstractReplicator implements
Replicator {
}
public AbstractReplicator(String localCluster, Topic localTopic, String
remoteCluster, String remoteTopicName,
- String replicatorPrefix, BrokerService
brokerService, PulsarClientImpl replicationClient)
+ String replicatorPrefix, BrokerService
brokerService, PulsarClientImpl replicationClient,
+ PulsarAdmin replicationAdmin)
throws PulsarServerException {
this.brokerService = brokerService;
this.localTopic = localTopic;
@@ -117,7 +121,9 @@ public abstract class AbstractReplicator implements
Replicator {
this.remoteTopicName = remoteTopicName;
this.remoteCluster = StringInterner.intern(remoteCluster);
this.replicationClient = replicationClient;
+ this.replicationAdmin = replicationAdmin;
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
+ this.admin = brokerService.pulsar().getAdminClient();
this.producer = null;
this.producerQueueSize =
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
this.replicatorId = String.format("%s | %s",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 38e1894c178..38320e5be70 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,9 +52,10 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new
NonPersistentReplicatorStatsImpl();
public NonPersistentReplicator(NonPersistentTopic topic, String
localCluster, String remoteCluster,
- BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
+ BrokerService brokerService,
PulsarClientImpl replicationClient,
+ PulsarAdmin replicationAdmin) throws
PulsarServerException {
super(localCluster, topic, remoteCluster, topic.getName(),
topic.getReplicatorPrefix(), brokerService,
- replicationClient);
+ replicationClient, replicationAdmin);
// NonPersistentReplicator does not support limitation so far, so
reset pending queue size to the default value.
producerBuilder.maxPendingMessages(1000);
producerBuilder.blockIfQueueFull(false);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 6021c41142a..96a9f97d70f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -71,7 +71,9 @@ import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -628,14 +630,15 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
String localCluster) {
return
AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(),
brokerService)
.thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getClusterAsync(remoteCluster)
- .thenApply(clusterData ->
-
brokerService.getReplicationClient(remoteCluster, clusterData)))
- .thenAccept(replicationClient -> {
+ .getClusterAsync(remoteCluster))
+ .thenAccept((clusterData) -> {
+ PulsarClient replicationClient =
brokerService.getReplicationClient(remoteCluster, clusterData);
+ PulsarAdmin replicationAdmin =
brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new
NonPersistentReplicator(NonPersistentTopic.this, localCluster,
- remoteCluster, brokerService,
(PulsarClientImpl) replicationClient);
+ remoteCluster, brokerService,
(PulsarClientImpl) replicationClient,
+ replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}",
topic, remoteCluster, e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 46f8a27d580..922a0e42c3d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -27,10 +27,12 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;
@@ -40,9 +42,10 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor
cursor, String localCluster,
String remoteCluster, BrokerService
brokerService,
- PulsarClientImpl replicationClient)
+ PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin)
throws PulsarServerException {
- super(localCluster, topic, cursor, remoteCluster, topic.getName(),
brokerService, replicationClient);
+ super(localCluster, topic, cursor, remoteCluster, topic.getName(),
brokerService, replicationClient,
+ replicationAdmin);
}
/**
@@ -55,29 +58,85 @@ public class GeoPersistentReplicator extends
PersistentReplicator {
@Override
protected CompletableFuture<Void> prepareCreateProducer() {
- if
(brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication())
{
- return CompletableFuture.completedFuture(null);
- } else {
- CompletableFuture<Void> topicCheckFuture = new
CompletableFuture<>();
-
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false,
false)
- .whenComplete((metadata, ex) -> {
- if (ex == null) {
- if (metadata.partitions == 0) {
- topicCheckFuture.complete(null);
- } else {
- String errorMsg = String.format("%s Can not create the
replicator due to the partitions in the"
- + " remote cluster is not 0, but is
%s",
- replicatorId, metadata.partitions);
- log.error(errorMsg);
- topicCheckFuture.completeExceptionally(
- new
PulsarClientException.NotAllowedException(errorMsg));
+ return
createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName());
+ }
+
+ private CompletableFuture<Integer> getLocalPartitionMetadata(String topic)
{
+ return admin.topics().getPartitionedTopicMetadataAsync(topic)
+ .thenApply(metadata -> metadata.partitions)
+ .exceptionallyCompose(t -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(t);
+ if (actEx instanceof
PulsarAdminException.NotFoundException) {
+ // Legacy edge case: Local topic is non-partitioned
but name ends with "-partition-{num}".
+ // This should never happen in practice because
PIP-414 disables this naming pattern.
+ return createRemoteTopicIfDoesNotExist(localTopicName)
+ .thenApply(__ -> -1); // Special marker
}
- } else {
-
topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
+ return CompletableFuture.failedFuture(actEx);
+ });
+ }
+
+ private CompletableFuture<Integer> getRemotePartitionMetadata(String
topic) {
+ return
replicationAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+ .thenApply(metadata -> metadata.partitions)
+ .exceptionallyCompose(t -> {
+ Throwable actEx = FutureUtil.unwrapCompletionException(t);
+ if (actEx instanceof
PulsarAdminException.NotFoundException) {
+ return CompletableFuture.completedFuture(-1); // Topic
doesn't exist
+ }
+ return CompletableFuture.failedFuture(actEx);
+ });
+ }
+
+ private CompletableFuture<Void> handlePartitionComparison(String topic,
int localPartitions, int remotePartitions) {
+ // Skip if already handled by recursion
+ if (localPartitions == -1) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Remote topic doesn't exist - create it
+ if (remotePartitions == -1) {
+ if
(!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication())
{
+ String errorMsg = String.format("[%s] Can not start replicator
because there is no topic on"
+ + " the remote cluster. Please create a %s on
the remote cluster",
+ replicatorId, localPartitions == 0 ? "non-partitioned
topic"
+ : "partitioned topic with " + localPartitions
+ " partitions");
+ log.error(errorMsg);
+ return CompletableFuture.failedFuture(new
PulsarServerException(errorMsg));
+ }
+
+ CompletableFuture<Void> createFuture = localPartitions == 0
+ ?
replicationAdmin.topics().createNonPartitionedTopicAsync(topic)
+ :
replicationAdmin.topics().createPartitionedTopicAsync(topic, localPartitions);
+
+ return createFuture.whenComplete((__, t) -> {
+ if (t != null) {
+ log.error("[{}] Failed to create topic on remote cluster.
Local has {} partitions",
+ replicatorId, localPartitions, t);
}
});
- return topicCheckFuture;
}
+
+ // Both exist - verify compatibility
+ if (localPartitions == remotePartitions) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Incompatible partitions
+ String errorMsg = String.format("[%s] Can not start replicator because
the partitions between"
+ + " local and remote cluster are different. local: %s,
remote: %s",
+ replicatorId, localPartitions, remotePartitions);
+ log.error(errorMsg);
+ return CompletableFuture.failedFuture(new
PulsarServerException(errorMsg));
+ }
+
+ private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String
partitionedTopic) {
+ return getLocalPartitionMetadata(partitionedTopic)
+ .thenCompose(localPartitions ->
+
getRemotePartitionMetadata(partitionedTopic).thenCompose(remotePartitions ->
+ handlePartitionComparison(partitionedTopic,
localPartitions, remotePartitions)
+ )
+ );
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e0a31476fc9..c1d73cd3891 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -119,11 +120,12 @@ public abstract class PersistentReplicator extends
AbstractReplicator
protected final LinkedList<InFlightTask> inFlightTasks = new
LinkedList<>();
public PersistentReplicator(String localCluster, PersistentTopic
localTopic, ManagedCursor cursor,
- String remoteCluster, String remoteTopic,
- BrokerService brokerService,
PulsarClientImpl replicationClient)
+ String remoteCluster, String remoteTopic,
+ BrokerService brokerService, PulsarClientImpl
replicationClient,
+ PulsarAdmin replicationAdmin)
throws PulsarServerException {
super(localCluster, localTopic, remoteCluster, remoteTopic,
localTopic.getReplicatorPrefix(),
- brokerService, replicationClient);
+ brokerService, replicationClient, replicationAdmin);
this.topic = localTopic;
this.localSchemaTopicName =
TopicName.getPartitionedTopicName(localTopicName).toString();
this.cursor = Objects.requireNonNull(cursor);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 451471e215e..5a1e0e940a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -148,6 +148,7 @@ import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -2343,11 +2344,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
String localCluster) {
return
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
brokerService)
.thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getClusterAsync(remoteCluster)
- .thenApply(clusterData ->
-
brokerService.getReplicationClient(remoteCluster, clusterData)))
- .thenAccept(replicationClient -> {
- if (replicationClient == null) {
+ .getClusterAsync(remoteCluster))
+ .thenAccept((clusterData) -> {
+ PulsarClient replicationClient =
brokerService.getReplicationClient(remoteCluster, clusterData);
+ PulsarAdmin replicationAdmin =
brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
+ if (replicationClient == null || replicationAdmin == null)
{
log.error("[{}] Can not create replicator because the
remote client can not be created."
+ " remote cluster: {}. State of
transferring : {}",
topic, remoteCluster, transferring);
@@ -2365,7 +2366,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Replicator replicator =
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new
GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
- remoteCluster, brokerService,
(PulsarClientImpl) replicationClient);
+ remoteCluster, brokerService,
(PulsarClientImpl) replicationClient,
+ replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}",
topic, remoteCluster, e);
}
@@ -2431,9 +2433,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
return
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
brokerService)
.thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getClusterAsync(localCluster)
- .thenApply(clusterData ->
brokerService.getReplicationClient(localCluster, clusterData)))
- .thenAccept(replicationClient -> {
+ .getClusterAsync(localCluster))
+ .thenAccept((clusterData) -> {
+ PulsarClient replicationClient =
brokerService.getReplicationClient(localCluster, clusterData);
+ PulsarAdmin replicationAdmin =
brokerService.getClusterPulsarAdmin(localCluster, clusterData);
Replicator replicator =
shadowReplicators.computeIfAbsent(shadowTopic, r -> {
try {
TopicName sourceTopicName =
TopicName.get(getName());
@@ -2442,7 +2445,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
shadowPartitionTopic += "-partition-" +
sourceTopicName.getPartitionIndex();
}
return new ShadowReplicator(shadowPartitionTopic,
PersistentTopic.this, cursor,
- brokerService, (PulsarClientImpl)
replicationClient);
+ brokerService, (PulsarClientImpl)
replicationClient, replicationAdmin);
} catch (PulsarServerException e) {
log.error("[{}] ShadowReplicator startup failed
{}", topic, shadowTopic, e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index a334fd86dd0..2e5e91fb9a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -39,11 +40,12 @@ import org.apache.pulsar.common.util.Codec;
public class ShadowReplicator extends PersistentReplicator {
public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic,
ManagedCursor cursor,
- BrokerService brokerService, PulsarClientImpl
replicationClient)
+ BrokerService brokerService, PulsarClientImpl
replicationClient,
+ PulsarAdmin replicationAdmin)
throws PulsarServerException {
super(brokerService.pulsar().getConfiguration().getClusterName(),
sourceTopic, cursor,
brokerService.pulsar().getConfiguration().getClusterName(),
shadowTopic, brokerService,
- replicationClient);
+ replicationClient, replicationAdmin);
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 39252ac0a94..5f1d3a8a6c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.channel.DefaultEventLoop;
@@ -39,12 +40,15 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
@@ -94,9 +98,19 @@ public class AbstractReplicatorTest {
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked
ex"));
when(producerBuilder.createAsync())
.thenReturn(CompletableFuture.failedFuture(new
RuntimeException("mocked ex")));
+
+ @Cleanup
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ Topics adminTopics = mock(Topics.class);
+ doReturn(adminTopics).when(admin).topics();
+ doReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0))).when(adminTopics)
+ .getPartitionedTopicMetadataAsync(anyString());
+ doReturn(CompletableFuture.completedFuture(null)).when(adminTopics)
+ .createNonPartitionedTopicAsync(anyString());
+
// Make race condition: "retry start producer" and "close replicator".
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster,
localTopic, remoteCluster, topicName,
- replicatorPrefix, broker, remoteClient);
+ replicatorPrefix, broker, remoteClient, admin);
replicator.startProducer();
replicator.terminate();
@@ -122,9 +136,10 @@ public class AbstractReplicatorTest {
public ReplicatorInTest(String localCluster, Topic localTopic, String
remoteCluster, String remoteTopicName,
String replicatorPrefix, BrokerService
brokerService,
- PulsarClientImpl replicationClient) throws
PulsarServerException {
+ PulsarClientImpl replicationClient,
PulsarAdmin replicationAdmin)
+ throws PulsarServerException {
super(localCluster, localTopic, remoteCluster, remoteTopicName,
replicatorPrefix, brokerService,
- replicationClient);
+ replicationClient, replicationAdmin);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 8a80f57f5b2..8971e42f1d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -63,6 +64,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
+import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -90,6 +92,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -108,6 +111,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
import
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -1889,4 +1893,193 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// Verify: all inflight tasks are done.
ensureNoBacklogByInflightTask(getReplicator(topicName));
}
+
+ @DataProvider
+ public Object[] isPartitioned() {
+ return new Object[]{
+ true,
+ false
+ };
+ }
+
+ @Test(dataProvider = "isPartitioned")
+ public void testReplicatorCreateTopic(boolean isPartitioned) throws
Exception {
+ String ns = defaultTenant + "/" +
UUID.randomUUID().toString().replace("-", "");
+ admin1.namespaces().createNamespace(ns);
+ if (!usingGlobalZK){
+ admin2.namespaces().createNamespace(ns);
+ }
+
+ int numPartitions = 4;
+ List<String> partitions = new ArrayList<>();
+ final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns +
"/tp_");
+ if (isPartitioned) {
+ admin1.topics().createPartitionedTopic(tp, numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ partitions.add(TopicName.getTopicPartitionNameString(tp, i));
+ }
+ } else {
+ admin1.topics().createNonPartitionedTopic(tp);
+ }
+
+ admin1.namespaces().setNamespaceReplicationClusters(ns, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic) broker1.getTopic(
+ isPartitioned ?
TopicName.get(tp).getPartition(0).toString() : tp, false).join()
+ .get();
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ });
+
+ @Cleanup
+ Producer<String> p1 =
client1.newProducer(Schema.STRING).topic(tp).create();
+ p1.send("msg-1");
+
+ Awaitility.await().untilAsserted(() -> {
+ List<String> partitionedTopicList =
admin2.topics().getPartitionedTopicList(ns);
+ if (isPartitioned) {
+ assertThat(partitionedTopicList).contains(tp);
+
assertThat(admin2.topics().getList(ns)).containsAll(partitions);
+ } else {
+ assertThat(partitionedTopicList).doesNotContain(tp);
+ assertThat(admin2.topics().getList(ns)).contains(tp);
+ }
+ });
+ }
+
+ @Test
+ public void
testReplicatorCreateTopicWhenTopicExistsWithDifferentTypeAcrossClusters()
throws Exception {
+ if (usingGlobalZK) {
+ // This test case is not applicable when using global ZK, because
the namespace policies
+ // are shared among clusters.
+ return;
+ }
+
+ String ns = defaultTenant + "/" +
UUID.randomUUID().toString().replace("-", "");
+ admin1.namespaces().createNamespace(ns);
+ admin2.namespaces().createNamespace(ns);
+
+ final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns +
"/tp_");
+ admin1.topics().createPartitionedTopic(tp, 4);
+ admin2.topics().createNonPartitionedTopic(tp);
+
+ admin1.namespaces().setNamespaceReplicationClusters(ns, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ admin2.namespaces().setNamespaceReplicationClusters(ns, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+ .get();
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic = (PersistentTopic)
broker2.getTopic(tp, false).join().get();
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ });
+
+ @Cleanup
+ Producer<String> p1 =
client1.newProducer(Schema.STRING).topic(tp).create();
+ p1.send("msg-p1-1");
+ @Cleanup
+ Producer<String> p2 =
client2.newProducer(Schema.STRING).topic(tp).create();
+ p2.send("msg-p2-1");
+
+ // The topic exists, but its type differs between the local and remote
clusters. The replicator should not
+ // recreate the topic.
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+ .get();
+ persistentTopic.getReplicators().forEach((key, value) -> {
+ assertFalse(value.isConnected());
+ });
+ });
+
assertThat(admin2.topics().getPartitionedTopicList(ns)).doesNotContain(tp);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic = (PersistentTopic)
broker2.getTopic(tp, false).join().get();
+ persistentTopic.getReplicators().forEach((key, value) -> {
+ assertFalse(value.isConnected());
+ });
+ });
+ assertThat(admin1.topics().getList(ns)).doesNotContain(tp);
+ }
+
+ @Test
+ public void testReplicatorWhenPartitionCountsDiffer() throws Exception {
+ if (usingGlobalZK) {
+ // This test case is not applicable when using global ZK, because
the namespace policies
+ // are shared among clusters.
+ return;
+ }
+
+ String ns = defaultTenant + "/" +
UUID.randomUUID().toString().replace("-", "");
+
+ admin1.namespaces().createNamespace(ns);
+ admin1.namespaces().setAutoTopicCreation(ns,
AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType(TopicType.PARTITIONED.toString())
+ .defaultNumPartitions(12)
+ .build());
+
+ admin2.namespaces().createNamespace(ns);
+ admin2.namespaces().setAutoTopicCreation(ns,
AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType(TopicType.NON_PARTITIONED.toString())
+ .build());
+
+ final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns +
"/tp_");
+ admin1.topics().createPartitionedTopic(tp, 4);
+ admin2.topics().createPartitionedTopic(tp, 8);
+
+ admin1.namespaces().setNamespaceReplicationClusters(ns, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ admin2.namespaces().setNamespaceReplicationClusters(ns, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+ .get();
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
broker2.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+ .get();
+ assertFalse(persistentTopic.getReplicators().isEmpty());
+ });
+
+ // Trigger the replicator.
+ @Cleanup
+ Producer<String> p1 =
client1.newProducer(Schema.STRING).topic(tp).create();
+ p1.send("msg-p1-1");
+ @Cleanup
+ Producer<String> p2 =
client2.newProducer(Schema.STRING).topic(tp).create();
+ p2.send("msg-p2-1");
+
+ // Topic partition counts differ between the local and remote clusters.
+ // The replicator should not replicate the messages.
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+ .get();
+ persistentTopic.getReplicators().forEach((key, value) -> {
+ assertFalse(value.isConnected());
+ });
+ });
+
+ @Cleanup
+ Consumer<String> c2 =
client2.newConsumer(Schema.STRING).topic(tp).subscriptionName("test-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+ while (true) {
+ Message<String> receive = c2.receive(3, TimeUnit.SECONDS);
+ if (receive == null) {
+ break;
+ }
+ assertEquals(receive.getValue(), "msg-p2-1");
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 3b64b2ecc2c..c83f3750ff0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -111,11 +111,14 @@ import
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -130,6 +133,8 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -233,6 +238,13 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc)
.checkTopicExistsAsync(any());
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ LookupService lookupService = mock(LookupService.class);
+ doReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0))).when(lookupService)
+ .getPartitionedTopicMetadata(any(), anyBoolean(),
anyBoolean());
+ doReturn(lookupService).when(pulsarClient).getLookup();
+
doReturn(pulsarClient).when(pulsarTestContext.getPulsarService()).getClient();
+
setupMLAsyncCallbackMocks();
}
@@ -1653,6 +1665,16 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertNull(topic2.getSubscription(successSubName));
}
+ private PulsarAdmin mockReplicationAdmin() {
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ Topics topics = mock(Topics.class);
+ doReturn(topics).when(admin).topics();
+ doReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata(0))).when(topics)
+ .getPartitionedTopicMetadataAsync(anyString());
+
doReturn(CompletableFuture.completedFuture(null)).when(topics).createNonPartitionedTopicAsync(anyString());
+ return admin;
+ }
+
/**
* NonPersistentReplicator.removeReplicator doesn't remove replicator in
atomic way and does in multiple step:
* 1. disconnect replicator producer
@@ -1699,11 +1721,18 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
return producerBuilder;
});
brokerService.getReplicationClients().put(remoteCluster,
pulsarClientMock);
+
+ @Cleanup
+ PulsarAdmin admin = mockReplicationAdmin();
+ PulsarService pulsar = brokerService.getPulsar();
+ doReturn(admin).when(pulsar).getAdminClient();
+ brokerService.getClusterAdmins().put(remoteCluster, admin);
+ Optional<ClusterData> clusterData =
brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getCluster(remoteCluster);
PersistentReplicator replicator = spy(
new GeoPersistentReplicator(topic, cursor, localCluster,
remoteCluster, brokerService,
- (PulsarClientImpl)
brokerService.getReplicationClient(remoteCluster,
-
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getCluster(remoteCluster))));
+ (PulsarClientImpl)
brokerService.getReplicationClient(remoteCluster, clusterData),
+ brokerService.getClusterPulsarAdmin(remoteCluster,
clusterData)));
replicatorMap.put(remoteReplicatorName, replicator);
// step-1 remove replicator : it will disconnect the producer but it
will wait for callback to be completed
@@ -1751,10 +1780,16 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
+
+ @Cleanup
+ PulsarAdmin admin = mockReplicationAdmin();
+ doReturn(admin).when(pulsar).getAdminClient();
+ brokerService.getClusterAdmins().put(remoteCluster, admin);
+ Optional<ClusterData> clusterData =
brokerService.pulsar().getPulsarResources().getClusterResources()
+ .getCluster(remoteCluster);
PersistentReplicator replicator = new GeoPersistentReplicator(topic,
cursor, localCluster, remoteCluster,
- brokerService, (PulsarClientImpl)
brokerService.getReplicationClient(remoteCluster,
-
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getCluster(remoteCluster)));
+ brokerService, (PulsarClientImpl)
brokerService.getReplicationClient(remoteCluster, clusterData),
+ brokerService.getClusterPulsarAdmin(remoteCluster,
clusterData));
// PersistentReplicator constructor calls startProducer()
verify(clientImpl)