This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a9f99f03db [feat] [broker] PIP-188 Fix cluster migration state store
into local namespace policies (#21363)
3a9f99f03db is described below
commit 3a9f99f03db175ab0622dbfc87cde9efab24276e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Oct 25 14:26:20 2023 -0700
[feat] [broker] PIP-188 Fix cluster migration state store into local
namespace policies (#21363)
Co-authored-by: Rajan Dhabalia <[email protected]>
---
.../apache/pulsar/broker/admin/AdminResource.java | 50 +++++++++++-----------
.../pulsar/broker/admin/impl/NamespacesBase.java | 4 +-
.../pulsar/broker/service/AbstractTopic.java | 6 +--
.../broker/service/ClusterMigrationTest.java | 1 +
.../pulsar/common/policies/data/Policies.java | 5 ++-
.../pulsar/common/policies/data/LocalPolicies.java | 1 +
6 files changed, 36 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index e9beab90b5f..1526ae18a90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -57,6 +57,7 @@ import
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -304,7 +305,9 @@ public abstract class AdminResource extends
PulsarWebResource {
// fetch bundles from LocalZK-policies
BundlesData bundleData =
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
+ Optional<LocalPolicies> localPolicies =
getLocalPolicies().getLocalPolicies(namespaceName);
policies.bundles = bundleData != null ? bundleData :
policies.bundles;
+ policies.migrated = localPolicies.isPresent() ?
localPolicies.get().migrated : false;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker
value here for keeping compatibility.
policies.is_allow_auto_update_schema =
pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
@@ -321,32 +324,31 @@ public abstract class AdminResource extends
PulsarWebResource {
}
protected CompletableFuture<Policies>
getNamespacePoliciesAsync(NamespaceName namespaceName) {
- return
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
- if (policies.isPresent()) {
- return pulsar()
- .getNamespaceService()
- .getNamespaceBundleFactory()
- .getBundlesAsync(namespaceName)
- .thenCompose(bundles -> {
- BundlesData bundleData = null;
- try {
- bundleData = bundles.getBundlesData();
- } catch (Exception e) {
- log.error("[{}] Failed to get namespace policies {}",
clientAppId(), namespaceName, e);
- return FutureUtil.failedFuture(new RestException(e));
- }
- policies.get().bundles = bundleData != null ? bundleData :
policies.get().bundles;
- if (policies.get().is_allow_auto_update_schema == null) {
- // the type changed from boolean to Boolean. return
broker value here for keeping compatibility.
- policies.get().is_allow_auto_update_schema =
pulsar().getConfig()
- .isAllowAutoUpdateSchemaEnabled();
+ CompletableFuture<Policies> result = new CompletableFuture<>();
+ namespaceResources().getPoliciesAsync(namespaceName)
+
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl,
localPolicies) -> {
+ if (pl.isPresent()) {
+ Policies policies = pl.get();
+ if (localPolicies.isPresent()) {
+ policies.bundles = localPolicies.get().bundles;
+ policies.migrated = localPolicies.get().migrated;
+ }
+ if (policies.is_allow_auto_update_schema == null) {
+ // the type changed from boolean to Boolean. return
+ // broker value here for keeping compatibility.
+ policies.is_allow_auto_update_schema =
pulsar().getConfig()
+ .isAllowAutoUpdateSchemaEnabled();
+ }
+ result.complete(policies);
+ } else {
+ result.completeExceptionally(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
- return CompletableFuture.completedFuture(policies.get());
+ return null;
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex.getCause());
+ return null;
});
- } else {
- return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND, "Namespace does not exist"));
- }
- });
+ return result;
}
protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a8f1af1d34f..bd3690d3c74 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2646,8 +2646,8 @@ public abstract class NamespacesBase extends
AdminResource {
protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
- updatePolicies(namespaceName, policies -> {
- policies.isMigrated = migrated;
+ getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
+ policies.migrated = migrated;
return policies;
});
log.info("Successfully updated migration on namespace {}",
namespaceName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 7a23312c477..836bd7ad2d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1384,9 +1384,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
private static CompletableFuture<Boolean>
isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
- return pulsar.getPulsarResources().getNamespaceResources().
- getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
- .thenApply(policies -> policies.isPresent() &&
policies.get().isMigrated);
+ return pulsar.getPulsarResources().getLocalPolicies()
+
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+ .thenApply(policies -> policies.isPresent() &&
policies.get().migrated);
}
public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService
pulsar, String topic) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 2fa201cf958..7bd82cdd840 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -979,6 +979,7 @@ public class ClusterMigrationTest {
pulsar2.getBrokerServiceUrl(),
pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", isClusterMigrate,
migratedUrl);
admin1.namespaces().updateMigrationState(namespace,
isNamespaceMigrate);
+ assertEquals(admin1.namespaces().getPolicies(namespace).migrated,
isNamespaceMigrate);
log.info("update cluster migration called");
retryStrategically((test) -> {
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 138e8c47930..e19b6ab95f1 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -126,7 +126,7 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public String resource_group_name = null;
- public boolean isMigrated;
+ public boolean migrated;
public enum BundleType {
LARGEST, HOT;
@@ -158,7 +158,7 @@ public class Policies {
offload_policies,
subscription_types_enabled,
properties,
- resource_group_name, entryFilters);
+ resource_group_name, entryFilters, migrated);
}
@Override
@@ -204,6 +204,7 @@ public class Policies {
&& Objects.equals(offload_policies, other.offload_policies)
&& Objects.equals(subscription_types_enabled,
other.subscription_types_enabled)
&& Objects.equals(properties, other.properties)
+ && Objects.equals(migrated, other.migrated)
&& Objects.equals(resource_group_name,
other.resource_group_name)
&& Objects.equals(entryFilters, other.entryFilters);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index e8a158ace70..3b17dbe067e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -34,6 +34,7 @@ public class LocalPolicies {
public final BookieAffinityGroupData bookieAffinityGroup;
// namespace anti-affinity-group
public final String namespaceAntiAffinityGroup;
+ public boolean migrated;
public LocalPolicies() {
bundles = defaultBundle();