This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 69d2b66 Fixed use of static synchronized in LoadManagerShared (#1154) 69d2b66 is described below commit 69d2b66c3c125216ba7bae64300b4224fec0c633 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Feb 1 09:00:29 2018 -0800 Fixed use of static synchronized in LoadManagerShared (#1154) --- .../broker/loadbalance/impl/LoadManagerShared.java | 62 +++++++++++++--------- .../broker/loadbalance/impl/OverloadShedder.java | 8 ++- .../pulsar/broker/service/ReplicatorTest.java | 12 ++--- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index e0af8ec..2658a95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -28,19 +28,17 @@ import java.net.URL; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.pulsar.broker.BrokerData; -import java.util.concurrent.TimeoutException; + import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.stats.metrics.JvmMetrics; @@ -57,6 +55,9 @@ import 
org.slf4j.LoggerFactory; import com.beust.jcommander.internal.Lists; import com.google.common.collect.Maps; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.PlatformDependent; + /** * This class contains code which in shared between the two load manager implementations. */ @@ -67,11 +68,21 @@ public class LoadManagerShared { public static final int MIBI = 1024 * 1024; // Cache for primary brokers according to policies. - private static final Set<String> primariesCache = new HashSet<>(); + private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>() { + @Override + protected Set<String> initialValue() throws Exception { + return new HashSet<>(); + } + }; // Cache for shard brokers according to policies. - private static final Set<String> secondaryCache = new HashSet<>(); - + private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>() { + @Override + protected Set<String> initialValue() throws Exception { + return new HashSet<>(); + } + }; + // update LoadReport at most every 5 seconds public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5); @@ -83,12 +94,15 @@ public class LoadManagerShared { // Determines the brokers available for the given service unit according to the given policies. // The brokers are put into brokerCandidateCache. 
- public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit, + public static void applyNamespacePolicies(final ServiceUnitId serviceUnit, final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache, - final Set<String> availableBrokers, - final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) { + final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) { + Set<String> primariesCache = localPrimariesCache.get(); primariesCache.clear(); + + Set<String> secondaryCache = localSecondaryCache.get(); secondaryCache.clear(); + NamespaceName namespace = serviceUnit.getNamespaceObject(); boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace); boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle) @@ -141,7 +155,7 @@ public class LoadManagerShared { } } else if (!isNonPersistentTopic && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) { - // persistent topic can be assigned to only brokers that enabled for persistent-topic + // persistent topic can be assigned to only brokers that enabled for persistent-topic if (log.isDebugEnabled()) { log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]", brokerUrl.getHost(), namespace.toString()); @@ -217,7 +231,7 @@ public class LoadManagerShared { // Collect JVM direct memory systemResourceUsage.directMemory.usage = (double) (JvmMetrics.getJvmDirectMemoryUsed() / MIBI); - systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MIBI); + systemResourceUsage.directMemory.limit = (double) (PlatformDependent.maxDirectMemory() / MIBI); return systemResourceUsage; } @@ -235,7 +249,7 @@ public class LoadManagerShared { /** * Removes the brokers which have more bundles assigned to them in the same namespace as the incoming bundle than at * least one other available broker from consideration. 
- * + * * @param assignedBundleName * Name of bundle to be assigned. * @param candidates @@ -280,7 +294,7 @@ public class LoadManagerShared { .size() != finalLeastBundles); } } - + /** * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least @@ -293,17 +307,17 @@ public class LoadManagerShared { * d1-3 b1-2,b2-1 * d2-3 b3-2,b4-1 * d3-4 b5-2,b6-2 - * + * * After filtering: "candidates" brokers * Domain-count Brokers-count * ____________ ____________ * d1-3 b2-1 * d2-3 b4-1 - * + * * "candidate" broker to own anti-affinity-namespace = b2 or b4 - * + * * </pre> - * + * * @param pulsar * @param assignedBundleName * @param candidates @@ -366,7 +380,7 @@ public class LoadManagerShared { /** * It computes least number of namespace owned by any of the domain and then it filters out all the domains that own * namespaces more than this count. - * + * * @param brokerToAntiAffinityNamespaceCount * @param candidates * @param brokerToDomainMap @@ -403,7 +417,7 @@ public class LoadManagerShared { /** * It returns map of broker and count of namespace that are belong to the same anti-affinity group as given * {@param namespaceName} - * + * * @param pulsar * @param namespaceName * @param brokerToNamespaceToBundleRange @@ -451,12 +465,12 @@ public class LoadManagerShared { } /** - * + * * It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers * are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker * from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded * by different broker. 
- * + * * @param namespace * @param bundle * @param currentBroker @@ -512,7 +526,7 @@ public class LoadManagerShared { * It filters out brokers which owns topic higher than configured threshold at * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/> * if all the brokers own topic higher than threshold then it resets the list with original broker candidates - * + * * @param brokerCandidateCache * @param loadData * @param loadBalancerBrokerMaxTopics diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index 341136d..ed71def 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -45,7 +45,7 @@ public class OverloadShedder implements LoadSheddingStrategy { /** * Create an OverloadShedder with the service configuration. - * + * * @param conf * Service configuration to create from. */ @@ -55,7 +55,7 @@ public class OverloadShedder implements LoadSheddingStrategy { /** * Attempt to shed one bundle off every broker which is overloaded. - * + * * @param loadData * The load data to used to make the unloading decision. 
* @param conf @@ -78,6 +78,10 @@ public class OverloadShedder implements LoadSheddingStrategy { if (localData.getBundles().size() > 1) { for (final String bundle : localData.getBundles()) { final BundleData bundleData = loadData.getBundleData().get(bundle); + if (bundleData == null) { + continue; + } + // Consider short-term message rate to address system resource burden final TimeAverageMessageData shortTermData = bundleData.getShortTermData(); final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 25098a9..8397727 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -31,9 +31,9 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -43,12 +43,10 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; -import org.mockito.Mockito; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.namespace.OwnedBundle; import org.apache.pulsar.broker.namespace.OwnershipCache; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import 
org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; @@ -56,17 +54,16 @@ import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.PersistentTopicStats; -import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; +import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -664,6 +661,8 @@ public class ReplicatorTest extends ReplicatorTestBase { assertNull(topic.getPersistentReplicator(replicatorClusterName)); return null; }); + + producer1.close(); } @Test(priority = 5, timeOut = 30000) @@ -683,6 +682,7 @@ public class ReplicatorTest extends ReplicatorTestBase { field.setAccessible(true); ProducerImpl producer = (ProducerImpl) field.get(replicator); assertNull(producer); + producer1.close(); } /** -- To stop receiving notification emails like this one, please contact mme...@apache.org.