BewareMyPower commented on code in PR #20537:
URL: https://github.com/apache/pulsar/pull/20537#discussion_r1234655490


##########
conf/broker.conf:
##########
@@ -1308,6 +1308,9 @@ loadBalancerOverrideBrokerNicSpeedGbps=
 # Name of load manager to use
 
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
 
+#Name of topic bundle assignment strategy to use

Review Comment:
   ```suggestion
   # Name of topic bundle assignment strategy to use
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.util.Reflections;
+
+public class TopicBundleAssignmentFactory {
+
+    private static volatile TopicBundleAssignmentStrategy strategy;
+
+    public static TopicBundleAssignmentStrategy create(PulsarService pulsar) {
+        if (strategy != null) {
+            return strategy;
+        }
+        synchronized (TopicBundleAssignmentFactory.class) {
+            if (strategy != null) {
+                return strategy;
+            }
+            ServiceConfiguration conf = pulsar.getConfiguration();
+            try {
+                TopicBundleAssignmentStrategy tempStategy =
+                        Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(),
+                                TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader());
+                tempStategy.init(pulsar);
+                strategy = tempStategy;
+                return strategy;
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Could not load TopicBundleAssignmentStrategy:" + conf.getTopicBundleAssignmentStrategy(), e);

Review Comment:
   This exception is only caught in `PulsarWebResource#validateNamespaceBundleRange`. So if the strategy class cannot be loaded, every incoming REST request will produce an error response.
   
   We should fail fast in this case, so it's better to do the initialization in `PulsarService#start()`.
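
A minimal sketch of the fail-fast idea, using hypothetical stand-in types (`AssignmentStrategy`, `StrategyLoader`, `Broker`) rather than Pulsar's real classes: the configured strategy class is loaded once during startup, so a misconfigured class name aborts `start()` instead of surfacing on every request.

```java
// Hypothetical stand-in for the strategy interface; not Pulsar's actual API.
interface AssignmentStrategy {
    String findBundle(String topic);
}

// Trivial implementation so the loader has something valid to instantiate.
final class DefaultStrategy implements AssignmentStrategy {
    public DefaultStrategy() {
    }

    @Override
    public String findBundle(String topic) {
        return "bundle-" + Math.floorMod(topic.hashCode(), 4);
    }
}

final class StrategyLoader {
    // Instantiates the configured class by name and wraps any failure
    // in a single unchecked exception.
    static AssignmentStrategy load(String className) {
        try {
            Class<? extends AssignmentStrategy> clazz =
                    Class.forName(className).asSubclass(AssignmentStrategy.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Could not load strategy: " + className, e);
        }
    }
}

// Hypothetical stand-in for the service that owns the strategy.
final class Broker {
    private final String strategyClassName;
    private AssignmentStrategy strategy;

    Broker(String strategyClassName) {
        this.strategyClassName = strategyClassName;
    }

    // Eager initialization: a bad class name makes startup fail here,
    // before any request is served.
    void start() {
        this.strategy = StrategyLoader.load(strategyClassName);
    }

    AssignmentStrategy strategy() {
        return strategy;
    }
}
```

With this shape, request handlers only ever read an already-initialized strategy, and the lazy double-checked locking in the factory would no longer be the first place a load failure shows up.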



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java:
##########
@@ -204,4 +200,8 @@ public LocalPolicies toLocalPolicies() {
                 localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null),
                 localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null));
     }
+
+    public NamespaceName getNsname() {
+        return this.nsname;
+    }

Review Comment:
   This adds a new public method to `NamespaceBundles`, which was not discussed in the PIP. I think we should not add public methods like this lightly, especially since this class could be used in Pulsar plugins.
   
   We can just move the logic to `NamespaceBundles#findBundle`:
   
   ```java
       public NamespaceBundle findBundle(TopicName topicName) {
           checkArgument(nsname.equals(topicName.getNamespaceObject()));
           return topicBundleAssignmentStrategy.findBundle(topicName, this);
       }
   ```
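
To illustrate why the getter becomes unnecessary, here is a small self-contained sketch with hypothetical stand-in types (`Bundles`, `BundleStrategy`, string topics of the form `tenant/ns/topic`), not Pulsar's real classes. The owning class validates the namespace itself and then delegates, so the namespace field can stay private.

```java
import java.util.List;

// Hypothetical stand-in for the strategy interface.
interface BundleStrategy {
    String findBundle(String topic, Bundles bundles);
}

// Hypothetical stand-in for the class that owns the namespace and its bundles.
final class Bundles {
    private final String namespace; // stays private, no public getter needed
    private final List<String> bundleNames;
    private final BundleStrategy strategy;

    Bundles(String namespace, List<String> bundleNames, BundleStrategy strategy) {
        this.namespace = namespace;
        this.bundleNames = bundleNames;
        this.strategy = strategy;
    }

    // The namespace check lives here, so the strategy never has to read the field.
    String findBundle(String topic) {
        int slash = topic.lastIndexOf('/');
        String topicNamespace = slash < 0 ? "" : topic.substring(0, slash);
        if (!namespace.equals(topicNamespace)) {
            throw new IllegalArgumentException(
                    "Topic " + topic + " does not belong to namespace " + namespace);
        }
        return strategy.findBundle(topic, this);
    }

    // Maps a hash value onto one of the bundles.
    String bundleAt(long hash) {
        return bundleNames.get((int) Math.floorMod(hash, (long) bundleNames.size()));
    }
}
```

A strategy in this sketch can be as small as `(topic, bundles) -> bundles.bundleAt(topic.hashCode())`; it only depends on what `Bundles` chooses to expose for bundle lookup.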



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.util.Reflections;
+
+public class TopicBundleAssignmentFactory {
+
+    private static volatile TopicBundleAssignmentStrategy strategy;
+
+    public static TopicBundleAssignmentStrategy create(PulsarService pulsar) {
+        if (strategy != null) {
+            return strategy;
+        }
+        synchronized (TopicBundleAssignmentFactory.class) {
+            if (strategy != null) {
+                return strategy;
+            }
+            ServiceConfiguration conf = pulsar.getConfiguration();
+            try {
+                TopicBundleAssignmentStrategy tempStategy =
+                        Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(),
+                                TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader());
+                tempStategy.init(pulsar);
+                strategy = tempStategy;

Review Comment:
   Typo, `tempStategy` -> `tempStrategy`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+
+public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy {
+    private NamespaceService namespaceService;
+    @Override
+    public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
+        checkArgument(namespaceBundles.getNsname().equals(topicName.getNamespaceObject()));
+        long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString());
+        NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);
+        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
+            bundle.setHasNonPersistentTopic(true);
+        }
+        return bundle;
+    }
+
+    @Override
+    public void init(PulsarService pulsarService) {
+        this.namespaceService = pulsarService.getNamespaceService();
+    }
+
+}

Review Comment:
   Could you add a line break at the end of this file (and of the other new files)?
   
   If you use IntelliJ IDEA, go to **Editor - General** and enable **Ensure every saved file ends with a line break**.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java:
##########
@@ -377,6 +378,10 @@ public boolean canSplitBundle(NamespaceBundle bundle) {
         return range.upperEndpoint() - range.lowerEndpoint() > 1;
     }
 
+    public PulsarService getPulsar() {

Review Comment:
   It's better not to make this method public. It is only used in `NamespaceBundles`, so we can use the default access modifier.
   
   In addition, it would be simpler to have the factory create the strategy, e.g.
   
   ```java
       TopicBundleAssignmentStrategy createAssignmentStrategy() {
           return TopicBundleAssignmentFactory.create(pulsar);
       }
   ```


