This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 290e501 ARTEMIS-3645 Support broker balancer cache persistence
new d22e675 This closes #3914
290e501 is described below
commit 290e5016c890646eef2d4730d297c97150957933
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Fri Jan 14 15:07:53 2022 +0100
ARTEMIS-3645 Support broker balancer cache persistence
---
.../balancing/BrokerBalancerConfiguration.java | 10 +-
.../core/config/balancing/CacheConfiguration.java | 47 +++++++
.../deployers/impl/FileConfigurationParser.java | 18 ++-
.../artemis/core/persistence/StorageManager.java | 7 +
.../persistence/config/PersistedKeyValuePair.java | 97 +++++++++++++
.../journal/AbstractJournalStorageManager.java | 53 +++++++
.../persistence/impl/journal/JournalRecordIds.java | 1 +
.../impl/nullpm/NullStorageManager.java | 16 +++
.../core/server/balancing/BrokerBalancer.java | 57 ++++----
.../server/balancing/BrokerBalancerManager.java | 21 ++-
.../core/server/balancing/caches/Cache.java | 29 ++++
.../core/server/balancing/caches/LocalCache.java | 134 ++++++++++++++++++
.../core/server/balancing/pools/AbstractPool.java | 21 +++
.../artemis/core/server/balancing/pools/Pool.java | 2 +
.../server/balancing/targets/ActiveMQTarget.java | 8 +-
.../resources/schema/artemis-configuration.xsd | 22 ++-
.../core/config/impl/FileConfigurationTest.java | 4 +-
.../core/server/balancing/BrokerBalancerTest.java | 4 +-
.../server/balancing/caches/LocalCacheTest.java | 155 +++++++++++++++++++++
.../core/transaction/impl/TransactionImplTest.java | 16 +++
.../resources/ConfigurationTest-full-config.xml | 5 +-
.../ConfigurationTest-xinclude-config.xml | 5 +-
docs/user-manual/en/broker-balancers.md | 20 ++-
.../en/images/broker_balancer_workflow.png | Bin 96089 -> 95086 bytes
.../en/images/management_api_redirect_sequence.png | Bin 12611 -> 12103 bytes
.../en/images/native_redirect_sequence.png | Bin 11169 -> 11955 bytes
.../integration/balancing/BalancingTestBase.java | 15 +-
.../tests/integration/balancing/RedirectTest.java | 28 +++-
.../tests/integration/client/SendAckFailTest.java | 16 +++
29 files changed, 755 insertions(+), 56 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
index e20d033..cf218fa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
@@ -26,7 +26,7 @@ public class BrokerBalancerConfiguration implements
Serializable {
private TargetKey targetKey = TargetKey.SOURCE_IP;
private String targetKeyFilter = null;
private String localTargetFilter = null;
- private int cacheTimeout = -1;
+ private CacheConfiguration cacheConfiguration = null;
private PoolConfiguration poolConfiguration = null;
private NamedPropertyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration transformerConfiguration = null;
@@ -67,12 +67,12 @@ public class BrokerBalancerConfiguration implements
Serializable {
return this;
}
- public int getCacheTimeout() {
- return cacheTimeout;
+ public CacheConfiguration getCacheConfiguration() {
+ return cacheConfiguration;
}
- public BrokerBalancerConfiguration setCacheTimeout(int cacheTimeout) {
- this.cacheTimeout = cacheTimeout;
+ public BrokerBalancerConfiguration setCacheConfiguration(CacheConfiguration
cacheConfiguration) {
+ this.cacheConfiguration = cacheConfiguration;
return this;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
new file mode 100644
index 0000000..f3bf22a
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.balancing;
+
+import java.io.Serializable;
+
+public class CacheConfiguration implements Serializable {
+ private boolean persisted = false;
+
+ private int timeout = 0;
+
+ public CacheConfiguration() {
+ }
+
+ public boolean isPersisted() {
+ return persisted;
+ }
+
+ public CacheConfiguration setPersisted(boolean persisted) {
+ this.persisted = persisted;
+ return this;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public CacheConfiguration setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 50a5135..907b170 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -47,6 +47,7 @@ import
org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -2653,9 +2654,6 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
brokerBalancerConfiguration.setLocalTargetFilter(getString(e,
"local-target-filter", brokerBalancerConfiguration.getLocalTargetFilter(),
Validators.NO_CHECK));
- brokerBalancerConfiguration.setCacheTimeout(getInteger(e,
"cache-timeout",
- brokerBalancerConfiguration.getCacheTimeout(),
Validators.MINUS_ONE_OR_GE_ZERO));
-
NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
NodeList children = e.getChildNodes();
@@ -2663,7 +2661,11 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
- if (child.getNodeName().equals("policy")) {
+ if (child.getNodeName().equals("cache")) {
+ CacheConfiguration cacheConfiguration = new CacheConfiguration();
+ parseCacheConfiguration((Element) child, cacheConfiguration);
+
brokerBalancerConfiguration.setCacheConfiguration(cacheConfiguration);
+ } else if (child.getNodeName().equals("policy")) {
policyConfiguration = new NamedPropertyConfiguration();
parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
@@ -2681,6 +2683,14 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
}
+ private void parseCacheConfiguration(final Element e, final
CacheConfiguration cacheConfiguration) throws ClassNotFoundException {
+ cacheConfiguration.setPersisted(getBoolean(e, "persisted",
+ cacheConfiguration.isPersisted()));
+
+ cacheConfiguration.setTimeout(getInteger(e, "timeout",
+ cacheConfiguration.getTimeout(), Validators.GE_ZERO));
+ }
+
private void parseTransformerConfiguration(final Element e, final
NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 748f682..ee747a6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -383,6 +384,12 @@ public interface StorageManager extends IDGenerator,
ActiveMQComponent {
Map<String, PersistedRole> getPersistedRoles();
+ void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws
Exception;
+
+ void deleteKeyValuePair(String mapId, String key) throws Exception;
+
+ Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId);
+
/**
* @return The ID with the stored counter
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
new file mode 100644
index 0000000..0bfb778
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.config;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.BufferHelper;
+
+public class PersistedKeyValuePair implements EncodingSupport {
+
+ private long storeId;
+
+ private String mapId;
+
+ private String key;
+
+ private String value;
+
+ public PersistedKeyValuePair() {
+ }
+
+ public PersistedKeyValuePair(String mapId, String key, String value) {
+ this.mapId = mapId;
+ this.key = key;
+ this.value = value;
+ }
+
+ public void setStoreId(long id) {
+ this.storeId = id;
+ }
+
+ public long getStoreId() {
+ return storeId;
+ }
+
+ public String getMapId() {
+ return mapId;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ int size = 0;
+ size += BufferHelper.sizeOfString(mapId);
+ size += BufferHelper.sizeOfString(key);
+ size += BufferHelper.sizeOfString(value);
+ return size;
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ buffer.writeString(mapId);
+ buffer.writeString(key);
+ buffer.writeString(value);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+ mapId = buffer.readString();
+ key = buffer.readString();
+ value = buffer.readString();
+ }
+
+ @Override
+ public String toString() {
+ return "PersistedKeyValuePair [storeId=" + storeId +
+ ", mapId=" +
+ mapId +
+ ", key=" +
+ key +
+ ", value=" +
+ value +
+ "]";
+ }
+}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index b7d3101..1c5532e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -76,6 +76,7 @@ import
org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -220,6 +221,8 @@ public abstract class AbstractJournalStorageManager extends
CriticalComponentImp
protected final Map<String, PersistedRole> mapPersistedRoles = new
ConcurrentHashMap<>();
+ protected final Map<String, Map<String, PersistedKeyValuePair>>
mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
protected final ConcurrentLongHashMap<LargeServerMessage>
largeMessagesToDelete = new ConcurrentLongHashMap<>();
public AbstractJournalStorageManager(final Configuration config,
@@ -819,6 +822,41 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
}
@Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair)
throws Exception {
+ deleteKeyValuePair(persistedKeyValuePair.getMapId(),
persistedKeyValuePair.getKey());
+ try (ArtemisCloseable lock = closeableReadLock()) {
+ final long id = idGenerator.generateID();
+ persistedKeyValuePair.setStoreId(id);
+ bindingsJournal.appendAddRecord(id,
JournalRecordIds.KEY_VALUE_PAIR_RECORD, persistedKeyValuePair, true);
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(),
persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(persistedKeyValuePair.getKey(),
persistedKeyValuePair);
+ }
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(mapId);
+ if (persistedKeyValuePairs != null) {
+ PersistedKeyValuePair oldMapStringEntry =
persistedKeyValuePairs.remove(key);
+ if (oldMapStringEntry != null) {
+ try (ArtemisCloseable lock = closeableReadLock()) {
+
bindingsJournal.tryAppendDeleteRecord(oldMapStringEntry.getStoreId(),
this::recordNotFoundCallback, false);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String
mapId) {
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(mapId);
+ return persistedKeyValuePairs != null ? new
HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+ }
+
+ @Override
public void storeID(final long journalID, final long id) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendAddRecord(journalID,
JournalRecordIds.ID_COUNTER_RECORD,
BatchingIDGenerator.createIDEncodingSupport(id), true, getContext(true));
@@ -1534,6 +1572,14 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
} else if (rec == JournalRecordIds.ROLE_RECORD) {
PersistedRole role = newRoleEncoding(id, buffer);
mapPersistedRoles.put(role.getUsername(), role);
+ } else if (rec == JournalRecordIds.KEY_VALUE_PAIR_RECORD) {
+ PersistedKeyValuePair keyValuePair =
newKeyValuePairEncoding(id, buffer);
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(keyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(keyValuePair.getMapId(),
persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new
Exception("invalid record type " + rec));
@@ -2046,6 +2092,13 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
return persistedRole;
}
+ static PersistedKeyValuePair newKeyValuePairEncoding(long id,
ActiveMQBuffer buffer) {
+ PersistedKeyValuePair persistedKeyValuePair = new
PersistedKeyValuePair();
+ persistedKeyValuePair.decode(buffer);
+ persistedKeyValuePair.setStoreId(id);
+ return persistedKeyValuePair;
+ }
+
/**
* @param id
* @param buffer
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 7bd371f..d11c619 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -99,4 +99,5 @@ public final class JournalRecordIds {
// Used to record the large message body on the journal when history is on
public static final byte ADD_MESSAGE_BODY = 49;
+ public static final byte KEY_VALUE_PAIR_RECORD = 50;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 61a95c8..b1e99e9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -49,6 +49,7 @@ import
org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -489,6 +490,21 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair)
throws Exception {
+
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+
+ }
+
+ @Override
+ public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String
mapId) {
+ return null;
+ }
+
+ @Override
public void storeSecuritySetting(final PersistedSecuritySetting
persistedRoles) throws Exception {
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index ae43e11..55f8c57 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -17,10 +17,9 @@
package org.apache.activemq.artemis.core.server.balancing;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
@@ -32,7 +31,6 @@ import
org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class BrokerBalancer implements ActiveMQComponent {
@@ -57,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final KeyTransformer transformer;
- private final Cache<String, TargetResult> cache;
+ private final Cache cache;
private volatile boolean started = false;
@@ -85,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return policy;
}
- public Cache<String, TargetResult> getCache() {
+ public Cache getCache() {
return cache;
}
@@ -100,10 +98,10 @@ public class BrokerBalancer implements ActiveMQComponent {
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
+ final Cache cache,
final Pool pool,
final Policy policy,
- KeyTransformer transformer,
- final int cacheTimeout) {
+ KeyTransformer transformer) {
this.name = name;
this.targetKey = targetKey;
@@ -120,17 +118,15 @@ public class BrokerBalancer implements ActiveMQComponent {
this.policy = policy;
- if (cacheTimeout == -1) {
- this.cache = CacheBuilder.newBuilder().build();
- } else if (cacheTimeout > 0) {
- this.cache =
CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout,
TimeUnit.MILLISECONDS).build();
- } else {
- this.cache = null;
- }
+ this.cache = cache;
}
@Override
public void start() throws Exception {
+ if (cache != null) {
+ cache.start();
+ }
+
if (pool != null) {
pool.start();
}
@@ -145,6 +141,10 @@ public class BrokerBalancer implements ActiveMQComponent {
if (pool != null) {
pool.stop();
}
+
+ if (cache != null) {
+ cache.stop();
+ }
}
public TargetResult getTarget(Connection connection, String clientID,
String username) {
@@ -176,20 +176,25 @@ public class BrokerBalancer implements ActiveMQComponent {
TargetResult result = null;
if (cache != null) {
- result = cache.getIfPresent(key);
- }
+ String nodeId = cache.get(key);
- if (result != null) {
- if (pool.isTargetReady(result.getTarget())) {
- if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.getTarget() + "]
ready for " + targetKey + "[" + key + "]");
- }
-
- return result;
+ if (logger.isDebugEnabled()) {
+ logger.debug("The cache returns target [" + nodeId + "] for " +
targetKey + "[" + key + "]");
}
- if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.getTarget() + "] not
ready for " + targetKey + "[" + key + "]");
+ if (nodeId != null) {
+ Target target = pool.getReadyTarget(nodeId);
+ if (target != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [" + nodeId + "] is ready for " +
targetKey + "[" + key + "]");
+ }
+
+ return new TargetResult(target);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [" + nodeId + "] is not ready for " +
targetKey + "[" + key + "]");
+ }
}
}
@@ -207,7 +212,7 @@ public class BrokerBalancer implements ActiveMQComponent {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" +
target + "]");
}
- cache.put(key, result);
+ cache.put(key, target.getNodeID());
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
index f9b8002..72b99d1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
@@ -21,11 +21,14 @@ import
org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
+import org.apache.activemq.artemis.core.server.balancing.caches.LocalCache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import
org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import
org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
@@ -54,6 +57,8 @@ import java.util.concurrent.ScheduledExecutorService;
public final class BrokerBalancerManager implements ActiveMQComponent {
private static final Logger logger =
Logger.getLogger(BrokerBalancerManager.class);
+ public static final String CACHE_ID_PREFIX = "$.BC.";
+
private final Configuration config;
@@ -91,6 +96,13 @@ public final class BrokerBalancerManager implements
ActiveMQComponent {
Target localTarget = new LocalTarget(null, server);
+
+ Cache cache = null;
+ CacheConfiguration cacheConfiguration = config.getCacheConfiguration();
+ if (cacheConfiguration != null) {
+ cache = deployCache(cacheConfiguration, config.getName());
+ }
+
Pool pool = null;
final PoolConfiguration poolConfiguration =
config.getPoolConfiguration();
if (poolConfiguration != null) {
@@ -110,13 +122,20 @@ public final class BrokerBalancerManager implements
ActiveMQComponent {
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(),
config.getTargetKey(), config.getTargetKeyFilter(),
- localTarget,
config.getLocalTargetFilter(), pool, policy, transformer,
config.getCacheTimeout());
+ localTarget,
config.getLocalTargetFilter(), cache, pool, policy, transformer);
balancerControllers.put(balancer.getName(), balancer);
server.getManagementService().registerBrokerBalancer(balancer);
}
+ private Cache deployCache(CacheConfiguration configuration, String name)
throws ClassNotFoundException {
+ Cache cache = new LocalCache(CACHE_ID_PREFIX + name,
configuration.isPersisted(),
+ configuration.getTimeout(), server.getStorageManager());
+
+ return cache;
+ }
+
private Pool deployPool(PoolConfiguration config, Target localTarget)
throws Exception {
Pool pool;
TargetFactory targetFactory = new ActiveMQTargetFactory();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
new file mode 100644
index 0000000..58b1b1d
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+public interface Cache {
+
+ void start();
+
+ void stop();
+
+ String get(String key);
+
+ void put(String key, String nodeId);
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
new file mode 100644
index 0000000..94113f2
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class LocalCache implements Cache, RemovalListener<String, String> {
+ private static final Logger logger = Logger.getLogger(LocalCache.class);
+
+ private String id;
+ private boolean persisted;
+ private int timeout;
+ private StorageManager storageManager;
+ private com.google.common.cache.Cache<String, String> cache;
+ private Map<String, PersistedKeyValuePair> persistedCacheEntries;
+
+ private volatile boolean running;
+
+ public String getId() {
+ return id;
+ }
+
+ public boolean isPersisted() {
+ return persisted;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public LocalCache(String id, boolean persisted, int timeout, StorageManager
storageManager) {
+ this.id = id;
+ this.persisted = persisted;
+ this.timeout = timeout;
+ this.storageManager = storageManager;
+
+ if (timeout == 0) {
+ cache = CacheBuilder.newBuilder().build();
+ } else {
+ cache =
CacheBuilder.newBuilder().removalListener(this).expireAfterAccess(timeout,
TimeUnit.MILLISECONDS).build();
+ }
+ }
+
+
+ @Override
+ public void start() {
+ if (persisted) {
+ persistedCacheEntries = storageManager.getPersistedKeyValuePairs(id);
+
+ for (Map.Entry<String, PersistedKeyValuePair> cacheEntry :
persistedCacheEntries.entrySet()) {
+ cache.put(cacheEntry.getKey(), cacheEntry.getValue().getValue());
+ logger.info(cacheEntry.toString());
+ }
+ }
+
+ running = true;
+ }
+
+ @Override
+ public void stop() {
+ cache.cleanUp();
+
+ if (persistedCacheEntries != null) {
+ persistedCacheEntries.clear();
+ }
+
+ running = false;
+ }
+
+ @Override
+ public String get(String key) {
+ return cache.getIfPresent(key);
+ }
+
+ @Override
+ public void put(String key, String nodeId) {
+ if (persisted) {
+ PersistedKeyValuePair persistedKeyValuePair =
persistedCacheEntries.get(key);
+
+ if (persistedKeyValuePair == null || !Objects.equals(nodeId,
persistedKeyValuePair.getValue())) {
+ persistedKeyValuePair = new PersistedKeyValuePair(id, key, nodeId);
+
+ try {
+ storageManager.storeKeyValuePair(persistedKeyValuePair);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ persistedCacheEntries.put(key, persistedKeyValuePair);
+ }
+ }
+
+ cache.put(key, nodeId);
+ }
+
+ @Override
+ public void onRemoval(RemovalNotification<String, String> notification) {
+ if (running && persisted) {
+ PersistedKeyValuePair persistedKeyValuePair =
persistedCacheEntries.remove(notification.getKey());
+
+ if (persistedKeyValuePair != null) {
+ try {
+
storageManager.deleteKeyValuePair(persistedKeyValuePair.getMapId(),
persistedKeyValuePair.getKey());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
index 5ab6145..7860625 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
@@ -176,6 +176,27 @@ public abstract class AbstractPool implements Pool {
}
@Override
+ public Target getReadyTarget(String nodeId) {
+ int readyTargets;
+ long deadline = quorumTimeout > 0 ? System.nanoTime() +
quorumTimeoutNanos : 0;
+
+ do {
+ readyTargets = 0;
+ for (TargetMonitor targetMonitor : targetMonitors) {
+ if (targetMonitor.isTargetReady()) {
+ readyTargets++;
+ if (nodeId.equals(targetMonitor.getTarget().getNodeID())) {
+ return targetMonitor.getTarget();
+ }
+ }
+ }
+ }
+ while(readyTargets < quorumSize && deadline > 0 && (System.nanoTime() -
deadline) < 0);
+
+ return null;
+ }
+
+ @Override
public void addTargetProbe(TargetProbe probe) {
targetProbes.add(probe);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
index db6b733..43e3214 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
@@ -46,6 +46,8 @@ public interface Pool extends ActiveMQComponent {
Target getTarget(String nodeId);
+ Target getReadyTarget(String nodeId);
+
boolean isTargetReady(Target target);
List<Target> getTargets();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
index 0fc1cad..adbd426 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
@@ -69,6 +69,10 @@ public class ActiveMQTarget extends AbstractTarget
implements FailureListener {
false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE,
BrokerBalancer.CLIENT_ID_PREFIX +
UUIDGenerator.getInstance().generateStringUUID()).start());
+ if (getNodeID() == null) {
+ setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class,
3000));
+ }
+
connected = true;
fireConnectedEvent();
@@ -92,10 +96,6 @@ public class ActiveMQTarget extends AbstractTarget
implements FailureListener {
@Override
public boolean checkReadiness() {
try {
- if (getNodeID() == null) {
- setNodeID(getAttribute(ResourceNames.BROKER, "NodeID",
String.class, 3000));
- }
-
return getAttribute(ResourceNames.BROKER, "Active", Boolean.class,
3000);
} catch (Exception e) {
logger.warn("Error on check readiness", e);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index f143174..8f8a60f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2123,7 +2123,7 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
- <xsd:element name="cache-timeout" type="xsd:long" default="-1"
maxOccurs="1" minOccurs="0">
+ <xsd:element name="cache" type="brokerBalancerCacheType"
maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the time period for a cache entry to remain active
@@ -2171,6 +2171,26 @@
</xsd:restriction>
</xsd:simpleType>
+ <xsd:complexType name="brokerBalancerCacheType">
+ <xsd:sequence maxOccurs="unbounded">
+ <xsd:element name="persisted" type="xsd:boolean" default="false"
maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ true means that the cache entries are persisted
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ <xsd:element name="timeout" type="xsd:long" default="-1"
maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the timeout (in milliseconds) before removing cache entries
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+ </xsd:sequence>
+ <xsd:attributeGroup ref="xml:specialAttrs"/>
+ </xsd:complexType>
+
<xsd:complexType name="brokerBalancerPolicyType">
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 603f082..7a21cfe 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -301,7 +301,9 @@ public class FileConfigurationTest extends
ConfigurationImplTest {
} else {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
Assert.assertEquals("least-connections-balancer", bc.getName());
- Assert.assertEquals(60000, bc.getCacheTimeout());
+ Assert.assertNotNull(bc.getCacheConfiguration());
+ Assert.assertEquals(true,
bc.getCacheConfiguration().isPersisted());
+ Assert.assertEquals(60000,
bc.getCacheConfiguration().getTimeout());
Assert.assertEquals(bc.getPolicyConfiguration().getName(),
LeastConnectionsPolicy.NAME);
Assert.assertEquals(3000,
bc.getPoolConfiguration().getCheckPeriod());
Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize());
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index 732ea53..fc6acc9 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -53,7 +53,7 @@ public class BrokerBalancerTest {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", pool, policy,
null, 0);
+ localTarget, "^FOO.*", null, pool,
policy, null);
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT,
underTest.getTarget("BAR_EE"));
}
@@ -69,7 +69,7 @@ public class BrokerBalancerTest {
}
};
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", pool, policy,
keyTransformer, 0);
+ localTarget, "^FOO.*", null, pool,
policy, keyTransformer);
assertEquals( localTarget,
underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
new file mode 100644
index 0000000..5fc3f3b
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LocalCacheTest {
+ private static final String CACHE_NAME = "TEST";
+ private static final int CACHE_TIMEOUT = 500;
+ private static final String CACHE_ENTRY_KEY = "TEST_KEY";
+ private static final String CACHE_ENTRY_VALUE = "TEST_VALUE";
+
+ @Test
+ public void testValidEntry() {
+ LocalCache cache = new LocalCache(CACHE_NAME, false, 0, null);
+
+ cache.start();
+
+ try {
+ cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+ } finally {
+ cache.stop();
+ }
+ }
+
+ @Test
+ public void testExpiration() throws Exception {
+ LocalCache cache = new LocalCache(CACHE_NAME, false, CACHE_TIMEOUT,
null);
+
+ cache.start();
+
+ try {
+ cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+ Wait.assertTrue(() -> cache.get(CACHE_ENTRY_KEY) == null,
CACHE_TIMEOUT * 2, CACHE_TIMEOUT);
+ } finally {
+ cache.stop();
+ }
+ }
+
+ @Test
+ public void testPersistedEntry() {
+ StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+ LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, 0,
storageManager);
+
+ cacheBeforeStop.start();
+
+ try {
+ cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
cacheBeforeStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheBeforeStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+ LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, 0,
storageManager);
+
+ cacheAfterStop.start();
+
+ try {
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
cacheAfterStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheAfterStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+ }
+
+ @Test
+ public void testPersistedExpiration() throws Exception {
+ StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+ LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true,
CACHE_TIMEOUT, storageManager);
+
+ cacheBeforeStop.start();
+
+ try {
+ cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
cacheBeforeStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheBeforeStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+ LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true,
CACHE_TIMEOUT, storageManager);
+
+ cacheAfterStop.start();
+
+ try {
+ Assert.assertEquals(CACHE_ENTRY_VALUE,
cacheAfterStop.get(CACHE_ENTRY_KEY));
+ Thread.sleep(CACHE_TIMEOUT * 2);
+ Assert.assertNull(cacheAfterStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheAfterStop.stop();
+ }
+
+
Assert.assertNull(storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY));
+ }
+
+ static class DummyKeyValuePairStorageManager extends NullStorageManager {
+ private Map<String, Map<String, PersistedKeyValuePair>>
mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair
persistedKeyValuePair) throws Exception {
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(),
persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(persistedKeyValuePair.getKey(),
persistedKeyValuePair);
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws
Exception {
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(mapId);
+ if (persistedKeyValuePairs != null) {
+ persistedKeyValuePairs.remove(key);
+ }
+ }
+
+ @Override
+ public Map<String, PersistedKeyValuePair>
getPersistedKeyValuePairs(String mapId) {
+ Map<String, PersistedKeyValuePair> persistedKeyValuePairs =
mapPersistedKeyValuePairs.get(mapId);
+ return persistedKeyValuePairs != null ? new
HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+ }
+ }
+}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 77ba7ee..4ceee87 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -48,6 +48,7 @@ import
org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -667,6 +668,21 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void storeKeyValuePair(PersistedKeyValuePair
persistedKeyValuePair) throws Exception {
+
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws
Exception {
+
+ }
+
+ @Override
+ public Map<String, PersistedKeyValuePair>
getPersistedKeyValuePairs(String mapId) {
+ return null;
+ }
+
+ @Override
public long storePageCounter(long txID, long queueID, long value, long
size) throws Exception {
return 0;
}
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 6764e25..c331dfa 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -188,7 +188,10 @@
</pool>
</broker-balancer>
<broker-balancer name="least-connections-balancer">
- <cache-timeout>60000</cache-timeout>
+ <cache>
+ <persisted>true</persisted>
+ <timeout>60000</timeout>
+ </cache>
<policy name="LEAST_CONNECTIONS"/>
<pool>
<check-period>3000</check-period>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index aee5fb2..70cdde0 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -179,7 +179,10 @@
</pool>
</broker-balancer>
<broker-balancer name="least-connections-balancer">
- <cache-timeout>60000</cache-timeout>
+ <cache>
+ <persisted>true</persisted>
+ <timeout>60000</timeout>
+ </cache>
<policy name="LEAST_CONNECTIONS"/>
<pool>
<check-period>3000</check-period>
diff --git a/docs/user-manual/en/broker-balancers.md
b/docs/user-manual/en/broker-balancers.md
index c821877..4fbc0a6 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -103,8 +103,19 @@ A policy is defined by the `policy` element. Let's take a
look at a policy examp
The broker balancer provides a cache with a timeout to improve the stickiness
of the target broker selected,
returning the same target broker for a target key as long as it is present in
the cache and is ready.
So a broker balancer with the cache enabled doesn't strictly follow the
configured policy.
-By default, the cache is enabled and will never timeout. See below
-for more details about setting the `cache-timeout` parameter.
+By default, the cache is not enabled.
+
+A cache is defined by the `cache` element that includes the following items:
+* the `persisted` element defines whether the cache has to persist entries,
default is `false`;
+* the `timeout` element defines the timeout before removing entries, measured
in milliseconds, setting 0 will disable the timeout, default is `0`.
+
+Let's take a look at a cache example from broker.xml:
+```xml
+<cache>
+ <persisted>true</persisted>
+ <timeout>60000</timeout>
+</cache>
+```
## Key transformers
A `local-target-key-transformer` allows target key transformation before
matching against any local-target-filter. One use case is
@@ -119,9 +130,8 @@ A broker balancer is defined by the `broker-balancer`
element, it includes the f
* the `target-key-filter` element defines a regular expression to filter the
resolved keys;
* the `local-target-filter` element defines a regular expression to match the
keys that have to return a local target;
* the `local-target-key-transformer` element defines a key transformer, see
[key transformers](#key-transformers);
-* the `cache-timeout` element is the time period for a target broker to remain
in the cache, measured in milliseconds, setting `0` will disable the cache,
default is `-1`, meaning no expiration;
-* the `pool` element defines the pool to group the target brokers, see
[pools](#pools).
-* the `policy` element defines the policy used to select the target brokers
from the pool, see [policies](#policies);
+* the `pool` element defines the pool to group the target brokers, see
[pools](#pools);
+* the `policy` element defines the policy used to select the target brokers
from the pool, see [policies](#policies).
Let's take a look at some broker balancer examples from broker.xml:
```xml
diff --git a/docs/user-manual/en/images/broker_balancer_workflow.png
b/docs/user-manual/en/images/broker_balancer_workflow.png
index 97560b6..457c5e1 100644
Binary files a/docs/user-manual/en/images/broker_balancer_workflow.png and
b/docs/user-manual/en/images/broker_balancer_workflow.png differ
diff --git a/docs/user-manual/en/images/management_api_redirect_sequence.png
b/docs/user-manual/en/images/management_api_redirect_sequence.png
index 371baa89..4f25326 100644
Binary files a/docs/user-manual/en/images/management_api_redirect_sequence.png
and b/docs/user-manual/en/images/management_api_redirect_sequence.png differ
diff --git a/docs/user-manual/en/images/native_redirect_sequence.png
b/docs/user-manual/en/images/native_redirect_sequence.png
index d7466da..e5b45bd 100644
Binary files a/docs/user-manual/en/images/native_redirect_sequence.png and
b/docs/user-manual/en/images/native_redirect_sequence.png differ
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index c626325..3a82854 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -151,6 +152,16 @@ public class BalancingTestBase extends ClusterTestBase {
}
+ protected void setupBalancerLocalCache(final int node, boolean persisted,
int timeout) {
+
+ Configuration configuration = getServer(node).getConfiguration();
+ BrokerBalancerConfiguration brokerBalancerConfiguration =
configuration.getBalancerConfigurations().stream()
+ .filter(brokerBalancerConfiguration1 ->
brokerBalancerConfiguration1.getName().equals(BROKER_BALANCER_NAME)).findFirst().get();
+
+ brokerBalancerConfiguration.setCacheConfiguration(
+ new CacheConfiguration().setPersisted(persisted).setTimeout(timeout));
+ }
+
protected ConnectionFactory createFactory(String protocol, boolean
sslEnabled, String host, int port, String clientID, String user, String
password) throws Exception {
return createFactory(protocol, sslEnabled, host, port, clientID, user,
password, -1);
}
@@ -215,7 +226,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
- urlBuilder.append("?failover.startupMaxReconnectAttempts=" +
retries + "&failover.randomize=true");
+ urlBuilder.append("?failover.startupMaxReconnectAttempts=" +
retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
@@ -243,7 +254,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
- urlBuilder.append("?startupMaxReconnectAttempts=" + retries +
"&maxReconnectAttempts=" + retries);
+ urlBuilder.append("?randomize=false&startupMaxReconnectAttempts="
+ retries + "&maxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
index 46dd417..3b7b786 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
@@ -140,15 +140,27 @@ public class RedirectTest extends BalancingTestBase {
@Test
public void testRoundRobinRedirect() throws Exception {
- testEvenlyRedirect(RoundRobinPolicy.NAME, null);
+ testEvenlyRedirect(RoundRobinPolicy.NAME, null, false);
}
@Test
public void testLeastConnectionsRedirect() throws Exception {
- testEvenlyRedirect(LeastConnectionsPolicy.NAME,
Collections.singletonMap(LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD,
String.valueOf(30)));
+ testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+ LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD,
String.valueOf(30)), false);
}
- private void testEvenlyRedirect(final String policyName, final Map<String,
String> properties) throws Exception {
+ @Test
+ public void testRoundRobinRedirectWithFailure() throws Exception {
+ testEvenlyRedirect(RoundRobinPolicy.NAME, null, true);
+ }
+
+ @Test
+ public void testLeastConnectionsRedirectWithFailure() throws Exception {
+ testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+ LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD,
String.valueOf(30)), true);
+ }
+
+ private void testEvenlyRedirect(final String policyName, final Map<String,
String> properties, final boolean withFailure) throws Exception {
final String queueName = "RedirectTestQueue";
final int targets = MULTIPLE_TARGETS;
int[] nodes = new int[targets + 1];
@@ -174,6 +186,10 @@ public class RedirectTest extends BalancingTestBase {
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME,
policyName, properties, false, null, targets, 1, 2, 3);
}
+ if (withFailure) {
+ setupBalancerLocalCache(0, true, 0);
+ }
+
startServers(nodes);
for (int node : nodes) {
@@ -214,6 +230,12 @@ public class RedirectTest extends BalancingTestBase {
Assert.assertEquals("Messages of node " + targetNode, 1,
queueControls[targetNode].countMessages());
}
+ if (withFailure) {
+ crashAndWaitForFailure(getServer(0));
+
+ startServers(0);
+ }
+
for (int i = 0; i < targets; i++) {
try (Connection connection =
connectionFactories[i].createConnection()) {
connection.start();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 01b1c22..13d176e 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -62,6 +62,7 @@ import
org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -757,6 +758,21 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
+ public void storeKeyValuePair(PersistedKeyValuePair
persistedKeyValuePair) throws Exception {
+ manager.storeKeyValuePair(persistedKeyValuePair);
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws
Exception {
+ manager.deleteKeyValuePair(mapId, key);
+ }
+
+ @Override
+ public Map<String, PersistedKeyValuePair>
getPersistedKeyValuePairs(String mapId) {
+ return manager.getPersistedKeyValuePairs(mapId);
+ }
+
+ @Override
public long storePageCounter(long txID, long queueID, long value, long
size) throws Exception {
return manager.storePageCounter(txID, queueID, value, size);
}