This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 290e501  ARTEMIS-3645 Support broker balancer cache persistence
     new d22e675  This closes #3914
290e501 is described below

commit 290e5016c890646eef2d4730d297c97150957933
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Fri Jan 14 15:07:53 2022 +0100

    ARTEMIS-3645 Support broker balancer cache persistence
---
 .../balancing/BrokerBalancerConfiguration.java     |  10 +-
 .../core/config/balancing/CacheConfiguration.java  |  47 +++++++
 .../deployers/impl/FileConfigurationParser.java    |  18 ++-
 .../artemis/core/persistence/StorageManager.java   |   7 +
 .../persistence/config/PersistedKeyValuePair.java  |  97 +++++++++++++
 .../journal/AbstractJournalStorageManager.java     |  53 +++++++
 .../persistence/impl/journal/JournalRecordIds.java |   1 +
 .../impl/nullpm/NullStorageManager.java            |  16 +++
 .../core/server/balancing/BrokerBalancer.java      |  57 ++++----
 .../server/balancing/BrokerBalancerManager.java    |  21 ++-
 .../core/server/balancing/caches/Cache.java        |  29 ++++
 .../core/server/balancing/caches/LocalCache.java   | 134 ++++++++++++++++++
 .../core/server/balancing/pools/AbstractPool.java  |  21 +++
 .../artemis/core/server/balancing/pools/Pool.java  |   2 +
 .../server/balancing/targets/ActiveMQTarget.java   |   8 +-
 .../resources/schema/artemis-configuration.xsd     |  22 ++-
 .../core/config/impl/FileConfigurationTest.java    |   4 +-
 .../core/server/balancing/BrokerBalancerTest.java  |   4 +-
 .../server/balancing/caches/LocalCacheTest.java    | 155 +++++++++++++++++++++
 .../core/transaction/impl/TransactionImplTest.java |  16 +++
 .../resources/ConfigurationTest-full-config.xml    |   5 +-
 .../ConfigurationTest-xinclude-config.xml          |   5 +-
 docs/user-manual/en/broker-balancers.md            |  20 ++-
 .../en/images/broker_balancer_workflow.png         | Bin 96089 -> 95086 bytes
 .../en/images/management_api_redirect_sequence.png | Bin 12611 -> 12103 bytes
 .../en/images/native_redirect_sequence.png         | Bin 11169 -> 11955 bytes
 .../integration/balancing/BalancingTestBase.java   |  15 +-
 .../tests/integration/balancing/RedirectTest.java  |  28 +++-
 .../tests/integration/client/SendAckFailTest.java  |  16 +++
 29 files changed, 755 insertions(+), 56 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
index e20d033..cf218fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
@@ -26,7 +26,7 @@ public class BrokerBalancerConfiguration implements 
Serializable {
    private TargetKey targetKey = TargetKey.SOURCE_IP;
    private String targetKeyFilter = null;
    private String localTargetFilter = null;
-   private int cacheTimeout = -1;
+   private CacheConfiguration cacheConfiguration = null;
    private PoolConfiguration poolConfiguration = null;
    private NamedPropertyConfiguration policyConfiguration = null;
    private NamedPropertyConfiguration transformerConfiguration = null;
@@ -67,12 +67,12 @@ public class BrokerBalancerConfiguration implements 
Serializable {
       return this;
    }
 
-   public int getCacheTimeout() {
-      return cacheTimeout;
+   public CacheConfiguration getCacheConfiguration() {
+      return cacheConfiguration;
    }
 
-   public BrokerBalancerConfiguration setCacheTimeout(int cacheTimeout) {
-      this.cacheTimeout = cacheTimeout;
+   public BrokerBalancerConfiguration setCacheConfiguration(CacheConfiguration 
cacheConfiguration) {
+      this.cacheConfiguration = cacheConfiguration;
       return this;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
new file mode 100644
index 0000000..f3bf22a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.balancing;
+
+import java.io.Serializable;
+
+public class CacheConfiguration implements Serializable {
+   private boolean persisted = false;
+
+   private int timeout = 0;
+
+   public CacheConfiguration() {
+   }
+
+   public boolean isPersisted() {
+      return persisted;
+   }
+
+   public CacheConfiguration setPersisted(boolean persisted) {
+      this.persisted = persisted;
+      return this;
+   }
+
+   public int getTimeout() {
+      return timeout;
+   }
+
+   public CacheConfiguration setTimeout(int timeout) {
+      this.timeout = timeout;
+      return this;
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 50a5135..907b170 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -47,6 +47,7 @@ import 
org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import 
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
 import 
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -2653,9 +2654,6 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       brokerBalancerConfiguration.setLocalTargetFilter(getString(e, 
"local-target-filter", brokerBalancerConfiguration.getLocalTargetFilter(), 
Validators.NO_CHECK));
 
-      brokerBalancerConfiguration.setCacheTimeout(getInteger(e, 
"cache-timeout",
-         brokerBalancerConfiguration.getCacheTimeout(), 
Validators.MINUS_ONE_OR_GE_ZERO));
-
       NamedPropertyConfiguration policyConfiguration = null;
       PoolConfiguration poolConfiguration = null;
       NodeList children = e.getChildNodes();
@@ -2663,7 +2661,11 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
       for (int j = 0; j < children.getLength(); j++) {
          Node child = children.item(j);
 
-         if (child.getNodeName().equals("policy")) {
+         if (child.getNodeName().equals("cache")) {
+            CacheConfiguration cacheConfiguration = new CacheConfiguration();
+            parseCacheConfiguration((Element) child, cacheConfiguration);
+            
brokerBalancerConfiguration.setCacheConfiguration(cacheConfiguration);
+         } else if (child.getNodeName().equals("policy")) {
             policyConfiguration = new NamedPropertyConfiguration();
             parsePolicyConfiguration((Element) child, policyConfiguration);
             
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
@@ -2681,6 +2683,14 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
       config.getBalancerConfigurations().add(brokerBalancerConfiguration);
    }
 
+   private void parseCacheConfiguration(final Element e, final 
CacheConfiguration cacheConfiguration) throws ClassNotFoundException {
+      cacheConfiguration.setPersisted(getBoolean(e, "persisted",
+         cacheConfiguration.isPersisted()));
+
+      cacheConfiguration.setTimeout(getInteger(e, "timeout",
+         cacheConfiguration.getTimeout(), Validators.GE_ZERO));
+   }
+
    private void parseTransformerConfiguration(final Element e, final 
NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
       String name = e.getAttribute("name");
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 748f682..ee747a6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -383,6 +384,12 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
 
    Map<String, PersistedRole> getPersistedRoles();
 
+   void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws 
Exception;
+
+   void deleteKeyValuePair(String mapId, String key) throws Exception;
+
+   Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId);
+
    /**
     * @return The ID with the stored counter
     */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
new file mode 100644
index 0000000..0bfb778
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.config;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.BufferHelper;
+
+public class PersistedKeyValuePair implements EncodingSupport {
+
+   private long storeId;
+
+   private String mapId;
+
+   private String key;
+
+   private String value;
+
+   public PersistedKeyValuePair() {
+   }
+
+   public PersistedKeyValuePair(String mapId, String key, String value) {
+      this.mapId = mapId;
+      this.key = key;
+      this.value = value;
+   }
+
+   public void setStoreId(long id) {
+      this.storeId = id;
+   }
+
+   public long getStoreId() {
+      return storeId;
+   }
+
+   public String getMapId() {
+      return mapId;
+   }
+
+   public String getKey() {
+      return key;
+   }
+
+   public String getValue() {
+      return value;
+   }
+
+   @Override
+   public int getEncodeSize() {
+      int size = 0;
+      size += BufferHelper.sizeOfString(mapId);
+      size += BufferHelper.sizeOfString(key);
+      size += BufferHelper.sizeOfString(value);
+      return size;
+   }
+
+   @Override
+   public void encode(ActiveMQBuffer buffer) {
+      buffer.writeString(mapId);
+      buffer.writeString(key);
+      buffer.writeString(value);
+   }
+
+   @Override
+   public void decode(ActiveMQBuffer buffer) {
+      mapId = buffer.readString();
+      key = buffer.readString();
+      value = buffer.readString();
+   }
+
+   @Override
+   public String toString() {
+      return "PersistedKeyValuePair [storeId=" + storeId +
+         ", mapId=" +
+         mapId +
+         ", key=" +
+         key +
+         ", value=" +
+         value +
+         "]";
+   }
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index b7d3101..1c5532e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -76,6 +76,7 @@ import 
org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -220,6 +221,8 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
 
    protected final Map<String, PersistedRole> mapPersistedRoles = new 
ConcurrentHashMap<>();
 
+   protected final Map<String, Map<String, PersistedKeyValuePair>> 
mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
    protected final ConcurrentLongHashMap<LargeServerMessage> 
largeMessagesToDelete = new ConcurrentLongHashMap<>();
 
    public AbstractJournalStorageManager(final Configuration config,
@@ -819,6 +822,41 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
+   public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) 
throws Exception {
+      deleteKeyValuePair(persistedKeyValuePair.getMapId(), 
persistedKeyValuePair.getKey());
+      try (ArtemisCloseable lock = closeableReadLock()) {
+         final long id = idGenerator.generateID();
+         persistedKeyValuePair.setStoreId(id);
+         bindingsJournal.appendAddRecord(id, 
JournalRecordIds.KEY_VALUE_PAIR_RECORD, persistedKeyValuePair, true);
+         Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+         if (persistedKeyValuePairs == null) {
+            persistedKeyValuePairs = new HashMap<>();
+            mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), 
persistedKeyValuePairs);
+         }
+         persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), 
persistedKeyValuePair);
+      }
+   }
+
+   @Override
+   public void deleteKeyValuePair(String mapId, String key) throws Exception {
+      Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(mapId);
+      if (persistedKeyValuePairs != null) {
+         PersistedKeyValuePair oldMapStringEntry = 
persistedKeyValuePairs.remove(key);
+         if (oldMapStringEntry != null) {
+            try (ArtemisCloseable lock = closeableReadLock()) {
+               
bindingsJournal.tryAppendDeleteRecord(oldMapStringEntry.getStoreId(), 
this::recordNotFoundCallback, false);
+            }
+         }
+      }
+   }
+
+   @Override
+   public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String 
mapId) {
+      Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(mapId);
+      return persistedKeyValuePairs != null ? new 
HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+   }
+
+   @Override
    public void storeID(final long journalID, final long id) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
          bindingsJournal.appendAddRecord(journalID, 
JournalRecordIds.ID_COUNTER_RECORD, 
BatchingIDGenerator.createIDEncodingSupport(id), true, getContext(true));
@@ -1534,6 +1572,14 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
             } else if (rec == JournalRecordIds.ROLE_RECORD) {
                PersistedRole role = newRoleEncoding(id, buffer);
                mapPersistedRoles.put(role.getUsername(), role);
+            } else if (rec == JournalRecordIds.KEY_VALUE_PAIR_RECORD) {
+               PersistedKeyValuePair keyValuePair = 
newKeyValuePairEncoding(id, buffer);
+               Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(keyValuePair.getMapId());
+               if (persistedKeyValuePairs == null) {
+                  persistedKeyValuePairs = new HashMap<>();
+                  mapPersistedKeyValuePairs.put(keyValuePair.getMapId(), 
persistedKeyValuePairs);
+               }
+               persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
             } else {
                // unlikely to happen
                ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new 
Exception("invalid record type " + rec));
@@ -2046,6 +2092,13 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
       return persistedRole;
    }
 
+   static PersistedKeyValuePair newKeyValuePairEncoding(long id, 
ActiveMQBuffer buffer) {
+      PersistedKeyValuePair persistedKeyValuePair = new 
PersistedKeyValuePair();
+      persistedKeyValuePair.decode(buffer);
+      persistedKeyValuePair.setStoreId(id);
+      return persistedKeyValuePair;
+   }
+
    /**
     * @param id
     * @param buffer
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 7bd371f..d11c619 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -99,4 +99,5 @@ public final class JournalRecordIds {
    // Used to record the large message body on the journal when history is on
    public static final byte ADD_MESSAGE_BODY = 49;
 
+   public static final byte KEY_VALUE_PAIR_RECORD = 50;
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 61a95c8..b1e99e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -49,6 +49,7 @@ import 
org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -489,6 +490,21 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) 
throws Exception {
+
+   }
+
+   @Override
+   public void deleteKeyValuePair(String mapId, String key) throws Exception {
+
+   }
+
+   @Override
+   public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String 
mapId) {
+      return null;
+   }
+
+   @Override
    public void storeSecuritySetting(final PersistedSecuritySetting 
persistedRoles) throws Exception {
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index ae43e11..55f8c57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -17,10 +17,9 @@
 
 package org.apache.activemq.artemis.core.server.balancing;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
 import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
 import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
 import org.apache.activemq.artemis.core.server.balancing.targets.Target;
@@ -32,7 +31,6 @@ import 
org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.jboss.logging.Logger;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 public class BrokerBalancer implements ActiveMQComponent {
@@ -57,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
 
    private final KeyTransformer transformer;
 
-   private final Cache<String, TargetResult> cache;
+   private final Cache cache;
 
    private volatile boolean started = false;
 
@@ -85,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
       return policy;
    }
 
-   public Cache<String, TargetResult> getCache() {
+   public Cache getCache() {
       return cache;
    }
 
@@ -100,10 +98,10 @@ public class BrokerBalancer implements ActiveMQComponent {
                          final String targetKeyFilter,
                          final Target localTarget,
                          final String localTargetFilter,
+                         final Cache cache,
                          final Pool pool,
                          final Policy policy,
-                         KeyTransformer transformer,
-                         final int cacheTimeout) {
+                         KeyTransformer transformer) {
       this.name = name;
 
       this.targetKey = targetKey;
@@ -120,17 +118,15 @@ public class BrokerBalancer implements ActiveMQComponent {
 
       this.policy = policy;
 
-      if (cacheTimeout == -1) {
-         this.cache = CacheBuilder.newBuilder().build();
-      } else if (cacheTimeout > 0) {
-         this.cache = 
CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, 
TimeUnit.MILLISECONDS).build();
-      } else {
-         this.cache = null;
-      }
+      this.cache = cache;
    }
 
    @Override
    public void start() throws Exception {
+      if (cache != null) {
+         cache.start();
+      }
+
       if (pool != null) {
          pool.start();
       }
@@ -145,6 +141,10 @@ public class BrokerBalancer implements ActiveMQComponent {
       if (pool != null) {
          pool.stop();
       }
+
+      if (cache != null) {
+         cache.stop();
+      }
    }
 
    public TargetResult getTarget(Connection connection, String clientID, 
String username) {
@@ -176,20 +176,25 @@ public class BrokerBalancer implements ActiveMQComponent {
       TargetResult result = null;
 
       if (cache != null) {
-         result = cache.getIfPresent(key);
-      }
+         String nodeId = cache.get(key);
 
-      if (result != null) {
-         if (pool.isTargetReady(result.getTarget())) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("The cache returns [" + result.getTarget() + "] 
ready for " + targetKey + "[" + key + "]");
-            }
-
-            return result;
+         if (logger.isDebugEnabled()) {
+            logger.debug("The cache returns target [" + nodeId + "] for " + 
targetKey + "[" + key + "]");
          }
 
-         if (logger.isDebugEnabled()) {
-            logger.debug("The cache returns [" + result.getTarget() + "] not 
ready for " + targetKey + "[" + key + "]");
+         if (nodeId != null) {
+            Target target = pool.getReadyTarget(nodeId);
+            if (target != null) {
+               if (logger.isDebugEnabled()) {
+                  logger.debug("The target [" + nodeId + "] is ready for " + 
targetKey + "[" + key + "]");
+               }
+
+               return new TargetResult(target);
+            }
+
+            if (logger.isDebugEnabled()) {
+               logger.debug("The target [" + nodeId + "] is not ready for " + 
targetKey + "[" + key + "]");
+            }
          }
       }
 
@@ -207,7 +212,7 @@ public class BrokerBalancer implements ActiveMQComponent {
             if (logger.isDebugEnabled()) {
                logger.debug("Caching " + targetKey + "[" + key + "] for [" + 
target + "]");
             }
-            cache.put(key, result);
+            cache.put(key, target.getNodeID());
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
index f9b8002..72b99d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
@@ -21,11 +21,14 @@ import 
org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
 import 
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
 import 
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
+import org.apache.activemq.artemis.core.server.balancing.caches.LocalCache;
 import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
 import 
org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
 import 
org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
@@ -54,6 +57,8 @@ import java.util.concurrent.ScheduledExecutorService;
 public final class BrokerBalancerManager implements ActiveMQComponent {
    private static final Logger logger = 
Logger.getLogger(BrokerBalancerManager.class);
 
+   public static final String CACHE_ID_PREFIX = "$.BC.";
+
 
    private final Configuration config;
 
@@ -91,6 +96,13 @@ public final class BrokerBalancerManager implements 
ActiveMQComponent {
 
       Target localTarget = new LocalTarget(null, server);
 
+
+      Cache cache = null;
+      CacheConfiguration cacheConfiguration = config.getCacheConfiguration();
+      if (cacheConfiguration != null) {
+         cache = deployCache(cacheConfiguration, config.getName());
+      }
+
       Pool pool = null;
       final PoolConfiguration poolConfiguration = 
config.getPoolConfiguration();
       if (poolConfiguration != null) {
@@ -110,13 +122,20 @@ public final class BrokerBalancerManager implements 
ActiveMQComponent {
       }
 
       BrokerBalancer balancer = new BrokerBalancer(config.getName(), 
config.getTargetKey(), config.getTargetKeyFilter(),
-                                                   localTarget, 
config.getLocalTargetFilter(), pool, policy, transformer, 
config.getCacheTimeout());
+                                                   localTarget, 
config.getLocalTargetFilter(), cache, pool, policy, transformer);
 
       balancerControllers.put(balancer.getName(), balancer);
 
       server.getManagementService().registerBrokerBalancer(balancer);
    }
 
+   private Cache deployCache(CacheConfiguration configuration, String name) 
throws ClassNotFoundException {
+      Cache cache = new LocalCache(CACHE_ID_PREFIX + name, 
configuration.isPersisted(),
+         configuration.getTimeout(), server.getStorageManager());
+
+      return cache;
+   }
+
    private Pool deployPool(PoolConfiguration config, Target localTarget) 
throws Exception {
       Pool pool;
       TargetFactory targetFactory = new ActiveMQTargetFactory();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
new file mode 100644
index 0000000..58b1b1d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+public interface Cache {
+
+   void start();
+
+   void stop();
+
+   String get(String key);
+
+   void put(String key, String nodeId);
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
new file mode 100644
index 0000000..94113f2
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class LocalCache implements Cache, RemovalListener<String, String> {
+   private static final Logger logger = Logger.getLogger(LocalCache.class);
+
+   private String id;
+   private boolean persisted;
+   private int timeout;
+   private StorageManager storageManager;
+   private com.google.common.cache.Cache<String, String> cache;
+   private Map<String, PersistedKeyValuePair> persistedCacheEntries;
+
+   private volatile boolean running;
+
+   public String getId() {
+      return id;
+   }
+
+   public boolean isPersisted() {
+      return persisted;
+   }
+
+   public int getTimeout() {
+      return timeout;
+   }
+
+   public LocalCache(String id, boolean persisted, int timeout, StorageManager 
storageManager) {
+      this.id = id;
+      this.persisted = persisted;
+      this.timeout = timeout;
+      this.storageManager = storageManager;
+
+      if (timeout == 0) {
+         cache = CacheBuilder.newBuilder().build();
+      } else {
+         cache = 
CacheBuilder.newBuilder().removalListener(this).expireAfterAccess(timeout, 
TimeUnit.MILLISECONDS).build();
+      }
+   }
+
+
+   @Override
+   public void start() {
+      if (persisted) {
+         persistedCacheEntries = storageManager.getPersistedKeyValuePairs(id);
+
+         for (Map.Entry<String, PersistedKeyValuePair> cacheEntry : 
persistedCacheEntries.entrySet()) {
+            cache.put(cacheEntry.getKey(), cacheEntry.getValue().getValue());
+            logger.info(cacheEntry.toString());
+         }
+      }
+
+      running = true;
+   }
+
+   @Override
+   public void stop() {
+      cache.cleanUp();
+
+      if (persistedCacheEntries != null) {
+         persistedCacheEntries.clear();
+      }
+
+      running = false;
+   }
+
+   @Override
+   public String get(String key) {
+      return cache.getIfPresent(key);
+   }
+
+   @Override
+   public void put(String key, String nodeId) {
+      if (persisted) {
+         PersistedKeyValuePair persistedKeyValuePair = 
persistedCacheEntries.get(key);
+
+         if (persistedKeyValuePair == null || !Objects.equals(nodeId, 
persistedKeyValuePair.getValue())) {
+            persistedKeyValuePair = new PersistedKeyValuePair(id, key, nodeId);
+
+            try {
+               storageManager.storeKeyValuePair(persistedKeyValuePair);
+            } catch (Exception e) {
+               throw new RuntimeException(e);
+            }
+
+            persistedCacheEntries.put(key, persistedKeyValuePair);
+         }
+      }
+
+      cache.put(key, nodeId);
+   }
+
+   @Override
+   public void onRemoval(RemovalNotification<String, String> notification) {
+      if (running && persisted) {
+         PersistedKeyValuePair persistedKeyValuePair = 
persistedCacheEntries.remove(notification.getKey());
+
+         if (persistedKeyValuePair != null) {
+            try {
+               
storageManager.deleteKeyValuePair(persistedKeyValuePair.getMapId(), 
persistedKeyValuePair.getKey());
+            } catch (Exception e) {
+               throw new RuntimeException(e);
+            }
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
index 5ab6145..7860625 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
@@ -176,6 +176,27 @@ public abstract class AbstractPool implements Pool {
    }
 
    @Override
+   public Target getReadyTarget(String nodeId) {
+      int readyTargets;
+      long deadline = quorumTimeout > 0 ? System.nanoTime() + 
quorumTimeoutNanos : 0;
+
+      do {
+         readyTargets = 0;
+         for (TargetMonitor targetMonitor : targetMonitors) {
+            if (targetMonitor.isTargetReady()) {
+               readyTargets++;
+               if (nodeId.equals(targetMonitor.getTarget().getNodeID())) {
+                  return targetMonitor.getTarget();
+               }
+            }
+         }
+      }
+      while(readyTargets < quorumSize && deadline > 0 && (System.nanoTime() - 
deadline) < 0);
+
+      return null;
+   }
+
+   @Override
    public void addTargetProbe(TargetProbe probe) {
       targetProbes.add(probe);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
index db6b733..43e3214 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
@@ -46,6 +46,8 @@ public interface Pool extends ActiveMQComponent {
 
    Target getTarget(String nodeId);
 
+   Target getReadyTarget(String nodeId);
+
    boolean isTargetReady(Target target);
 
    List<Target> getTargets();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
index 0fc1cad..adbd426 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
@@ -69,6 +69,10 @@ public class ActiveMQTarget extends AbstractTarget 
implements FailureListener {
          false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE,
          BrokerBalancer.CLIENT_ID_PREFIX + 
UUIDGenerator.getInstance().generateStringUUID()).start());
 
+      if (getNodeID() == null) {
+         setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 
3000));
+      }
+
       connected = true;
 
       fireConnectedEvent();
@@ -92,10 +96,6 @@ public class ActiveMQTarget extends AbstractTarget 
implements FailureListener {
    @Override
    public boolean checkReadiness() {
       try {
-         if (getNodeID() == null) {
-            setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", 
String.class, 3000));
-         }
-
          return getAttribute(ResourceNames.BROKER, "Active", Boolean.class, 
3000);
       } catch (Exception e) {
          logger.warn("Error on check readiness", e);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index f143174..8f8a60f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2123,7 +2123,7 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
-         <xsd:element name="cache-timeout" type="xsd:long" default="-1" 
maxOccurs="1" minOccurs="0">
+         <xsd:element name="cache" type="brokerBalancerCacheType" 
maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
                   the time period for a cache entry to remain active
@@ -2171,6 +2171,26 @@
       </xsd:restriction>
    </xsd:simpleType>
 
+   <xsd:complexType name="brokerBalancerCacheType">
+      <xsd:sequence maxOccurs="unbounded">
+         <xsd:element name="persisted" type="xsd:boolean" default="false" 
maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  true means that the cache entries are persisted
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="timeout" type="xsd:long" default="-1" 
maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  the timeout (in milliseconds) before removing cache entries
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+      </xsd:sequence>
+      <xsd:attributeGroup ref="xml:specialAttrs"/>
+   </xsd:complexType>
+
    <xsd:complexType name="brokerBalancerPolicyType">
       <xsd:sequence>
          <xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 603f082..7a21cfe 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -301,7 +301,9 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
          } else {
             Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
             Assert.assertEquals("least-connections-balancer", bc.getName());
-            Assert.assertEquals(60000, bc.getCacheTimeout());
+            Assert.assertNotNull(bc.getCacheConfiguration());
+            Assert.assertEquals(true, 
bc.getCacheConfiguration().isPersisted());
+            Assert.assertEquals(60000, 
bc.getCacheConfiguration().getTimeout());
             Assert.assertEquals(bc.getPolicyConfiguration().getName(), 
LeastConnectionsPolicy.NAME);
             Assert.assertEquals(3000, 
bc.getPoolConfiguration().getCheckPeriod());
             Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize());
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index 732ea53..fc6acc9 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -53,7 +53,7 @@ public class BrokerBalancerTest {
       Pool pool = null;
       Policy policy = null;
       underTest  = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
-                                      localTarget, "^FOO.*", pool, policy, 
null, 0);
+                                      localTarget, "^FOO.*", null, pool, 
policy, null);
       assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
       assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, 
underTest.getTarget("BAR_EE"));
    }
@@ -69,7 +69,7 @@ public class BrokerBalancerTest {
          }
       };
       underTest  = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
-                                      localTarget, "^FOO.*", pool, policy, 
keyTransformer, 0);
+                                      localTarget, "^FOO.*", null, pool, 
policy, keyTransformer);
       assertEquals( localTarget, 
underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
    }
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
new file mode 100644
index 0000000..5fc3f3b
--- /dev/null
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LocalCacheTest {
+   private static final String CACHE_NAME = "TEST";
+   private static final int CACHE_TIMEOUT = 500;
+   private static final String CACHE_ENTRY_KEY = "TEST_KEY";
+   private static final String CACHE_ENTRY_VALUE = "TEST_VALUE";
+
+   @Test
+   public void testValidEntry() {
+      LocalCache cache = new LocalCache(CACHE_NAME, false, 0, null);
+
+      cache.start();
+
+      try {
+         cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+         Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+      } finally {
+         cache.stop();
+      }
+   }
+
+   @Test
+   public void testExpiration() throws Exception {
+      LocalCache cache = new LocalCache(CACHE_NAME, false, CACHE_TIMEOUT, 
null);
+
+      cache.start();
+
+      try {
+         cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+         Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+         Wait.assertTrue(() -> cache.get(CACHE_ENTRY_KEY) == null, 
CACHE_TIMEOUT * 2, CACHE_TIMEOUT);
+      } finally {
+         cache.stop();
+      }
+   }
+
+   @Test
+   public void testPersistedEntry() {
+      StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+      LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, 0, 
storageManager);
+
+      cacheBeforeStop.start();
+
+      try {
+         cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+         Assert.assertEquals(CACHE_ENTRY_VALUE, 
cacheBeforeStop.get(CACHE_ENTRY_KEY));
+      } finally {
+         cacheBeforeStop.stop();
+      }
+
+      Assert.assertEquals(CACHE_ENTRY_VALUE, 
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+      LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, 0, 
storageManager);
+
+      cacheAfterStop.start();
+
+      try {
+         Assert.assertEquals(CACHE_ENTRY_VALUE, 
cacheAfterStop.get(CACHE_ENTRY_KEY));
+      } finally {
+         cacheAfterStop.stop();
+      }
+
+      Assert.assertEquals(CACHE_ENTRY_VALUE, 
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+   }
+
+   @Test
+   public void testPersistedExpiration() throws Exception {
+      StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+      LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, 
CACHE_TIMEOUT, storageManager);
+
+      cacheBeforeStop.start();
+
+      try {
+         cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+         Assert.assertEquals(CACHE_ENTRY_VALUE, 
cacheBeforeStop.get(CACHE_ENTRY_KEY));
+      } finally {
+         cacheBeforeStop.stop();
+      }
+
+      Assert.assertEquals(CACHE_ENTRY_VALUE, 
storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+      LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, 
CACHE_TIMEOUT, storageManager);
+
+      cacheAfterStop.start();
+
+      try {
+         Assert.assertEquals(CACHE_ENTRY_VALUE, 
cacheAfterStop.get(CACHE_ENTRY_KEY));
+         Thread.sleep(CACHE_TIMEOUT * 2);
+         Assert.assertNull(cacheAfterStop.get(CACHE_ENTRY_KEY));
+      } finally {
+         cacheAfterStop.stop();
+      }
+
+      
Assert.assertNull(storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY));
+   }
+
+   static class DummyKeyValuePairStorageManager extends NullStorageManager {
+      private Map<String, Map<String, PersistedKeyValuePair>> 
mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
+      @Override
+      public void storeKeyValuePair(PersistedKeyValuePair 
persistedKeyValuePair) throws Exception {
+         Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+         if (persistedKeyValuePairs == null) {
+            persistedKeyValuePairs = new HashMap<>();
+            mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), 
persistedKeyValuePairs);
+         }
+         persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), 
persistedKeyValuePair);
+      }
+
+      @Override
+      public void deleteKeyValuePair(String mapId, String key) throws 
Exception {
+         Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(mapId);
+         if (persistedKeyValuePairs != null) {
+            persistedKeyValuePairs.remove(key);
+         }
+      }
+
+      @Override
+      public Map<String, PersistedKeyValuePair> 
getPersistedKeyValuePairs(String mapId) {
+         Map<String, PersistedKeyValuePair> persistedKeyValuePairs = 
mapPersistedKeyValuePairs.get(mapId);
+         return persistedKeyValuePairs != null ? new 
HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+      }
+   }
+}
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 77ba7ee..4ceee87 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -48,6 +48,7 @@ import 
org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
 import 
org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -667,6 +668,21 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void storeKeyValuePair(PersistedKeyValuePair 
persistedKeyValuePair) throws Exception {
+
+      }
+
+      @Override
+      public void deleteKeyValuePair(String mapId, String key) throws 
Exception {
+
+      }
+
+      @Override
+      public Map<String, PersistedKeyValuePair> 
getPersistedKeyValuePairs(String mapId) {
+         return null;
+      }
+
+      @Override
       public long storePageCounter(long txID, long queueID, long value, long 
size) throws Exception {
          return 0;
       }
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 6764e25..c331dfa 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -188,7 +188,10 @@
             </pool>
          </broker-balancer>
          <broker-balancer name="least-connections-balancer">
-            <cache-timeout>60000</cache-timeout>
+            <cache>
+               <persisted>true</persisted>
+               <timeout>60000</timeout>
+            </cache>
             <policy name="LEAST_CONNECTIONS"/>
             <pool>
                <check-period>3000</check-period>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index aee5fb2..70cdde0 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -179,7 +179,10 @@
             </pool>
          </broker-balancer>
          <broker-balancer name="least-connections-balancer">
-            <cache-timeout>60000</cache-timeout>
+            <cache>
+               <persisted>true</persisted>
+               <timeout>60000</timeout>
+            </cache>
             <policy name="LEAST_CONNECTIONS"/>
             <pool>
                <check-period>3000</check-period>
diff --git a/docs/user-manual/en/broker-balancers.md 
b/docs/user-manual/en/broker-balancers.md
index c821877..4fbc0a6 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -103,8 +103,19 @@ A policy is defined by the `policy` element. Let's take a 
look at a policy examp
 The broker balancer provides a cache with a timeout to improve the stickiness 
of the target broker selected,
 returning the same target broker for a target key as long as it is present in 
the cache and is ready.
 So a broker balancer with the cache enabled doesn't strictly follow the 
configured policy.
-By default, the cache is enabled and will never timeout. See below
-for more details about setting the `cache-timeout` parameter.
+By default, the cache is not enabled.
+
+A cache is defined by the `cache` element that includes the following items:
+* the `persisted` element defines whether the cache has to persist entries, 
default is `false`;
+* the `timeout` element defines the timeout before removing entries, measured 
in milliseconds; setting `0` disables the timeout; default is `0`.
+
+Let's take a look at a cache example from broker.xml:
+```xml
+<cache>
+  <persisted>true</persisted>
+  <timeout>60000</timeout>
+</cache>
+```
 
 ## Key transformers
 A `local-target-key-transformer` allows target key transformation before 
matching against any local-target-filter. One use case is
@@ -119,9 +130,8 @@ A broker balancer is defined by the `broker-balancer` 
element, it includes the f
 * the `target-key-filter` element defines a regular expression to filter the 
resolved keys;
 * the `local-target-filter` element defines a regular expression to match the 
keys that have to return a local target;
 * the `local-target-key-transformer` element defines a key transformer, see 
[key transformers](#key-transformers);
-* the `cache-timeout` element is the time period for a target broker to remain 
in the cache, measured in milliseconds, setting `0` will disable the cache, 
default is `-1`, meaning no expiration;
-* the `pool` element defines the pool to group the target brokers, see 
[pools](#pools).
-* the `policy` element defines the policy used to select the target brokers 
from the pool, see [policies](#policies);
+* the `pool` element defines the pool to group the target brokers, see 
[pools](#pools);
+* the `policy` element defines the policy used to select the target brokers 
from the pool, see [policies](#policies).
 
 Let's take a look at some broker balancer examples from broker.xml:
 ```xml
diff --git a/docs/user-manual/en/images/broker_balancer_workflow.png 
b/docs/user-manual/en/images/broker_balancer_workflow.png
index 97560b6..457c5e1 100644
Binary files a/docs/user-manual/en/images/broker_balancer_workflow.png and 
b/docs/user-manual/en/images/broker_balancer_workflow.png differ
diff --git a/docs/user-manual/en/images/management_api_redirect_sequence.png 
b/docs/user-manual/en/images/management_api_redirect_sequence.png
index 371baa89..4f25326 100644
Binary files a/docs/user-manual/en/images/management_api_redirect_sequence.png 
and b/docs/user-manual/en/images/management_api_redirect_sequence.png differ
diff --git a/docs/user-manual/en/images/native_redirect_sequence.png 
b/docs/user-manual/en/images/native_redirect_sequence.png
index d7466da..e5b45bd 100644
Binary files a/docs/user-manual/en/images/native_redirect_sequence.png and 
b/docs/user-manual/en/images/native_redirect_sequence.png differ
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index c626325..3a82854 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import 
org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
 import 
org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -151,6 +152,16 @@ public class BalancingTestBase extends ClusterTestBase {
 
    }
 
+   protected void setupBalancerLocalCache(final int node, boolean persisted, 
int timeout) {
+
+      Configuration configuration = getServer(node).getConfiguration();
+      BrokerBalancerConfiguration brokerBalancerConfiguration = 
configuration.getBalancerConfigurations().stream()
+         .filter(brokerBalancerConfiguration1 -> 
brokerBalancerConfiguration1.getName().equals(BROKER_BALANCER_NAME)).findFirst().get();
+
+      brokerBalancerConfiguration.setCacheConfiguration(
+         new CacheConfiguration().setPersisted(persisted).setTimeout(timeout));
+   }
+
    protected ConnectionFactory createFactory(String protocol, boolean 
sslEnabled, String host, int port, String clientID, String user, String 
password) throws Exception {
       return createFactory(protocol, sslEnabled,  host, port, clientID, user, 
password, -1);
    }
@@ -215,7 +226,7 @@ public class BalancingTestBase extends ClusterTestBase {
                urlBuilder.append(")");
             }
 
-            urlBuilder.append("?failover.startupMaxReconnectAttempts=" + 
retries + "&failover.randomize=true");
+            urlBuilder.append("?failover.startupMaxReconnectAttempts=" + 
retries);
 
             if (clientID != null) {
                urlBuilder.append("&jms.clientID=");
@@ -243,7 +254,7 @@ public class BalancingTestBase extends ClusterTestBase {
                urlBuilder.append(")");
             }
 
-            urlBuilder.append("?startupMaxReconnectAttempts=" + retries + 
"&maxReconnectAttempts=" + retries);
+            urlBuilder.append("?randomize=false&startupMaxReconnectAttempts=" 
+ retries + "&maxReconnectAttempts=" + retries);
 
             if (clientID != null) {
                urlBuilder.append("&jms.clientID=");
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
index 46dd417..3b7b786 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
@@ -140,15 +140,27 @@ public class RedirectTest extends BalancingTestBase {
 
    @Test
    public void testRoundRobinRedirect() throws Exception {
-      testEvenlyRedirect(RoundRobinPolicy.NAME, null);
+      testEvenlyRedirect(RoundRobinPolicy.NAME, null, false);
    }
 
    @Test
    public void testLeastConnectionsRedirect() throws Exception {
-      testEvenlyRedirect(LeastConnectionsPolicy.NAME, 
Collections.singletonMap(LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, 
String.valueOf(30)));
+      testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+         LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, 
String.valueOf(30)), false);
    }
 
-   private void testEvenlyRedirect(final String policyName, final Map<String, 
String> properties) throws Exception {
+   @Test
+   public void testRoundRobinRedirectWithFailure() throws Exception {
+      testEvenlyRedirect(RoundRobinPolicy.NAME, null, true);
+   }
+
+   @Test
+   public void testLeastConnectionsRedirectWithFailure() throws Exception {
+      testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+         LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, 
String.valueOf(30)), true);
+   }
+
+   private void testEvenlyRedirect(final String policyName, final Map<String, 
String> properties, final boolean withFailure) throws Exception {
       final String queueName = "RedirectTestQueue";
       final int targets = MULTIPLE_TARGETS;
       int[] nodes = new int[targets + 1];
@@ -174,6 +186,10 @@ public class RedirectTest extends BalancingTestBase {
          setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, policyName, properties, false, null, targets, 1, 2, 3);
       }
 
+      if (withFailure) {
+         setupBalancerLocalCache(0, true, 0);
+      }
+
       startServers(nodes);
 
       for (int node : nodes) {
@@ -214,6 +230,12 @@ public class RedirectTest extends BalancingTestBase {
          Assert.assertEquals("Messages of node " + targetNode, 1, queueControls[targetNode].countMessages());
       }
 
+      if (withFailure) {
+         crashAndWaitForFailure(getServer(0));
+
+         startServers(0);
+      }
+
       for (int i = 0; i < targets; i++) {
          try (Connection connection = connectionFactories[i].createConnection()) {
             connection.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 01b1c22..13d176e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
 import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -757,6 +758,21 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
+      public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+         manager.storeKeyValuePair(persistedKeyValuePair);
+      }
+
+      @Override
+      public void deleteKeyValuePair(String mapId, String key) throws Exception {
+         manager.deleteKeyValuePair(mapId, key);
+      }
+
+      @Override
+      public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
+         return manager.getPersistedKeyValuePairs(mapId);
+      }
+
+      @Override
       public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
          return manager.storePageCounter(txID, queueID, value, size);
       }

Reply via email to