This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new f154ea9628 QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone
replacement (#238)
f154ea9628 is described below
commit f154ea9628b81402f0adcb68c059871aeb1b2738
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Fri Feb 2 09:12:08 2024 +0100
QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone replacement (#238)
* QPID-8666: [Broker-J] Broker plugin jdbc-provider-bone replacement
* QPID-8666: [Broker-J] Used new constants across
BrokerStoreUpgraderAndRecoverer.java
* QPID-8666: [Broker-J] Used new constants across
VirtualHostStoreUpgraderAndRecoverer.java
---------
Co-authored-by: vavrtom <[email protected]>
---
.../org/apache/qpid/server/model/BrokerModel.java | 5 +-
...ractConfigurationStoreUpgraderAndRecoverer.java | 5 +
.../store/BrokerStoreUpgraderAndRecoverer.java | 107 +++++++++++++++------
.../apache/qpid/server/store/UpgraderHelper.java | 50 ++++++++++
.../VirtualHostStoreUpgraderAndRecoverer.java | 73 ++++++++++----
.../store/BrokerStoreUpgraderAndRecovererTest.java | 83 ++++++++++++++--
.../VirtualHostStoreUpgraderAndRecovererTest.java | 77 +++++++++++++++
7 files changed, 342 insertions(+), 58 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
index 68cedf31d7..4e774b5148 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
@@ -84,9 +84,12 @@ public final class BrokerModel extends Model
*
* 9.0
* Introduced PublishProducer as a child of Exchange and
PointToPointProducer as child of Queue
+ *
+ * 9.1
+ * Replaced JDBC connection pool provider from BoneCP to HikariCP
*/
public static final int MODEL_MAJOR_VERSION = 9;
- public static final int MODEL_MINOR_VERSION = 0;
+ public static final int MODEL_MINOR_VERSION = 1;
public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." +
MODEL_MINOR_VERSION;
private static final Model MODEL_INSTANCE = new BrokerModel();
private final Map<Class<? extends ConfiguredObject>, Class<? extends
ConfiguredObject>> _parents = new HashMap<>();
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java
b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java
index ea5842523b..924f48d568 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractConfigurationStoreUpgraderAndRecoverer.java
@@ -31,6 +31,11 @@ import org.apache.qpid.server.util.Action;
abstract class AbstractConfigurationStoreUpgraderAndRecoverer
{
+ protected static final String BROKER = "Broker";
+ protected static final String VIRTUALHOST = "VirtualHost";
+ protected static final String JDBC_VIRTUALHOST_TYPE = "JDBC";
+ protected static final String CONTEXT = "context";
+
private final Map<String, StoreUpgraderPhase> _upgraders = new HashMap<>();
private final String _initialVersion;
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
index c78884cbe8..820285238d 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecoverer.java
@@ -70,6 +70,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
register(new Upgrader_7_0_to_7_1());
register(new Upgrader_7_1_to_8_0());
register(new Upgrader_8_0_to_9_0());
+ register(new Upgrader_9_0_to_9_1());
}
private static final class Upgrader_1_0_to_1_1 extends StoreUpgraderPhase
@@ -82,12 +83,12 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("Broker"))
+ if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record,
this);
}
- else if (record.getType().equals("VirtualHost") &&
record.getAttributes().containsKey("storeType"))
+ else if (record.getType().equals(VIRTUALHOST) &&
record.getAttributes().containsKey("storeType"))
{
Map<String, Object> updatedAttributes = new
HashMap<>(record.getAttributes());
updatedAttributes.put("type", "STANDARD");
@@ -114,7 +115,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("Broker"))
+ if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record,
this);
@@ -154,7 +155,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
getUpdateMap().put(record.getId(), record);
}
- else if (record.getType().equals("Broker"))
+ else if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record,
this);
@@ -181,7 +182,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("VirtualHost"))
+ if (record.getType().equals(VIRTUALHOST))
{
Map<String, Object> attributes = record.getAttributes();
if (attributes.containsKey("configPath"))
@@ -200,7 +201,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
getUpdateMap().put(record.getId(), record);
}
- else if (record.getType().equals("Broker"))
+ else if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
createVirtualHostsRecordsFromBrokerRecordForModel_1_x(record,
this);
@@ -238,7 +239,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
record = new ConfiguredObjectRecordImpl(record.getId(),
record.getType(), updatedAttributes, record.getParents());
getUpdateMap().put(record.getId(), record);
}
- else if (record.getType().equals("Broker"))
+ else if (record.getType().equals(BROKER))
{
upgradeRootRecord(record);
}
@@ -316,7 +317,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("Broker"))
+ if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
@@ -370,7 +371,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
final ConfiguredObjectRecord logger = new
ConfiguredObjectRecordImpl(UUID.randomUUID(),
"BrokerLogger",
attributes,
- Collections.singletonMap("Broker",
+ Collections.singletonMap(BROKER,
record.getId()));
addNameValueFilter("Root", logger, LogLevel.WARN, "ROOT");
addNameValueFilter("Qpid", logger, LogLevel.INFO,
"org.apache.qpid.*");
@@ -454,7 +455,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("Broker"))
+ if (record.getType().equals(BROKER))
{
record = upgradeRootRecord(record);
_rootRecordId = record.getId();
@@ -543,7 +544,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
attrs.put(ConfiguredObject.TYPE, "AllowAll");
attrs.put("priority", 9999);
ConfiguredObjectRecord allowAllAclRecord =
- new ConfiguredObjectRecordImpl(allowAllACLId,
"AccessControlProvider", attrs, Collections.singletonMap("Broker",
_rootRecordId));
+ new ConfiguredObjectRecordImpl(allowAllACLId,
"AccessControlProvider", attrs, Collections.singletonMap(BROKER,
_rootRecordId));
getUpdateMap().put(allowAllAclRecord.getId(),
allowAllAclRecord);
}
@@ -569,7 +570,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if (record.getType().equals("Broker"))
+ if (record.getType().equals(BROKER))
{
boolean rebuildRecord = false;
Map<String, Object> attributes = new
HashMap<>(record.getAttributes());
@@ -601,12 +602,12 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
if (!additionalContext.isEmpty())
{
Map<String, String> newContext = new HashMap<>();
- if (attributes.containsKey("context"))
+ if (attributes.containsKey(CONTEXT))
{
- newContext.putAll((Map<String, String>)
attributes.get("context"));
+ newContext.putAll((Map<String, String>)
attributes.get(CONTEXT));
}
newContext.putAll(additionalContext);
- attributes.put("context", newContext);
+ attributes.put(CONTEXT, newContext);
rebuildRecord = true;
}
@@ -653,9 +654,9 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
private void upgradeHttpPortIfRequired(final ConfiguredObjectRecord
record)
{
Map<String, Object> attributes = record.getAttributes();
- if (attributes.containsKey("context"))
+ if (attributes.containsKey(CONTEXT))
{
- Map<String, String> context = (Map<String, String>)
attributes.get("context");
+ Map<String, String> context = (Map<String, String>)
attributes.get(CONTEXT);
if (context != null
&&
(context.containsKey("port.http.additionalInternalThreads")
||
context.containsKey("port.http.maximumQueuedRequests")))
@@ -668,7 +669,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
updatedContext.put("qpid.port.http.acceptBacklog",
acceptorsBacklog);
}
Map<String, Object> updatedAttributes = new
LinkedHashMap<>(attributes);
- updatedAttributes.put("context", updatedContext);
+ updatedAttributes.put(CONTEXT, updatedContext);
ConfiguredObjectRecord upgradedRecord = new
ConfiguredObjectRecordImpl(record.getId(),
record.getType(),
@@ -697,7 +698,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
- if("Broker".equals(record.getType()))
+ if(BROKER.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -721,7 +722,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
- if("Broker".equals(record.getType()))
+ if(BROKER.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -745,12 +746,12 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if ("Broker".equals(record.getType()))
+ if (BROKER.equals(record.getType()))
{
record = upgradeRootRecord(record);
}
renameContextVariables(record,
- "context",
+ CONTEXT,
UpgraderHelper.MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES);
}
@@ -761,6 +762,47 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
}
}
+ private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase
+ {
+ public Upgrader_9_0_to_9_1()
+ {
+ super("modelVersion", "9.0", "9.1");
+ }
+
+ @Override
+ public void configuredObject(final ConfiguredObjectRecord record)
+ {
+ if (BROKER.equals(record.getType()))
+ {
+ upgradeRootRecord(record);
+ }
+
+ final Map<String, Object> attributes = record.getAttributes();
+
+ if (attributes == null)
+ {
+ return;
+ }
+
+ if (!(VIRTUALHOST.equals(record.getType()) &&
JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
+ {
+ return;
+ }
+
+ if (attributes.containsKey(CONTEXT))
+ {
+ final ConfiguredObjectRecord updatedRecord =
UpgraderHelper.upgradeConnectionPool(record);
+ getUpdateMap().put(updatedRecord.getId(), updatedRecord);
+ }
+ }
+
+ @Override
+ public void complete()
+ {
+
+ }
+ }
+
private static class VirtualHostEntryUpgrader
{
Map<String, AttributesTransformer> _messageStoreToNodeTransformers =
Map.of("DERBY", new AttributesTransformer().
@@ -784,8 +826,8 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
addAttributeTransformer("storePath", copyAttribute()).
addAttributeTransformer("storeUnderfullSize",
copyAttribute()).
addAttributeTransformer("storeOverfullSize",
copyAttribute()).
- addAttributeTransformer("bdbEnvironmentConfig",
mutateAttributeName("context")),
- "JDBC", new AttributesTransformer().
+ addAttributeTransformer("bdbEnvironmentConfig",
mutateAttributeName(CONTEXT)),
+ JDBC_VIRTUALHOST_TYPE, new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("name", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
@@ -805,10 +847,11 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
addAttributeTransformer("jdbcBytesForBlob",
addContextVar("qpid.jdbcstore.useBytesForBlob")).
addAttributeTransformer("jdbcBlobType",
addContextVar("qpid.jdbcstore.blobType")).
addAttributeTransformer("jdbcVarbinaryType",
addContextVar("qpid.jdbcstore.varBinaryType")).
- addAttributeTransformer("maximumPoolSize",
-
addContextVar("qpid.jdbcstore.hikaricp.maximumPoolSize")).
- addAttributeTransformer("minimumIdle",
-
addContextVar("qpid.jdbcstore.hikaricp.minimumIdle")),
+ addAttributeTransformer("partitionCount",
addContextVar("qpid.jdbcstore.bonecp.partitionCount")).
+ addAttributeTransformer("maxConnectionsPerPartition",
+
addContextVar("qpid.jdbcstore.bonecp.maxConnectionsPerPartition")).
+ addAttributeTransformer("minConnectionsPerPartition",
+
addContextVar("qpid.jdbcstore.bonecp.minConnectionsPerPartition")),
"BDB_HA", new AttributesTransformer().
addAttributeTransformer("id", copyAttribute()).
addAttributeTransformer("createdTime", copyAttribute()).
@@ -821,8 +864,8 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
addAttributeTransformer("haHelperAddress",
mutateAttributeName("helperAddress")).
addAttributeTransformer("haNodeAddress",
mutateAttributeName("address")).
addAttributeTransformer("haDesignatedPrimary",
mutateAttributeName("designatedPrimary")).
- addAttributeTransformer("haReplicationConfig",
mutateAttributeName("context")).
- addAttributeTransformer("bdbEnvironmentConfig",
mutateAttributeName("context")));
+ addAttributeTransformer("haReplicationConfig",
mutateAttributeName(CONTEXT)).
+ addAttributeTransformer("bdbEnvironmentConfig",
mutateAttributeName(CONTEXT)));
public ConfiguredObjectRecord upgrade(ConfiguredObjectRecord vhost)
{
@@ -972,7 +1015,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
@Override
public MutableEntry transform(MutableEntry entry)
{
- return new MutableEntry("context",
Collections.singletonMap(_newName, entry.getValue()));
+ return new MutableEntry(CONTEXT,
Collections.singletonMap(_newName, entry.getValue()));
}
}
@@ -1061,7 +1104,7 @@ public class BrokerStoreUpgraderAndRecoverer extends
AbstractConfigurationStoreU
}
}
- ConfiguredObjectRecord nodeRecord = new
ConfiguredObjectRecordImpl(id, "VirtualHost", virtualHostAttributes,
Collections.singletonMap("Broker", brokerRecord.getId()));
+ ConfiguredObjectRecord nodeRecord = new
ConfiguredObjectRecordImpl(id, VIRTUALHOST, virtualHostAttributes,
Collections.singletonMap(BROKER, brokerRecord.getId()));
upgrader.getUpdateMap().put(nodeRecord.getId(),
nodeRecord);
upgrader.configuredObject(nodeRecord);
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java
b/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java
index 4428d67bbf..3569138697 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/UpgraderHelper.java
@@ -26,6 +26,22 @@ import java.util.stream.Collectors;
public class UpgraderHelper
{
+ static final String CONTEXT = "context";
+
+ static final String CP_TYPE = "connectionPoolType";
+ static final String BONECP = "BONECP";
+ static final String HIKARICP = "HIKARICP";
+
+ static final String PARTITION_COUNT_PARAM =
"qpid.jdbcstore.bonecp.partitionCount";
+ static final String MAX_POOL_SIZE_OLD_PARAM =
"qpid.jdbcstore.bonecp.maxConnectionsPerPartition";
+ static final String MIN_IDLE_OLD_PARAM =
"qpid.jdbcstore.bonecp.minConnectionsPerPartition";
+
+ static final String MAX_POOL_SIZE_PARAM =
"qpid.jdbcstore.hikaricp.maximumPoolSize";
+ static final String MIN_IDLE_PARAM = "qpid.jdbcstore.hikaricp.minimumIdle";
+
+ static final Map<String, String> RENAME_MAPPING =
Map.of(MAX_POOL_SIZE_OLD_PARAM, MAX_POOL_SIZE_PARAM,
+ MIN_IDLE_OLD_PARAM, MIN_IDLE_PARAM);
+
public static final Map<String, String>
MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES = new HashMap<>();
static
{
@@ -57,4 +73,38 @@ public class UpgraderHelper
{
return
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue,
Map.Entry::getKey));
}
+
+ /** Upgrades connection pool from BoneCP to HikariCP (model version 9.0 to
9.1) */
+ public static ConfiguredObjectRecord upgradeConnectionPool(final
ConfiguredObjectRecord record)
+ {
+ final Map<String, Object> attributes = record.getAttributes();
+
+ final Object contextObject = attributes.get(CONTEXT);
+
+ if (contextObject instanceof Map)
+ {
+ final Map <String, String> context = (Map<String, String>)
contextObject;
+ final Map<String, String> newContext =
UpgraderHelper.renameContextVariables(context, RENAME_MAPPING);
+
+ if (BONECP.equals(attributes.get(CP_TYPE)))
+ {
+ final int partitionCount =
newContext.get(PARTITION_COUNT_PARAM) != null
+ ?
Integer.parseInt(newContext.remove(PARTITION_COUNT_PARAM)) : 0;
+ final int maximumPoolSize =
newContext.get(MAX_POOL_SIZE_PARAM) != null && partitionCount != 0
+ ?
Integer.parseInt(newContext.get(MAX_POOL_SIZE_PARAM)) * partitionCount : 40;
+ final int minIdle = newContext.get(MIN_IDLE_PARAM) != null &&
partitionCount != 0
+ ? Integer.parseInt(newContext.get(MIN_IDLE_PARAM)) *
partitionCount : 20;
+ newContext.put(MAX_POOL_SIZE_PARAM,
String.valueOf(maximumPoolSize));
+ newContext.put(MIN_IDLE_PARAM, String.valueOf(minIdle));
+ }
+ final Map<String, Object> updatedAttributes = new
HashMap<>(record.getAttributes());
+ if (BONECP.equals(attributes.get(CP_TYPE)))
+ {
+ updatedAttributes.put(CP_TYPE, HIKARICP);
+ }
+ updatedAttributes.put(CONTEXT, newContext);
+ return new ConfiguredObjectRecordImpl(record.getId(),
record.getType(), updatedAttributes, record.getParents());
+ }
+ return record;
+ }
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 2a3bc71393..8ac13de12b 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -74,6 +74,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
register(new Upgrader_7_0_to_7_1());
register(new Upgrader_7_1_to_8_0());
register(new Upgrader_8_0_to_9_0());
+ register(new Upgrader_9_0_to_9_1());
Map<String, UUID> defaultExchangeIds = new HashMap<>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
@@ -192,7 +193,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
ConfiguredObjectRecord newRecord = new
ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(),
record.getParents());
getUpdateMap().put(record.getId(), newRecord);
- if ("VirtualHost".equals(type))
+ if (VIRTUALHOST.equals(type))
{
upgradeRootRecord(newRecord);
}
@@ -256,7 +257,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if ("VirtualHost".equals(record.getType()))
+ if (VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -362,7 +363,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -419,13 +420,13 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
record = upgradeRootRecord(record);
Map<String, Object> virtualHostAttributes = new
HashMap<>(record.getAttributes());
virtualHostAttributes.put("name", _virtualHostNode.getName());
virtualHostAttributes.put("modelVersion", getToVersion());
- record = new ConfiguredObjectRecordImpl(record.getId(),
"VirtualHost", virtualHostAttributes, Map.of());
+ record = new ConfiguredObjectRecordImpl(record.getId(),
VIRTUALHOST, virtualHostAttributes, Map.of());
_virtualHostRecord = record;
}
else if("Exchange".equals(record.getType()))
@@ -543,7 +544,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -566,7 +567,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -590,7 +591,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -623,12 +624,12 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
record = upgradeRootRecord(record);
Map<String, Object> attributes = new
HashMap<>(record.getAttributes());
boolean modified =
attributes.remove("queue_deadLetterQueueEnabled") != null;
- Object context = attributes.get("context");
+ Object context = attributes.get(CONTEXT);
Map<String,Object> contextMap = null;
if(context instanceof Map)
{
@@ -636,7 +637,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
modified |=
contextMap.remove("queue.deadLetterQueueEnabled") != null;
if (modified)
{
- attributes.put("context", contextMap);
+ attributes.put(CONTEXT, contextMap);
}
}
@@ -650,7 +651,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
}
contextMap.put("qpid.virtualhost.statisticsReportPattern",
"${ancestor:virtualhost:name}: messagesIn=${messagesIn},
bytesIn=${bytesIn:byteunit}, messagesOut=${messagesOut},
bytesOut=${bytesOut:byteunit}");
- attributes.put("context", contextMap);
+ attributes.put(CONTEXT, contextMap);
modified = true;
}
@@ -725,7 +726,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
/ ((double)
queueFlowControlSizeBytesValue);
String flowResumeLimit = String.format("%.2f",
ratio * 100.0);
- Object context = attributes.get("context");
+ Object context = attributes.get(CONTEXT);
Map<String, String> contextMap;
if (context instanceof Map)
{
@@ -734,7 +735,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
else
{
contextMap = new HashMap<>();
- attributes.put("context", contextMap);
+ attributes.put(CONTEXT, contextMap);
}
contextMap.put("queue.queueFlowResumeLimit",
flowResumeLimit);
}
@@ -1046,7 +1047,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -1070,7 +1071,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends
AbstractConfigurationS
@Override
public void configuredObject(final ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if(VIRTUALHOST.equals(record.getType()))
{
upgradeRootRecord(record);
}
@@ -1093,12 +1094,12 @@ public class VirtualHostStoreUpgraderAndRecoverer
extends AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if ("VirtualHost".equals(record.getType()))
+ if (VIRTUALHOST.equals(record.getType()))
{
record = upgradeRootRecord(record);
}
renameContextVariables(record,
- "context",
+ CONTEXT,
UpgraderHelper.MODEL9_MAPPING_FOR_RENAME_TO_ALLOW_DENY_CONTEXT_VARIABLES);
}
@@ -1109,6 +1110,42 @@ public class VirtualHostStoreUpgraderAndRecoverer
extends AbstractConfigurationS
}
}
+ private static class Upgrader_9_0_to_9_1 extends StoreUpgraderPhase
+ {
+ public Upgrader_9_0_to_9_1()
+ {
+ super("modelVersion", "9.0", "9.1");
+ }
+
+ @Override
+ public void configuredObject(final ConfiguredObjectRecord record)
+ {
+ final Map<String, Object> attributes = record.getAttributes();
+
+ if (attributes == null)
+ {
+ return;
+ }
+
+ if (!(VIRTUALHOST.equals(record.getType()) &&
JDBC_VIRTUALHOST_TYPE.equals(attributes.get("type"))))
+ {
+ return;
+ }
+
+ if (attributes.containsKey(CONTEXT))
+ {
+ final ConfiguredObjectRecord updatedRecord =
UpgraderHelper.upgradeConnectionPool(record);
+ getUpdateMap().put(updatedRecord.getId(), updatedRecord);
+ }
+ }
+
+ @Override
+ public void complete()
+ {
+
+ }
+ }
+
public boolean upgradeAndRecover(final DurableConfigurationStore
durableConfigurationStore,
final ConfiguredObjectRecord...
initialRecords)
{
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
index cc34fd03c4..b344e14c2a 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/store/BrokerStoreUpgraderAndRecovererTest.java
@@ -41,6 +41,8 @@ import org.apache.qpid.server.configuration.CommonProperties;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -90,17 +92,18 @@ public class BrokerStoreUpgraderAndRecovererTest extends
UnitTestBase
}
@Test
- public void testUpgradeVirtualHostWithJDBCStoreAndHikariCPPool()
+ public void testUpgradeVirtualHostWithJDBCStoreAndBoneCPPool()
{
final Map<String, Object> hostAttributes = ImmutableMap.<String,
Object>builder()
.put("name", VIRTUALHOST_NAME)
.put("modelVersion", "0.4")
- .put("connectionPool", "HIKARICP")
+ .put("connectionPool", "BONECP")
.put("connectionURL",
"jdbc:derby://localhost:1527/tmp/vh/test;create=true")
.put("createdBy", VIRTUALHOST_CREATED_BY)
.put("createdTime", VIRTUALHOST_CREATE_TIME)
- .put("maximumPoolSize", 7)
- .put("minimumIdle", 6)
+ .put("maxConnectionsPerPartition", 7)
+ .put("minConnectionsPerPartition", 6)
+ .put("partitionCount", 2)
.put("storeType", "jdbc")
.put("type", "STANDARD")
.put("jdbcBigIntType", "mybigint")
@@ -121,9 +124,10 @@ public class BrokerStoreUpgraderAndRecovererTest extends
UnitTestBase
"qpid.jdbcstore.varBinaryType", "myvarbinary",
"qpid.jdbcstore.blobType", "myblob",
"qpid.jdbcstore.useBytesForBlob", true,
- "qpid.jdbcstore.hikaricp.maximumPoolSize", 7,
- "qpid.jdbcstore.hikaricp.minimumIdle", 6);
- final Map<String,Object> expectedAttributes =
Map.of("connectionPoolType", "HIKARICP",
+ "qpid.jdbcstore.bonecp.maxConnectionsPerPartition", 7,
+ "qpid.jdbcstore.bonecp.minConnectionsPerPartition", 6,
+ "qpid.jdbcstore.bonecp.partitionCount", 2);
+ final Map<String,Object> expectedAttributes =
Map.of("connectionPoolType", "BONECP",
"connectionUrl",
"jdbc:derby://localhost:1527/tmp/vh/test;create=true",
"createdBy", VIRTUALHOST_CREATED_BY,
"createdTime", VIRTUALHOST_CREATE_TIME,
@@ -877,6 +881,71 @@ public class BrokerStoreUpgraderAndRecovererTest extends
UnitTestBase
assertEquals("Ssl.*",
contextMap.get(CommonProperties.QPID_SECURITY_TLS_CIPHER_SUITE_DENY_LIST));
}
+ @ParameterizedTest
+ @CsvSource(value =
+ {
+ "4,20,5,80,20", "0,20,5,40,20", "null,20,5,40,20",
"4,null,null,40,20", "null,null,null,40,20"
+ }, nullValues = { "null" })
+ public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final
String partitionCount,
+ final
String maxConnectionsPerPartition,
+ final
String minConnectionsPerPartition,
+ final
String maximumPoolSize,
+ final
String minimumIdle)
+ {
+ _brokerRecord.getAttributes().put("modelVersion", "9.0");
+
+ final Map<String, String> context = new HashMap<>();
+ context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount);
+ context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition",
maxConnectionsPerPartition);
+ context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition",
minConnectionsPerPartition);
+ final Map<String, Object> attributes = Map.of("name", getTestName(),
+ "type", "JDBC",
+ "connectionPoolType", "BONECP",
+ "context", context);
+ final ConfiguredObjectRecord virtualHostRecord =
mock(ConfiguredObjectRecord.class);;
+ when(virtualHostRecord.getId()).thenReturn(randomUUID());
+ when(virtualHostRecord.getType()).thenReturn("VirtualHost");
+ when(virtualHostRecord.getAttributes()).thenReturn(attributes);
+
+ final DurableConfigurationStore dcs = new
DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
+ final BrokerStoreUpgraderAndRecoverer recoverer = new
BrokerStoreUpgraderAndRecoverer(_systemConfig);
+ final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
+ final Map<String, String> contextMap =
findCategoryRecordAndGetContext("VirtualHost", records);
+
+ final ConfiguredObjectRecord upgradedVirtualHost = records.stream()
+ .filter(record ->
record.getType().equals("VirtualHost")).findFirst()
+ .orElse(null);
+
+ assertNotNull(upgradedVirtualHost);
+ assertEquals(maximumPoolSize,
contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
+ assertEquals(minimumIdle,
contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
+ assertEquals("HIKARICP",
upgradedVirtualHost.getAttributes().get("connectionPoolType"));
+ }
+
+ @Test
+ public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider()
+ {
+ _brokerRecord.getAttributes().put("modelVersion", "9.0");
+
+ final Map<String, Object> attributes = Map.of("name", getTestName(),
+ "type", "JDBC",
+ "connectionPoolType", "NONE",
+ "context", new HashMap<>());
+ final ConfiguredObjectRecord virtualHostRecord =
mock(ConfiguredObjectRecord.class);;
+ when(virtualHostRecord.getId()).thenReturn(randomUUID());
+ when(virtualHostRecord.getType()).thenReturn("VirtualHost");
+ when(virtualHostRecord.getAttributes()).thenReturn(attributes);
+
+ final DurableConfigurationStore dcs = new
DurableConfigurationStoreStub(virtualHostRecord, _brokerRecord);
+ final BrokerStoreUpgraderAndRecoverer recoverer = new
BrokerStoreUpgraderAndRecoverer(_systemConfig);
+ final List<ConfiguredObjectRecord> records = upgrade(dcs, recoverer);
+ final Map<String, String> contextMap =
findCategoryRecordAndGetContext("VirtualHost", records);
+
+ assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
+ assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
+ assertEquals("NONE",
virtualHostRecord.getAttributes().get("connectionPoolType"));
+ }
+
private ConfiguredObjectRecord
createMockRecordForGivenCategoryTypeAndContext(final String category,
final String type,
final Map<String, String> context)
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
index 78466e11b1..39970112b1 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,6 +35,8 @@ import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.apache.qpid.server.configuration.CommonProperties;
import org.apache.qpid.server.model.Broker;
@@ -302,6 +305,80 @@ public class VirtualHostStoreUpgraderAndRecovererTest
extends UnitTestBase
assertEquals("Ssl.*",
newContext.get(CommonProperties.QPID_SECURITY_TLS_CIPHER_SUITE_DENY_LIST));
}
+ @ParameterizedTest
+ @CsvSource(value =
+ {
+ "4,20,5,80,20", "0,20,5,40,20", "null,20,5,40,20",
"4,null,null,40,20", "null,null,null,40,20"
+ }, nullValues = { "null" })
+ public void testContextVariableUpgradeFromBoneCPToHikariCPProvider(final
String partitionCount,
+ final
String maxConnectionsPerPartition,
+ final
String minConnectionsPerPartition,
+ final
String maximumPoolSize,
+ final
String minimumIdle)
+ {
+ final Map<String, Object> rootAttributes = Map.of("modelVersion",
"9.0", "name", "root");
+ final ConfiguredObjectRecord rootRecord = new
ConfiguredObjectRecordImpl(randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, String> context = new HashMap<>();
+ context.put("qpid.jdbcstore.bonecp.partitionCount", partitionCount);
+ context.put("qpid.jdbcstore.bonecp.maxConnectionsPerPartition",
maxConnectionsPerPartition);
+ context.put("qpid.jdbcstore.bonecp.minConnectionsPerPartition",
minConnectionsPerPartition);
+ final Map<String, Object> attributes = Map.of("name", getTestName(),
+ "modelVersion", "9.0",
+ "type", "JDBC",
+ "connectionPoolType", "BONECP",
+ "context", context);
+ final ConfiguredObjectRecord virtualHostRecord =
mock(ConfiguredObjectRecord.class);;
+ when(virtualHostRecord.getId()).thenReturn(randomUUID());
+ when(virtualHostRecord.getType()).thenReturn("VirtualHost");
+ when(virtualHostRecord.getAttributes()).thenReturn(attributes);
+
+ final List<ConfiguredObjectRecord> records = List.of(rootRecord,
virtualHostRecord);
+ final List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost",
"modelVersion");
+
+ final ConfiguredObjectRecord upgradedVirtualHost =
upgradedRecords.stream()
+ .filter(record ->
record.getId().equals(virtualHostRecord.getId())).findFirst()
+ .orElse(null);
+ final Map<String, String> contextMap = (Map<String, String>)
upgradedVirtualHost.getAttributes().get("context");
+
+ assertNotNull(upgradedVirtualHost);
+ assertEquals(maximumPoolSize,
contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
+ assertEquals(minimumIdle,
contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
+ assertEquals("HIKARICP",
upgradedVirtualHost.getAttributes().get("connectionPoolType"));
+ }
+
+ @Test
+ public void testContextVariableUpgradeFromDefaultCPToHikariCPProvider()
+ {
+ final Map<String, Object> rootAttributes = Map.of("modelVersion",
"9.0", "name", "root");
+ final ConfiguredObjectRecord rootRecord = new
ConfiguredObjectRecordImpl(randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, Object> attributes = Map.of("name", getTestName(),
+ "modelVersion", "9.0",
+ "type", "JDBC",
+ "connectionPoolType", "NONE",
+ "context", new HashMap<>());
+ final ConfiguredObjectRecord virtualHostRecord =
mock(ConfiguredObjectRecord.class);;
+ when(virtualHostRecord.getId()).thenReturn(randomUUID());
+ when(virtualHostRecord.getType()).thenReturn("VirtualHost");
+ when(virtualHostRecord.getAttributes()).thenReturn(attributes);
+
+ final List<ConfiguredObjectRecord> records = List.of(rootRecord,
virtualHostRecord);
+ final List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost",
"modelVersion");
+
+ final ConfiguredObjectRecord upgradedVirtualHost =
upgradedRecords.stream()
+ .filter(record ->
record.getId().equals(virtualHostRecord.getId())).findFirst()
+ .orElse(null);
+ final Map<String, String> contextMap = (Map<String, String>)
upgradedVirtualHost.getAttributes().get("context");
+
+ assertNotNull(upgradedVirtualHost);
+ assertNull(contextMap.get("qpid.jdbcstore.hikaricp.maximumPoolSize"));
+ assertNull(contextMap.get("qpid.jdbcstore.hikaricp.minimumIdle"));
+ assertEquals("NONE",
virtualHostRecord.getAttributes().get("connectionPoolType"));
+ }
+
private ConfiguredObjectRecord findRecordById(final UUID id, final
List<ConfiguredObjectRecord> records)
{
return records.stream().filter(record ->
record.getId().equals(id)).findFirst().orElse(null);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]