This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a9f99f03db [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363)
3a9f99f03db is described below

commit 3a9f99f03db175ab0622dbfc87cde9efab24276e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Oct 25 14:26:20 2023 -0700

    [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363)
    
    Co-authored-by: Rajan Dhabalia <[email protected]>
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 50 +++++++++++-----------
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +-
 .../pulsar/broker/service/AbstractTopic.java       |  6 +--
 .../broker/service/ClusterMigrationTest.java       |  1 +
 .../pulsar/common/policies/data/Policies.java      |  5 ++-
 .../pulsar/common/policies/data/LocalPolicies.java |  1 +
 6 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index e9beab90b5f..1526ae18a90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -57,6 +57,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -304,7 +305,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
             // fetch bundles from LocalZK-policies
             BundlesData bundleData = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName).getBundlesData();
+            Optional<LocalPolicies> localPolicies = 
getLocalPolicies().getLocalPolicies(namespaceName);
             policies.bundles = bundleData != null ? bundleData : 
policies.bundles;
+            policies.migrated = localPolicies.isPresent() ? 
localPolicies.get().migrated : false;
             if (policies.is_allow_auto_update_schema == null) {
                 // the type changed from boolean to Boolean. return broker 
value here for keeping compatibility.
                 policies.is_allow_auto_update_schema = 
pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
@@ -321,32 +324,31 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected CompletableFuture<Policies> 
getNamespacePoliciesAsync(NamespaceName namespaceName) {
-        return 
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
-            if (policies.isPresent()) {
-                return pulsar()
-                        .getNamespaceService()
-                        .getNamespaceBundleFactory()
-                        .getBundlesAsync(namespaceName)
-                        .thenCompose(bundles -> {
-                    BundlesData bundleData = null;
-                    try {
-                        bundleData = bundles.getBundlesData();
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to get namespace policies {}", 
clientAppId(), namespaceName, e);
-                        return FutureUtil.failedFuture(new RestException(e));
-                    }
-                    policies.get().bundles = bundleData != null ? bundleData : 
policies.get().bundles;
-                    if (policies.get().is_allow_auto_update_schema == null) {
-                        // the type changed from boolean to Boolean. return 
broker value here for keeping compatibility.
-                        policies.get().is_allow_auto_update_schema = 
pulsar().getConfig()
-                                .isAllowAutoUpdateSchemaEnabled();
+        CompletableFuture<Policies> result = new CompletableFuture<>();
+        namespaceResources().getPoliciesAsync(namespaceName)
+                
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, 
localPolicies) -> {
+                    if (pl.isPresent()) {
+                        Policies policies = pl.get();
+                        if (localPolicies.isPresent()) {
+                            policies.bundles = localPolicies.get().bundles;
+                            policies.migrated = localPolicies.get().migrated;
+                        }
+                        if (policies.is_allow_auto_update_schema == null) {
+                            // the type changed from boolean to Boolean. return
+                            // broker value here for keeping compatibility.
+                            policies.is_allow_auto_update_schema = 
pulsar().getConfig()
+                                    .isAllowAutoUpdateSchemaEnabled();
+                        }
+                        result.complete(policies);
+                    } else {
+                        result.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
                     }
-                    return CompletableFuture.completedFuture(policies.get());
+                    return null;
+                }).exceptionally(ex -> {
+                    result.completeExceptionally(ex.getCause());
+                    return null;
                 });
-            } else {
-                return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND, "Namespace does not exist"));
-            }
-        });
+        return result;
     }
 
     protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a8f1af1d34f..bd3690d3c74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2646,8 +2646,8 @@ public abstract class NamespacesBase extends 
AdminResource {
     protected void internalEnableMigration(boolean migrated) {
         validateSuperUserAccess();
         try {
-            updatePolicies(namespaceName, policies -> {
-                policies.isMigrated = migrated;
+            getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
+                policies.migrated = migrated;
                 return policies;
             });
             log.info("Successfully updated migration on namespace {}", 
namespaceName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 7a23312c477..836bd7ad2d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1384,9 +1384,9 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     }
 
     private static CompletableFuture<Boolean> 
isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
-        return pulsar.getPulsarResources().getNamespaceResources().
-                    getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                    .thenApply(policies ->  policies.isPresent() && 
policies.get().isMigrated);
+        return pulsar.getPulsarResources().getLocalPolicies()
+                
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+                .thenApply(policies -> policies.isPresent() && 
policies.get().migrated);
     }
 
     public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService 
pulsar, String topic) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 2fa201cf958..7bd82cdd840 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -979,6 +979,7 @@ public class ClusterMigrationTest {
                 pulsar2.getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrlTls());
         admin1.clusters().updateClusterMigration("r1", isClusterMigrate, 
migratedUrl);
         admin1.namespaces().updateMigrationState(namespace, 
isNamespaceMigrate);
+        assertEquals(admin1.namespaces().getPolicies(namespace).migrated, 
isNamespaceMigrate);
         log.info("update cluster migration called");
 
         retryStrategically((test) -> {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 138e8c47930..e19b6ab95f1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -126,7 +126,7 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public String resource_group_name = null;
 
-    public boolean isMigrated;
+    public boolean migrated;
 
     public enum BundleType {
         LARGEST, HOT;
@@ -158,7 +158,7 @@ public class Policies {
                 offload_policies,
                 subscription_types_enabled,
                 properties,
-                resource_group_name, entryFilters);
+                resource_group_name, entryFilters, migrated);
     }
 
     @Override
@@ -204,6 +204,7 @@ public class Policies {
                     && Objects.equals(offload_policies, other.offload_policies)
                     && Objects.equals(subscription_types_enabled, 
other.subscription_types_enabled)
                     && Objects.equals(properties, other.properties)
+                    && Objects.equals(migrated, other.migrated)
                     && Objects.equals(resource_group_name, 
other.resource_group_name)
                     && Objects.equals(entryFilters, other.entryFilters);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index e8a158ace70..3b17dbe067e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -34,6 +34,7 @@ public class LocalPolicies {
     public final BookieAffinityGroupData bookieAffinityGroup;
     // namespace anti-affinity-group
     public final String namespaceAntiAffinityGroup;
+    public boolean migrated;
 
     public LocalPolicies() {
         bundles = defaultBundle();

Reply via email to