This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69d2b66  Fixed use of static synchronized in LoadManagerShared (#1154)
69d2b66 is described below

commit 69d2b66c3c125216ba7bae64300b4224fec0c633
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Feb 1 09:00:29 2018 -0800

    Fixed use of static synchronized in LoadManagerShared (#1154)
---
 .../broker/loadbalance/impl/LoadManagerShared.java | 62 +++++++++++++---------
 .../broker/loadbalance/impl/OverloadShedder.java   |  8 ++-
 .../pulsar/broker/service/ReplicatorTest.java      | 12 ++---
 3 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index e0af8ec..2658a95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -28,19 +28,17 @@ import java.net.URL;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.apache.pulsar.broker.BrokerData;
-import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
@@ -57,6 +55,9 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.Maps;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.internal.PlatformDependent;
+
 /**
  * This class contains code which in shared between the two load manager 
implementations.
  */
@@ -67,11 +68,21 @@ public class LoadManagerShared {
     public static final int MIBI = 1024 * 1024;
 
     // Cache for primary brokers according to policies.
-    private static final Set<String> primariesCache = new HashSet<>();
+    private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>() {
+        @Override
+        protected Set<String> initialValue() throws Exception {
+            return new HashSet<>();
+        }
+    };
 
     // Cache for shard brokers according to policies.
-    private static final Set<String> secondaryCache = new HashSet<>();
-    
+    private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>() {
+        @Override
+        protected Set<String> initialValue() throws Exception {
+            return new HashSet<>();
+        }
+    };
+
     // update LoadReport at most every 5 seconds
     public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = 
TimeUnit.SECONDS.toMillis(5);
 
@@ -83,12 +94,15 @@ public class LoadManagerShared {
 
     // Determines the brokers available for the given service unit according 
to the given policies.
     // The brokers are put into brokerCandidateCache.
-    public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
+    public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
-            final Set<String> availableBrokers,
-            final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+            final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+        Set<String> primariesCache = localPrimariesCache.get();
         primariesCache.clear();
+
+        Set<String> secondaryCache = localSecondaryCache.get();
         secondaryCache.clear();
+
         NamespaceName namespace = serviceUnit.getNamespaceObject();
         boolean isIsolationPoliciesPresent = 
policies.areIsolationPoliciesPresent(namespace);
         boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle)
@@ -141,7 +155,7 @@ public class LoadManagerShared {
                     }
                 } else if (!isNonPersistentTopic
                         && 
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
-                    // persistent topic can be assigned to only brokers that 
enabled for persistent-topic 
+                    // persistent topic can be assigned to only brokers that 
enabled for persistent-topic
                     if (log.isDebugEnabled()) {
                         log.debug("Filter broker- [{}] because broker only 
supports non-persistent namespace - [{}]",
                                 brokerUrl.getHost(), namespace.toString());
@@ -217,7 +231,7 @@ public class LoadManagerShared {
 
         // Collect JVM direct memory
         systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI);
-        systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MIBI);
+        systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI);
 
         return systemResourceUsage;
     }
@@ -235,7 +249,7 @@ public class LoadManagerShared {
     /**
      * Removes the brokers which have more bundles assigned to them in the 
same namespace as the incoming bundle than at
      * least one other available broker from consideration.
-     * 
+     *
      * @param assignedBundleName
      *            Name of bundle to be assigned.
      * @param candidates
@@ -280,7 +294,7 @@ public class LoadManagerShared {
                     .size() != finalLeastBundles);
         }
     }
-    
+
     /**
      * It tries to filter out brokers which own namespace with same 
anti-affinity-group as given namespace. If all the
      * domains own namespace with same anti-affinity group then it will try to 
keep brokers with domain that has least
@@ -293,17 +307,17 @@ public class LoadManagerShared {
      * d1-3          b1-2,b2-1
      * d2-3          b3-2,b4-1
      * d3-4          b5-2,b6-2
-     * 
+     *
      * After filtering: "candidates" brokers
      * Domain-count  Brokers-count
      * ____________  ____________
      * d1-3          b2-1
      * d2-3          b4-1
-     * 
+     *
      * "candidate" broker to own anti-affinity-namespace = b2 or b4
-     * 
+     *
      * </pre>
-     * 
+     *
      * @param pulsar
      * @param assignedBundleName
      * @param candidates
@@ -366,7 +380,7 @@ public class LoadManagerShared {
     /**
      * It computes least number of namespace owned by any of the domain and 
then it filters out all the domains that own
      * namespaces more than this count.
-     * 
+     *
      * @param brokerToAntiAffinityNamespaceCount
      * @param candidates
      * @param brokerToDomainMap
@@ -403,7 +417,7 @@ public class LoadManagerShared {
     /**
      * It returns map of broker and count of namespace that are belong to the 
same anti-affinity group as given
      * {@param namespaceName}
-     * 
+     *
      * @param pulsar
      * @param namespaceName
      * @param brokerToNamespaceToBundleRange
@@ -451,12 +465,12 @@ public class LoadManagerShared {
     }
 
     /**
-     * 
+     *
      * It checks if given anti-affinity namespace should be unloaded by broker 
due to load-shedding. If all the brokers
      * are owning same number of anti-affinity namespaces then unloading this 
namespace again ends up at the same broker
      * from which it was unloaded. So, this util checks that given namespace 
should be unloaded only if it can be loaded
      * by different broker.
-     * 
+     *
      * @param namespace
      * @param bundle
      * @param currentBroker
@@ -512,7 +526,7 @@ public class LoadManagerShared {
      * It filters out brokers which owns topic higher than configured 
threshold at
      * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
      * if all the brokers own topic higher than threshold then it resets the 
list with original broker candidates
-     * 
+     *
      * @param brokerCandidateCache
      * @param loadData
      * @param loadBalancerBrokerMaxTopics
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 341136d..ed71def 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -45,7 +45,7 @@ public class OverloadShedder implements LoadSheddingStrategy {
 
     /**
      * Create an OverloadShedder with the service configuration.
-     * 
+     *
      * @param conf
      *            Service configuration to create from.
      */
@@ -55,7 +55,7 @@ public class OverloadShedder implements LoadSheddingStrategy {
 
     /**
      * Attempt to shed one bundle off every broker which is overloaded.
-     * 
+     *
      * @param loadData
      *            The load data to used to make the unloading decision.
      * @param conf
@@ -78,6 +78,10 @@ public class OverloadShedder implements LoadSheddingStrategy {
                 if (localData.getBundles().size() > 1) {
                     for (final String bundle : localData.getBundles()) {
                         final BundleData bundleData = loadData.getBundleData().get(bundle);
+                        if (bundleData == null) {
+                            continue;
+                        }
+
                         // Consider short-term message rate to address system resource burden
                         final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                         final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 25098a9..8397727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -31,9 +31,9 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -43,12 +43,10 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
-import org.mockito.Mockito;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
-import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
@@ -56,17 +54,16 @@ import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.PersistentTopicStats;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -664,6 +661,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
             assertNull(topic.getPersistentReplicator(replicatorClusterName));
             return null;
         });
+
+        producer1.close();
     }
 
     @Test(priority = 5, timeOut = 30000)
@@ -683,6 +682,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         field.setAccessible(true);
         ProducerImpl producer = (ProducerImpl) field.get(replicator);
         assertNull(producer);
+        producer1.close();
     }
 
     /**

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to