This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 04d1225fbb1 [feat] [broker] PIP-188 Fix cluster migration state store
into local metadatastore (#21359)
04d1225fbb1 is described below
commit 04d1225fbb1485333f44138e587aadce34ea1f0e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Oct 24 23:59:09 2023 -0700
[feat] [broker] PIP-188 Fix cluster migration state store into local
metadatastore (#21359)
Co-authored-by: Rajan Dhabalia <[email protected]>
---
.../pulsar/broker/resources/ClusterResources.java | 40 ++++++-
.../pulsar/broker/resources/PulsarResources.java | 3 +-
.../pulsar/broker/admin/impl/ClustersBase.java | 44 ++++++-
.../pulsar/broker/service/AbstractTopic.java | 31 +++--
.../org/apache/pulsar/broker/service/Consumer.java | 2 +-
.../org/apache/pulsar/broker/service/Producer.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../broker/service/ClusterMigrationTest.java | 128 +++++++++------------
.../org/apache/pulsar/client/admin/Clusters.java | 43 ++++++-
.../pulsar/common/policies/data/ClusterData.java | 26 -----
.../common/policies/data/ClusterPolicies.java | 61 ++++++++++
.../pulsar/client/admin/internal/ClustersImpl.java | 16 ++-
.../org/apache/pulsar/admin/cli/CmdClusters.java | 13 ++-
.../common/policies/data/ClusterDataImpl.java | 31 +----
.../common/policies/data/ClusterPoliciesImpl.java | 85 ++++++++++++++
.../common/policies/data/ClusterDataImplTest.java | 3 -
18 files changed, 380 insertions(+), 154 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 843cec7b205..b0cc50edf1f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -39,10 +40,19 @@ public class ClusterResources extends
BaseResources<ClusterData> {
@Getter
private FailureDomainResources failureDomainResources;
-
- public ClusterResources(MetadataStore store, int operationTimeoutSec) {
- super(store, ClusterData.class, operationTimeoutSec);
- this.failureDomainResources = new FailureDomainResources(store,
FailureDomainImpl.class, operationTimeoutSec);
+ @Getter
+ private ClusterPoliciesResources clusterPoliciesResources;
+
+ public ClusterResources(MetadataStore localStore, MetadataStore
configurationStore, int operationTimeoutSec) {
+ super(configurationStore, ClusterData.class, operationTimeoutSec);
+ this.failureDomainResources = new
FailureDomainResources(configurationStore, FailureDomainImpl.class,
+ operationTimeoutSec);
+ if (localStore != null) {
+ this.clusterPoliciesResources = new
ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class,
+ operationTimeoutSec);
+ } else {
+ this.clusterPoliciesResources = null;
+ }
}
public CompletableFuture<Set<String>> listAsync() {
@@ -216,4 +226,26 @@ public class ClusterResources extends
BaseResources<ClusterData> {
});
}
}
+
+ public static class ClusterPoliciesResources extends
BaseResources<ClusterPoliciesImpl> {
+ public static final String LOCAL_POLICIES_PATH = "policies";
+
+ public ClusterPoliciesResources(MetadataStore store,
Class<ClusterPoliciesImpl> clazz,
+ int operationTimeoutSec) {
+ super(store, clazz, operationTimeoutSec);
+ }
+
+ public Optional<ClusterPoliciesImpl> getClusterPolicies(String
clusterName) throws MetadataStoreException {
+ return get(joinPath(BASE_CLUSTERS_PATH, clusterName,
LOCAL_POLICIES_PATH));
+ }
+
+ public CompletableFuture<Optional<ClusterPoliciesImpl>>
getClusterPoliciesAsync(String clusterName) {
+ return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName,
LOCAL_POLICIES_PATH));
+ }
+
+ public CompletableFuture<Void> setPoliciesWithCreateAsync(String
clusterName,
+ Function<Optional<ClusterPoliciesImpl>, ClusterPoliciesImpl>
createFunction) {
+ return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH,
clusterName, LOCAL_POLICIES_PATH), createFunction);
+ }
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index ad872a5356c..fe7ffe0bc7b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -61,7 +61,8 @@ public class PulsarResources {
int operationTimeoutSec) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore,
operationTimeoutSec);
- clusterResources = new
ClusterResources(configurationMetadataStore, operationTimeoutSec);
+ clusterResources = new ClusterResources(localMetadataStore,
configurationMetadataStore,
+ operationTimeoutSec);
namespaceResources = new
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 5d4ed54c334..b8743933098 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -60,8 +60,10 @@ import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
@@ -247,6 +249,41 @@ public class ClustersBase extends AdminResource {
});
}
+ @GET
+ @Path("/{cluster}/migrate")
+ @ApiOperation(
+ value = "Get the cluster migration configuration for the specified
cluster.",
+ response = ClusterDataImpl.class,
+ notes = "This operation requires Pulsar superuser privileges."
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Return the cluster data.",
response = ClusterDataImpl.class),
+ @ApiResponse(code = 403, message = "Don't have admin permission."),
+ @ApiResponse(code = 404, message = "Cluster doesn't exist."),
+ @ApiResponse(code = 500, message = "Internal server error.")
+ })
+ public ClusterPolicies getClusterMigration(
+ @ApiParam(
+ value = "The cluster name",
+ required = true
+ )
+ @PathParam("cluster") String cluster
+ ) {
+ validateSuperUserAccess();
+
+ try {
+ return
clusterResources().getClusterPoliciesResources().getClusterPolicies(cluster)
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster does not exist"));
+ } catch (Exception e) {
+ log.error("[{}] Failed to get cluster {}", clientAppId(), cluster,
e);
+ if (e instanceof RestException) {
+ throw (RestException) e;
+ } else {
+ throw new RestException(e);
+ }
+ }
+ }
+
@POST
@Path("/{cluster}/migrate")
@ApiOperation(
@@ -286,8 +323,9 @@ public class ClustersBase extends AdminResource {
}
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenCompose(__ ->
clusterResources().updateClusterAsync(cluster, old -> {
- ClusterDataImpl data = (ClusterDataImpl) old;
+ .thenCompose(__ ->
clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster,
+ old -> {
+ ClusterPoliciesImpl data = old.orElse(new
ClusterPoliciesImpl());
data.setMigrated(isMigrated);
data.setMigratedClusterUrl(clusterUrl);
return data;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index a8f25f61a94..7a23312c477 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,7 +65,7 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
@@ -1358,14 +1358,29 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
public static CompletableFuture<Optional<ClusterUrl>>
getMigratedClusterUrlAsync(PulsarService pulsar,
-
String topic) {
- return
pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
+ String topic) {
+ CompletableFuture<Optional<ClusterUrl>> result = new
CompletableFuture<>();
+
pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources()
+ .getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
- ((clusterData, isNamespaceMigrationEnabled)
- -> ((clusterData.isPresent() &&
clusterData.get().isMigrated())
- || isNamespaceMigrationEnabled)
- ?
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
- : Optional.empty()));
+ ((clusterData, isNamespaceMigrationEnabled) -> {
+ Optional<ClusterUrl> url =
((clusterData.isPresent() && clusterData.get().isMigrated())
+ || isNamespaceMigrationEnabled)
+ ?
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
+ : Optional.empty();
+ return url;
+ }))
+ .thenAccept(res -> {
+ // The cluster-policies future is completed by a metadata-store thread, and continuing
+ // further processing on the same metadata-store thread can cause a deadlock while creating
+ // a topic, because the create-topic path may make blocking calls on the metadata store.
+ // So, complete the future on a separate thread to avoid the deadlock.
+ pulsar.getExecutor().execute(() -> result.complete(res));
+ }).exceptionally(ex -> {
+ pulsar.getExecutor().execute(() ->
result.completeExceptionally(ex.getCause()));
+ return null;
+ });
+ return result;
}
private static CompletableFuture<Boolean>
isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 023ede74b4f..e72c805d738 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.common.api.proto.KeyLongValue;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index f7d2bb2dd27..acaa7c02d19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -50,7 +50,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 95f139dc11e..da51eaea8fb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -152,7 +152,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 76e9f261ca6..f3857b5ad2a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -73,7 +73,7 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d35d284d32..a8a921f3b62 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,7 +147,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 2139a7bc12e..2fa201cf958 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
@@ -86,36 +86,12 @@ public class ClusterMigrationTest {
PulsarService pulsar4;
PulsarAdmin admin4;
- @DataProvider(name = "TopicsubscriptionTypes")
- public Object[][] subscriptionTypes() {
- return new Object[][] {
- {true, SubscriptionType.Shared},
- {true, SubscriptionType.Key_Shared},
- {true, SubscriptionType.Shared},
- {true, SubscriptionType.Key_Shared},
-
- {false, SubscriptionType.Shared},
- {false, SubscriptionType.Key_Shared},
- {false, SubscriptionType.Shared},
- {false, SubscriptionType.Key_Shared},
- };
- }
-
@DataProvider(name="NamespaceMigrationTopicSubscriptionTypes")
public Object[][] namespaceMigrationSubscriptionTypes() {
return new Object[][] {
- {true, SubscriptionType.Shared, true, false},
- {true, SubscriptionType.Key_Shared, true, false},
- {true, SubscriptionType.Shared, false, true},
- {true, SubscriptionType.Key_Shared, false, true},
- {true, SubscriptionType.Shared, true, true},
- {true, SubscriptionType.Key_Shared, true, true},
- {false, SubscriptionType.Shared, true, false},
- {false, SubscriptionType.Key_Shared, true, false},
- {false, SubscriptionType.Shared, false, true},
- {false, SubscriptionType.Key_Shared,false, true},
- {false, SubscriptionType.Shared, true, true},
- {false, SubscriptionType.Key_Shared,true, true},
+ {SubscriptionType.Shared, true, false},
+ {SubscriptionType.Shared, false, true},
+ {SubscriptionType.Shared, true, true},
};
}
@@ -227,14 +203,14 @@ public class ClusterMigrationTest {
@AfterMethod(alwaysRun = true, timeOut = 300000)
protected void cleanup() throws Exception {
log.info("--- Shutting down ---");
- broker1.cleanup();
admin1.close();
- broker2.cleanup();
admin2.close();
- broker3.cleanup();
admin3.close();
- broker4.cleanup();
admin4.close();
+ broker1.cleanup();
+ broker2.cleanup();
+ broker3.cleanup();
+ broker4.cleanup();
}
@BeforeMethod(alwaysRun = true)
@@ -259,11 +235,11 @@ public class ClusterMigrationTest {
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* @throws Exception
*/
- @Test(dataProvider = "TopicsubscriptionTypes")
- public void testClusterMigration(boolean persistent, SubscriptionType
subType) throws Exception {
+ @Test
+ public void testClusterMigration() throws Exception {
log.info("--- Starting ReplicatorTest::testClusterMigration ---");
final String topicName = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
@Cleanup
PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
@@ -271,7 +247,7 @@ public class ClusterMigrationTest {
// cluster-1 producer/consumer
Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic)
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
@@ -298,6 +274,7 @@ public class ClusterMigrationTest {
ClusterUrl migratedUrl = new
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
+
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
migratedUrl);
retryStrategically((test) -> {
try {
@@ -330,12 +307,10 @@ public class ClusterMigrationTest {
// try to consume backlog messages from cluster-1
consumer1 =
client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
- if (persistent) {
- for (int i = 0; i < n; i++) {
- Message<byte[]> msg = consumer1.receive();
- assertEquals(msg.getData(), "test1".getBytes());
- consumer1.acknowledge(msg);
- }
+ for (int i = 0; i < n; i++) {
+ Message<byte[]> msg = consumer1.receive();
+ assertEquals(msg.getData(), "test1".getBytes());
+ consumer1.acknowledge(msg);
}
// after consuming all messages, consumer should have disconnected
// from cluster-1 and reconnect with cluster-2
@@ -351,13 +326,13 @@ public class ClusterMigrationTest {
assertTrue(topic1.getSubscriptions().isEmpty());
// now also create a new consumer which should also reconnect to
cluster-2
- Consumer<byte[]> consumer2 =
client1.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumer2 =
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s2").subscribe();
retryStrategically((test) -> topic2.getSubscription("s2") != null, 10,
500);
assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());
// new sub on migration topic must be redirected immediately
- Consumer<byte[]> consumerM =
client1.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumerM =
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("sM").subscribe();
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
.isEmpty());
@@ -365,7 +340,7 @@ public class ClusterMigrationTest {
// migrate topic after creating subscription
String newTopicName = topicName + "-new";
- consumerM =
client1.newConsumer().topic(newTopicName).subscriptionType(subType)
+ consumerM =
client1.newConsumer().topic(newTopicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("sM").subscribe();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
@@ -392,8 +367,8 @@ public class ClusterMigrationTest {
// create non-migrated topic which should connect to cluster-1
String diffTopic = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
- Consumer<byte[]> consumerDiff =
client1.newConsumer().topic(diffTopic).subscriptionType(subType)
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
+ Consumer<byte[]> consumerDiff =
client1.newConsumer().topic(diffTopic).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff =
client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -408,7 +383,7 @@ public class ClusterMigrationTest {
broker1.restart();
Producer<byte[]> producer4 =
client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- Consumer<byte[]> consumer3 =
client1.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumer3 =
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s3").subscribe();
retryStrategically((test) -> topic2.getProducers().size() == 4, 10,
500);
assertTrue(topic2.getProducers().size() == 4);
@@ -421,15 +396,16 @@ public class ClusterMigrationTest {
assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(),
"test3".getBytes());
}
+ client1.close();
+ client2.close();
log.info("Successfully consumed messages by migrated consumers");
}
- @Test(dataProvider = "TopicsubscriptionTypes")
- public void testClusterMigrationWithReplicationBacklog(boolean persistent,
SubscriptionType subType) throws Exception {
+ @Test
+ public void testClusterMigrationWithReplicationBacklog() throws Exception {
log.info("--- Starting
ReplicatorTest::testClusterMigrationWithReplicationBacklog ---");
- persistent = true;
final String topicName = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
@Cleanup
PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
@@ -440,11 +416,11 @@ public class ClusterMigrationTest {
// cluster-1 producer/consumer
Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumer1 =
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
// cluster-3 consumer
- Consumer<byte[]> consumer3 =
client3.newConsumer().topic(topicName).subscriptionType(subType)
+ Consumer<byte[]> consumer3 =
client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic)
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
@@ -526,6 +502,10 @@ public class ClusterMigrationTest {
// verify that the producer1 is now connected to migrated
cluster "r2" since backlog is cleared.
retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500);
assertEquals(topic2.getProducers().size(), 2);
+
+ client1.close();
+ client2.close();
+ client3.close();
}
/**
@@ -638,17 +618,19 @@ public class ClusterMigrationTest {
}
}, 10, 500);
assertFalse(pulsar2Topic.getSubscription("s1").getConsumers().isEmpty());
+
+ client1.close();
}
@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
- public void testNamespaceMigration(boolean persistent, SubscriptionType
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception
{
+ public void testNamespaceMigration(SubscriptionType subType, boolean
isClusterMigrate, boolean isNamespaceMigrate) throws Exception {
log.info("--- Starting Test::testNamespaceMigration ---");
// topic for the namespace1 (to be migrated)
final String topicName = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
// topic for namespace2 (not to be migrated)
final String topicName2 = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespaceNotToMigrate + "/migrationTopic");
+ .newUniqueName("persistent://" + namespaceNotToMigrate +
"/migrationTopic");
@Cleanup
PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
@@ -764,16 +746,14 @@ public class ClusterMigrationTest {
// try to consume backlog messages from cluster-1
blueConsumerNs1_1 =
client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
blueConsumerNs2_1 =
client1.newConsumer().topic(topicName2).subscriptionName("s1").subscribe();
- if (persistent) {
- for (int i = 0; i < n; i++) {
- Message<byte[]> msg = blueConsumerNs1_1.receive();
- assertEquals(msg.getData(), "test1".getBytes());
- blueConsumerNs1_1.acknowledge(msg);
-
- Message<byte[]> msg2 = blueConsumerNs2_1.receive();
- assertEquals(msg2.getData(), "test1".getBytes());
- blueConsumerNs2_1.acknowledge(msg2);
- }
+ for (int i = 0; i < n; i++) {
+ Message<byte[]> msg = blueConsumerNs1_1.receive();
+ assertEquals(msg.getData(), "test1".getBytes());
+ blueConsumerNs1_1.acknowledge(msg);
+
+ Message<byte[]> msg2 = blueConsumerNs2_1.receive();
+ assertEquals(msg2.getData(), "test1".getBytes());
+ blueConsumerNs2_1.acknowledge(msg2);
}
// after consuming all messages, consumer should have disconnected
// from blue and reconnect with green
@@ -862,7 +842,7 @@ public class ClusterMigrationTest {
// create non-migrated topic which should connect to blue
String diffTopic = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
Consumer<byte[]> consumerDiff =
client1.newConsumer().topic(diffTopic).subscriptionType(subType)
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff =
client1.newProducer().topic(diffTopic).enableBatching(false)
@@ -902,18 +882,19 @@ public class ClusterMigrationTest {
blueProducerNs2_1.close();
greenProducerNs1_1.close();
greenProducerNs2_1.close();
+ client1.close();
+ client2.close();
}
@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
- public void testNamespaceMigrationWithReplicationBacklog(boolean
persistent, SubscriptionType subType, boolean isClusterMigrate, boolean
isNamespaceMigrate) throws Exception {
+ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception
{
log.info("--- Starting
ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
- persistent = true;
// topic for namespace1 (to be migrated)
final String topicName = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespace + "/migrationTopic");
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
// topic for namespace2 (not to be migrated)
final String topicName2 = BrokerTestUtil
- .newUniqueName((persistent ? "persistent" : "non-persistent")
+ "://" + namespaceNotToMigrate + "/migrationTopic");
+ .newUniqueName("persistent://" + namespaceNotToMigrate +
"/migrationTopic");
@Cleanup
PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
@@ -1069,6 +1050,9 @@ public class ClusterMigrationTest {
blueConsumerNs2_1.close();
greenProducerNs1_1.close();
greenProducerNs2_1.close();
+ client1.close();
+ client2.close();
+ client3.close();
}
static class TestBroker extends MockedPulsarServiceBaseTest {
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 4178bc7483d..53e66809465 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -29,7 +29,8 @@ import
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
@@ -209,6 +210,46 @@ public interface Clusters {
*/
CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster,
LinkedHashSet<String> peerClusterNames);
+ /**
+ * Get the cluster migration state (the cluster policies) for the specified cluster.
+ * <p/>
+ * Response Example (property names follow the {@link ClusterPolicies} accessors; exact JSON shape
+ * depends on the server-side serialization):
+ *
+ * <pre>
+ * <code>{ migrated : true, migratedClusterUrl : { serviceUrl : "http://my-broker.example.com:8080/" } }</code>
+ * </pre>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @return the cluster policies, i.e. the migrated flag and the URLs of the cluster it was migrated to
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to get the configuration of the cluster
+ * @throws NotFoundException
+ * Cluster doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ ClusterPolicies getClusterMigration(String cluster) throws
PulsarAdminException;
+
+ /**
+ * Get the cluster migration state (the cluster policies) for the specified cluster asynchronously.
+ * <p/>
+ * Response Example (property names follow the {@link ClusterPolicies} accessors; exact JSON shape
+ * depends on the server-side serialization):
+ *
+ * <pre>
+ * <code>{ migrated : true, migratedClusterUrl : { serviceUrl : "http://my-broker.example.com:8080/" } }</code>
+ * </pre>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @return a future that yields the cluster policies, i.e. the migrated flag and the URLs of the
+ * cluster it was migrated to
+ *
+ */
+ CompletableFuture<ClusterPolicies> getClusterMigrationAsync(String
cluster);
+
/**
* Update the configuration for a cluster migration.
* <p/>
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 0b3e5aa49cb..1f7126521c6 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -19,9 +19,6 @@
package org.apache.pulsar.common.policies.data;
import java.util.LinkedHashSet;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
import org.apache.pulsar.client.admin.utils.ReflectionUtils;
import org.apache.pulsar.client.api.ProxyProtocol;
@@ -70,10 +67,6 @@ public interface ClusterData {
String getListenerName();
- boolean isMigrated();
-
- ClusterUrl getMigratedClusterUrl();
-
interface Builder {
Builder serviceUrl(String serviceUrl);
@@ -119,10 +112,6 @@ public interface ClusterData {
Builder listenerName(String listenerName);
- Builder migrated(boolean migrated);
-
- Builder migratedClusterUrl(ClusterUrl migratedClusterUrl);
-
ClusterData build();
}
@@ -131,19 +120,4 @@ public interface ClusterData {
static Builder builder() {
return
ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterDataImpl");
}
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- class ClusterUrl {
- String serviceUrl;
- String serviceUrlTls;
- String brokerServiceUrl;
- String brokerServiceUrlTls;
-
- public boolean isEmpty() {
- return serviceUrl != null && serviceUrlTls != null &&
brokerServiceUrl == null
- && brokerServiceUrlTls == null;
- }
- }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
new file mode 100644
index 00000000000..b95f6bb19ce
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.client.admin.utils.ReflectionUtils;
+
+/**
+ * Cluster-level policies describing whether a cluster has been migrated and, if so,
+ * the URLs of the cluster it was migrated to.
+ */
+public interface ClusterPolicies {
+    /**
+     * @return {@code true} if the cluster is flagged as migrated
+     */
+    boolean isMigrated();
+
+    /**
+     * @return the URLs of the cluster this cluster was migrated to
+     */
+    ClusterUrl getMigratedClusterUrl();
+
+    /**
+     * Builder for {@link ClusterPolicies} instances.
+     */
+    interface Builder {
+        Builder migrated(boolean migrated);
+
+        Builder migratedClusterUrl(ClusterUrl migratedClusterUrl);
+
+        ClusterPolicies build();
+    }
+
+    /**
+     * Note that this returns a {@link Builder}, not a copy of this object; implementations are
+     * expected to seed the builder with the current values.
+     *
+     * @return a builder derived from this instance
+     */
+    Builder clone();
+
+    /**
+     * Creates a new builder by reflectively instantiating the implementation's builder,
+     * so that this API module does not need a compile-time dependency on the implementation.
+     *
+     * @return a fresh, empty builder
+     */
+    static Builder builder() {
+        return ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterPoliciesImpl");
+    }
+
+    /**
+     * The set of service URLs identifying a cluster.
+     */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @EqualsAndHashCode
+    class ClusterUrl {
+        String serviceUrl;
+        String serviceUrlTls;
+        String brokerServiceUrl;
+        String brokerServiceUrlTls;
+
+        /**
+         * Tells whether no URL at all has been configured.
+         *
+         * <p>Every field must be {@code null} for the URL set to count as empty. The previous
+         * check required the two web service URLs to be non-null, which reported a completely
+         * unset instance as non-empty.
+         *
+         * @return {@code true} if all four URLs are {@code null}
+         */
+        public boolean isEmpty() {
+            return serviceUrl == null && serviceUrlTls == null && brokerServiceUrl == null
+                    && brokerServiceUrlTls == null;
+        }
+    }
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 02e44aca626..231d4506d61 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -34,8 +34,10 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
@@ -107,6 +109,18 @@ public class ClustersImpl extends BaseResource implements
Clusters {
return asyncPostRequest(path, Entity.entity(peerClusterNames,
MediaType.APPLICATION_JSON));
}
+ /**
+ * Blocking counterpart of {@link #getClusterMigrationAsync(String)}: the asynchronous call is
+ * handed to {@code sync}, which presumably waits for the result and surfaces failures as
+ * {@link PulsarAdminException}.
+ */
+ @Override
+ public ClusterPolicies getClusterMigration(String cluster) throws PulsarAdminException {
+     return sync(() -> this.getClusterMigrationAsync(cluster));
+ }
+
+ /**
+ * Issues an asynchronous GET against the {@code migrate} sub-resource of the given cluster and
+ * exposes the response through the {@link ClusterPolicies} interface.
+ */
+ @Override
+ public CompletableFuture<ClusterPolicies> getClusterMigrationAsync(String cluster) {
+     final WebTarget migrateTarget = adminClusters.path(cluster).path("migrate");
+     final FutureCallback<ClusterPoliciesImpl> callback = new FutureCallback<ClusterPoliciesImpl>() {
+     };
+     // The identity mapping only widens the element type from ClusterPoliciesImpl to ClusterPolicies.
+     return asyncGetRequest(migrateTarget, callback).thenApply(impl -> impl);
+ }
+
@Override
public void updateClusterMigration(String cluster, boolean isMigrated,
ClusterUrl clusterUrl)
throws PulsarAdminException {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 033146aa607..646f4ef0f50 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
@@ -154,6 +154,17 @@ public class CmdClusters extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the cluster migration configuration
data for the specified cluster")
+ private class GetClusterMigration extends CliCommand {
+     @Parameter(description = "cluster-name", required = true)
+     private java.util.List<String> params;
+
+     /**
+      * Resolves the single cluster-name argument and prints the migration policies
+      * the admin client returns for that cluster.
+      */
+     void run() throws PulsarAdminException {
+         final String clusterName = getOneArgument(params);
+         final PulsarAdmin admin = getAdmin();
+         print(admin.clusters().getClusterMigration(clusterName));
+     }
+ }
+
@Parameters(commandDescription = "Update cluster migration")
private class UpdateClusterMigration extends CliCommand {
@Parameter(description = "cluster-name", required = true)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
index 6a7110e6507..fffe87a3005 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
@@ -176,17 +176,6 @@ public final class ClusterDataImpl implements
ClusterData, Cloneable {
example = ""
)
private String listenerName;
- @ApiModelProperty(
- name = "migrated",
- value = "flag to check if cluster is migrated to different
cluster",
- example = "true/false"
- )
- private boolean migrated;
- @ApiModelProperty(
- name = "migratedClusterUrl",
- value = "url of cluster where current cluster is migrated"
- )
- private ClusterUrl migratedClusterUrl;
public static ClusterDataImplBuilder builder() {
return new ClusterDataImplBuilder();
@@ -216,9 +205,7 @@ public final class ClusterDataImpl implements ClusterData,
Cloneable {
.brokerClientTrustCertsFilePath(brokerClientTrustCertsFilePath)
.brokerClientCertificateFilePath(brokerClientCertificateFilePath)
.brokerClientKeyFilePath(brokerClientKeyFilePath)
- .listenerName(listenerName)
- .migrated(migrated)
- .migratedClusterUrl(migratedClusterUrl);
+ .listenerName(listenerName);
}
@Data
@@ -245,8 +232,6 @@ public final class ClusterDataImpl implements ClusterData,
Cloneable {
private String brokerClientKeyFilePath;
private String brokerClientTrustCertsFilePath;
private String listenerName;
- private boolean migrated;
- private ClusterUrl migratedClusterUrl;
ClusterDataImplBuilder() {
}
@@ -367,16 +352,6 @@ public final class ClusterDataImpl implements
ClusterData, Cloneable {
return this;
}
- public ClusterDataImplBuilder migrated(boolean migrated) {
- this.migrated = migrated;
- return this;
- }
-
- public ClusterDataImplBuilder migratedClusterUrl(ClusterUrl
migratedClusterUrl) {
- this.migratedClusterUrl = migratedClusterUrl;
- return this;
- }
-
public ClusterDataImpl build() {
return new ClusterDataImpl(
serviceUrl,
@@ -400,9 +375,7 @@ public final class ClusterDataImpl implements ClusterData,
Cloneable {
brokerClientTrustCertsFilePath,
brokerClientKeyFilePath,
brokerClientCertificateFilePath,
- listenerName,
- migrated,
- migratedClusterUrl);
+ listenerName);
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
new file mode 100644
index 00000000000..c8af2dec321
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The local policies of a cluster: whether the cluster has been migrated and, if so,
+ * the URLs of the cluster it was migrated to.
+ */
+@ApiModel(
+        value = "ClusterPolicies",
+        description = "The local cluster policies for a cluster"
+)
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public final class ClusterPoliciesImpl implements ClusterPolicies, Cloneable {
+    @ApiModelProperty(
+            name = "migrated",
+            value = "flag to check if cluster is migrated to different
cluster",
+            example = "true/false"
+    )
+    private boolean migrated;
+    @ApiModelProperty(
+            name = "migratedClusterUrl",
+            value = "url of cluster where current cluster is migrated"
+    )
+    private ClusterUrl migratedClusterUrl;
+
+    /**
+     * @return a new builder with no values set
+     */
+    public static ClusterPoliciesImplBuilder builder() {
+        return new ClusterPoliciesImplBuilder();
+    }
+
+    /**
+     * Returns a builder seeded with this instance's values rather than a copy of the object itself.
+     * The {@code migratedClusterUrl} reference is passed on as-is, not copied.
+     */
+    @Override
+    public ClusterPoliciesImplBuilder clone() {
+        final ClusterPoliciesImplBuilder seeded = builder();
+        seeded.migrated(migrated);
+        seeded.migratedClusterUrl(migratedClusterUrl);
+        return seeded;
+    }
+
+    /**
+     * Fluent builder for {@link ClusterPoliciesImpl}.
+     */
+    @Data
+    public static class ClusterPoliciesImplBuilder implements ClusterPolicies.Builder {
+        private boolean migrated;
+        private ClusterUrl migratedClusterUrl;
+
+        ClusterPoliciesImplBuilder() {
+        }
+
+        @Override
+        public ClusterPoliciesImplBuilder migrated(boolean migrated) {
+            this.migrated = migrated;
+            return this;
+        }
+
+        @Override
+        public ClusterPoliciesImplBuilder migratedClusterUrl(ClusterUrl migratedClusterUrl) {
+            this.migratedClusterUrl = migratedClusterUrl;
+            return this;
+        }
+
+        @Override
+        public ClusterPoliciesImpl build() {
+            return new ClusterPoliciesImpl(this.migrated, this.migratedClusterUrl);
+        }
+    }
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
index 87e935ecf73..0bf16166531 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
@@ -53,9 +53,6 @@ public class ClusterDataImplTest {
.brokerClientKeyFilePath("/my/key/file")
.brokerClientCertificateFilePath("/my/cert/file")
.listenerName("a-listener")
- .migrated(true)
- .migratedClusterUrl(new
ClusterData.ClusterUrl("http://remote", "https://remote", "pulsar://remote",
- "pulsar+ssl://remote"))
.build();
ClusterDataImpl clone = originalData.clone().build();