QPID-8158: [Broker-J] [System Tests] Refactor MultiNodeTest and TwoNodeTest to use QpidTestRunner and BrokerAdmin for running the tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/88a12e8c Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/88a12e8c Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/88a12e8c Branch: refs/heads/master Commit: 88a12e8c5c9b7b912745e0382eb2661264003f19 Parents: 381202d Author: Alex Rudyy <oru...@apache.org> Authored: Tue Apr 10 01:51:37 2018 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Tue Apr 10 09:50:52 2018 +0100 ---------------------------------------------------------------------- bdbstore/systests/pom.xml | 34 + .../replication/GroupBrokerAdmin.java | 654 +++++++++++ .../berkeleydb/replication/GroupConfig.java | 33 + .../replication/GroupJmsTestBase.java | 110 ++ .../berkeleydb/replication/MultiNodeTest.java | 1023 ++++++++++-------- .../berkeleydb/replication/TwoNodeTest.java | 306 +++--- pom.xml | 7 + .../apache/qpid/tests/http/HttpTestBase.java | 24 +- .../qpid/systests/AmqpManagementFacade.java | 10 +- .../org/apache/qpid/systests/JmsTestBase.java | 20 +- .../java/org/apache/qpid/systests/Utils.java | 55 + systests/qpid-systests-spawn-admin/pom.xml | 123 +++ .../systests/admin/BrokerAdminException.java | 34 + .../qpid/systests/admin/SpawnBrokerAdmin.java | 1023 ++++++++++++++++++ .../src/main/resources/spawn-broker.json | 78 ++ .../systests/admin/SpawnBrokerAdminTest.java | 314 ++++++ .../qpid/test/utils/QpidBrokerTestCase.java | 13 +- .../utils/EmbeddedBrokerPerClassAdminImpl.java | 29 - .../utils/LoggingBrokerAdminDecorator.java | 188 ++++ .../apache/qpid/tests/utils/QpidTestRunner.java | 8 +- .../apache/qpid/tests/utils/RunBrokerAdmin.java | 32 + 21 files changed, 3419 insertions(+), 699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/pom.xml ---------------------------------------------------------------------- diff --git a/bdbstore/systests/pom.xml b/bdbstore/systests/pom.xml index 3620718..eefb5bc 100644 --- a/bdbstore/systests/pom.xml +++ b/bdbstore/systests/pom.xml @@ -53,6 +53,13 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-spawn-admin</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -66,6 +73,33 @@ <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>build-classpath</id> + <phase>generate-sources</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <outputFile>${project.build.directory}/qpid.build.classpath.txt</outputFile> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <qpid.initialConfigurationLocation>classpath:spawn-broker.json</qpid.initialConfigurationLocation> + <qpid.systests.build.classpath.file>${project.build.directory}/qpid.build.classpath.txt</qpid.systests.build.classpath.file> + <qpid.amqp.version>${profile.broker.version}</qpid.amqp.version> + </systemPropertyVariables> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <!--version specified in parent pluginManagement --> <executions> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java new file mode 100644 index 0000000..c11666d --- /dev/null +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java @@ -0,0 +1,654 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.apache.qpid.systests.admin.SpawnBrokerAdmin.SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.sleepycat.je.rep.ReplicationConfig; + +import org.apache.qpid.server.plugin.PluggableService; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; +import org.apache.qpid.systests.admin.BrokerAdminException; +import org.apache.qpid.systests.admin.SpawnBrokerAdmin; +import org.apache.qpid.test.utils.PortHelper; +import org.apache.qpid.tests.utils.BrokerAdmin; + +@SuppressWarnings("unused") +@PluggableService +public class GroupBrokerAdmin implements BrokerAdmin +{ + private static final int WAIT_LIMIT = Integer.getInteger("qpid.test.ha.await", 10000); + private static final String AMQP_NODE_TYPE = "org.apache.qpid.VirtualHostNode"; + private static final String AMQP_REMOTE_NODE_TYPE = "org.apache.qpid.server.model.RemoteReplicationNode"; + private static final String ROLE_UNKNOWN = "UNKNOWN"; + private static final String ROLE_MASTER = "MASTER"; + private static final String ROLE_REPLICA = "REPLICA"; + private static final String NODE_TYPE = "BDB_HA"; + private static final String HOST = "127.0.0.1"; + + private GroupMember[] _members; + private ListeningExecutorService _executorService; + private Map<String, String> _lastKnownRoles = new ConcurrentHashMap<>(); + + @Override + public void beforeTestClass(final Class testClass) + { + GroupConfig runBrokerAdmin = (GroupConfig) testClass.getAnnotation(GroupConfig.class); + int numberOfNodes = runBrokerAdmin == null ? 2 : runBrokerAdmin.numberOfNodes(); + String groupName = runBrokerAdmin == null ? "test-ha" : runBrokerAdmin.groupName(); + _executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfNodes)); + + SpawnBrokerAdmin[] admins = + Stream.generate(SpawnBrokerAdmin::new).limit(numberOfNodes).toArray(SpawnBrokerAdmin[]::new); + + boolean started = false; + try + { + int startupTimeout = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000); + awaitFuture(startupTimeout, invokeParallel(Arrays.stream(admins).map(a -> (Callable<Void>) () -> { + a.beforeTestClass(testClass); + return null; + }).collect(Collectors.toList()))); + + _members = initializeGroupData(groupName, admins); + started = true; + } + finally + { + if (!started) + { + for (SpawnBrokerAdmin a : admins) + { + a.afterTestClass(testClass); + } + _executorService.shutdown(); + } + } + } + + @Override + public void beforeTestMethod(final Class testClass, final Method method) + { + _lastKnownRoles.clear(); + GroupMember first = _members[0]; + first.getAdmin().beforeTestMethod(_members[0].getName(), NODE_TYPE, _members[0].getNodeAttributes()); + Object role = awaitNodeRoleReplicaOrMaster(first); + _lastKnownRoles.put(first.getName(), String.valueOf(role)); + ListenableFuture<Void> f; + if (_members.length > 2) + { + f = invokeParallel(Arrays.stream(_members).skip(1).map(m -> (Callable<Void>) () -> { + m.getAdmin().beforeTestMethod(m.getName(), NODE_TYPE, m.getNodeAttributes()); + _lastKnownRoles.put(m.getName(), ROLE_UNKNOWN); + return null; + }).collect(Collectors.toList())); + } + else + { + for (int i = 1; i < _members.length; i++) + { + _members[i].getAdmin() + .beforeTestMethod(_members[i].getName(), NODE_TYPE, _members[i].getNodeAttributes()); + _lastKnownRoles.put(_members[i].getName(), ROLE_UNKNOWN); + } + f = Futures.immediateFuture(null); + } + + awaitFuture(WAIT_LIMIT, f); + awaitAllTransitionIntoReplicaOrMaster(); + } + + + @Override + public void afterTestMethod(final Class testClass, final Method method) + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + m.getAdmin().afterTestMethod(testClass, method); + return null; + }).collect(Collectors.toList()))); + _lastKnownRoles.clear(); + } + + @Override + public void afterTestClass(final Class testClass) + { + try + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + m.getAdmin().afterTestClass(testClass); + return null; + }).collect(Collectors.toList()))); + } + finally + { + if (_executorService != null) + { + _executorService.shutdown(); + } + } + } + + @Override + public InetSocketAddress getBrokerAddress(final PortType portType) + { + return getLastKnownMasterAdmin().getBrokerAddress(portType); + } + + @Override + public void createQueue(final String queueName) + { + getLastKnownMasterAdmin().createQueue(queueName); + } + + @Override + public void deleteQueue(final String queueName) + { + getLastKnownMasterAdmin().deleteQueue(queueName); + } + + @Override + public void putMessageOnQueue(final String queueName, final String... messages) + { + getLastKnownMasterAdmin().putMessageOnQueue(queueName, messages); + } + + @Override + public int getQueueDepthMessages(final String testQueueName) + { + return getLastKnownMasterAdmin().getQueueDepthMessages(testQueueName); + } + + @Override + public boolean supportsRestart() + { + return getLastKnownMasterAdmin().supportsRestart(); + } + + @Override + public ListenableFuture<Void> restart() + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + _lastKnownRoles.put(m.getName(), ROLE_UNKNOWN); + m.getAdmin().restart(); + return null; + }).collect(Collectors.toList()))); + awaitAllTransitionIntoReplicaOrMaster(); + + return Futures.immediateFuture(null); + } + + @Override + public boolean isSASLSupported() + { + return getLastKnownMasterAdmin().isSASLSupported(); + } + + @Override + public boolean isSASLMechanismSupported(final String mechanismName) + { + return getLastKnownMasterAdmin().isSASLMechanismSupported(mechanismName); + } + + @Override + public boolean isWebSocketSupported() + { + return getLastKnownMasterAdmin().isWebSocketSupported(); + } + + @Override + public boolean isQueueDepthSupported() + { + return getLastKnownMasterAdmin().isQueueDepthSupported(); + } + + @Override + public boolean isManagementSupported() + { + return getLastKnownMasterAdmin().isManagementSupported(); + } + + @Override + public String getValidUsername() + { + return getLastKnownMasterAdmin().getValidUsername(); + } + + @Override + public String getValidPassword() + { + return getLastKnownMasterAdmin().getValidPassword(); + } + + @Override + public String getKind() + { + return KIND_BROKER_J; + } + + @Override + public String getType() + { + return "BDB-HA"; + } + + public void stop() + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + _lastKnownRoles.put(m.getName(), ROLE_UNKNOWN); + m.getAdmin().stop(); + return null; + }).collect(Collectors.toList()))); + } + + public void start() + { + start(true); + } + + public void start(boolean assertRoles) + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + m.getAdmin().start(); + return null; + }).collect(Collectors.toList()))); + + if (assertRoles) + { + awaitAllTransitionIntoReplicaOrMaster(); + } + } + + public void startNode(final int amqpPort) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + member.getAdmin().start(); + awaitNodeRole(amqpPort, ROLE_MASTER, ROLE_REPLICA); + } + + public void stopNode(final int amqpPort) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + member.getAdmin().stop(); + _lastKnownRoles.put(member.getName(), ROLE_UNKNOWN); + } + + public int[] getGroupAmqpPorts() + { + int[] ports = new int[_members.length]; + int i = 0; + for (GroupMember m : _members) + { + ports[i++] = m.getAmqpPort(); + } + return ports; + } + + public int[] getBdbPorts() + { + int[] ports = new int[_members.length]; + int i = 0; + for (GroupMember m : _members) + { + ports[i++] = m.getBdbPort(); + } + return ports; + } + + public int getAmqpPort(final int... exclude) + { + Set<Integer> excluded = Arrays.stream(exclude).boxed().collect(Collectors.toSet()); + return Arrays.stream(_members) + .map(GroupMember::getAmqpPort) + .filter(p -> !excluded.contains(p)) + .findFirst() + .orElseThrow(() -> new BrokerAdminException("Amqp Port not found")); + } + + public String getHost() + { + return HOST; + } + + public Map<Integer, String> groupThreadDumps() + { + Map<Integer, String> threadDumps = new HashMap<>(); + for (GroupMember m : _members) + { + threadDumps.put(m._amqpPort, m.getAdmin().dumpThreads()); + } + return threadDumps; + } + + public String getHelperHostPort() + { + return HOST + ":" + _members[0].getBdbPort(); + } + + public Map<String, Object> getNodeAttributes(final int amqpPort) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + return member.getAdmin().getAttributes(true, member.getName(), AMQP_NODE_TYPE); + } + + public void setNodeAttributes(final int amqpPort, final Map<String, Object> attributes) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + member.getAdmin().update(true, member.getName(), AMQP_NODE_TYPE, attributes); + } + + public void setDesignatedPrimary(int brokerPort, boolean designatedPrimary) + { + setNodeAttributes(brokerPort, Collections.singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, + String.valueOf(designatedPrimary))); + } + + public void awaitNodeRole(final int amqpPort, String... role) + { + Object actualRole = awaitNodeToAttainAttributeValue(amqpPort, BDBHAVirtualHostNode.ROLE, (Object[]) role); + _lastKnownRoles.put(getMemberByAmqpPort(amqpPort).getName(), String.valueOf(actualRole)); + } + + public Object awaitNodeToAttainAttributeValue(final int amqpPort, + final String attributeName, + final Object... attributeValue) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + return member.getAdmin().awaitAttributeValue(WAIT_LIMIT, + true, + member.getName(), + AMQP_NODE_TYPE, + attributeName, + attributeValue); + } + + public Map<String, Object> getRemoteNodeAttributes(final int amqpPort, final int remoteAmqpPort) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort); + return member.getAdmin() + .getAttributes(true, + member.getName() + "/" + member2.getName(), + AMQP_REMOTE_NODE_TYPE); + } + + public void setRemoteNodeAttributes(final int amqpPort, + final int remoteAmqpPort, + final Map<String, Object> attributes) + { + + GroupMember member = getMemberByAmqpPort(amqpPort); + GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort); + member.getAdmin() + .update(true, + member.getName() + "/" + member2.getName(), + AMQP_REMOTE_NODE_TYPE, + attributes); + } + + public void awaitRemoteNodeRole(final int amqpPort, final int remoteAmqpPort, final String... role) + { + + GroupMember member = getMemberByAmqpPort(amqpPort); + GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort); + member.getAdmin().awaitAttributeValue(WAIT_LIMIT, + true, + member.getName() + "/" + member2.getName(), + AMQP_REMOTE_NODE_TYPE, + BDBHARemoteReplicationNode.ROLE, + ROLE_REPLICA, + ROLE_MASTER); + } + + private SpawnBrokerAdmin getNodeAdmin(final int amqpPort) + { + GroupMember member = getMemberByAmqpPort(amqpPort); + return member.getAdmin(); + } + + private SpawnBrokerAdmin getLastKnownMasterAdmin() + { + return getLastKnownMaster().getAdmin(); + } + + private GroupMember[] initializeGroupData(final String groupName, final SpawnBrokerAdmin[] admins) + { + PortHelper helper = new PortHelper(); + int[] ports = new int[admins.length]; + String[] addresses = new String[admins.length]; + int port = -1; + for (int i = 0; i < admins.length; i++) + { + port = i == 0 ? helper.getNextAvailable() : helper.getNextAvailable(port + 1); + addresses[i] = HOST + ":" + port; + ports[i] = port; + } + + Map<String, String> context = new HashMap<>(); + context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, "{\"type\":\"BDB_HA\"}"); + + String permitted = objectToJson(addresses); + String contextAsString = objectToJson(context); + + GroupMember[] members = new GroupMember[admins.length]; + for (int i = 0; i < admins.length; i++) + { + String nodeName = "node-" + ports[i]; + + Map<String, Object> nodeAttributes = new HashMap<>(); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, addresses[i]); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); + nodeAttributes.put(BDBHAVirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE, true); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, addresses[0]); + if (i > 0) + { + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node-" + ports[0]); + } + nodeAttributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, permitted); + nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, contextAsString); + members[i] = new GroupMember(nodeName, + admins[i].getBrokerAddress(PortType.AMQP).getPort(), + port, + admins[i], + nodeAttributes); + } + return members; + } + + private GroupMember getMemberByAmqpPort(final int amqpPort) + { + return Arrays.stream(_members) + .filter(m -> m.getAmqpPort() == amqpPort) + .findFirst() + .orElseThrow(() -> new BrokerAdminException( + String.format("Could not find node by amqp port %d", amqpPort))); + } + + private <T> ListenableFuture<T> invokeParallel(Collection<Callable<T>> tasks) + { + try + { + @SuppressWarnings("unchecked") + List<ListenableFuture<T>> futures = (List) _executorService.invokeAll(tasks); + ListenableFuture<List<T>> combinedFuture = Futures.allAsList(futures); + return Futures.transform(combinedFuture, input -> null, _executorService); + } + catch (InterruptedException e) + { + Thread.interrupted(); + return Futures.immediateFailedFuture(e); + } + } + + private <T> void awaitFuture(long waitLimit, ListenableFuture<T> future) + { + try + { + future.get(waitLimit, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.interrupted(); + throw new BrokerAdminException("Interrupted", e); + } + catch (ExecutionException e) + { + throw new BrokerAdminException("Operation failed", e.getCause()); + } + catch (TimeoutException e) + { + throw new BrokerAdminException("Timeout"); + } + } + + private void awaitAllTransitionIntoReplicaOrMaster() + { + awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> { + awaitNodeRoleReplicaOrMaster(m); + return null; + }).collect(Collectors.toList()))); + + if (_lastKnownRoles.values().stream().noneMatch(role -> ROLE_MASTER.equals(role) || ROLE_REPLICA.equals(role))) + { + throw new BrokerAdminException("Unexpected node roles " + Joiner.on(", ").withKeyValueSeparator(" -> ") + .join(_lastKnownRoles)); + } + } + + private Object awaitNodeRoleReplicaOrMaster(final GroupMember m) + { + Object result = m.getAdmin().awaitAttributeValue(WAIT_LIMIT, + true, + m.getName(), + AMQP_NODE_TYPE, + BDBHAVirtualHostNode.ROLE, + ROLE_REPLICA, + ROLE_MASTER); + _lastKnownRoles.put(m.getName(), String.valueOf(result)); + return result; + } + + + private GroupMember getLastKnownMaster() + { + final Map.Entry<String, String> entry = + _lastKnownRoles.entrySet() + .stream() + .filter(e -> ROLE_MASTER.equals(e.getValue())) + .findFirst() + .orElseThrow(() -> new BrokerAdminException("Master node is not found")); + + return Arrays.stream(_members) + .filter(m -> entry.getKey().equals(m.getName())) + .findFirst() + .orElseThrow(() -> new BrokerAdminException("Master node is not found")); + } + + private String objectToJson(final Object object) + { + try + { + return new ObjectMapper().writeValueAsString(object); + } + catch (JsonProcessingException e) + { + throw new BrokerAdminException("Cannot convert object to json", e); + } + } + + private class GroupMember + { + private final Map<String, Object> _nodeAttributes; + private final SpawnBrokerAdmin _admin; + private final int _bdbPort; + private final int _amqpPort; + private final String _name; + + private GroupMember(final String name, + final int amqpPort, + final int bdbPort, + final SpawnBrokerAdmin admin, + final Map<String, Object> nodeAttributes) + { + _name = name; + _admin = admin; + _bdbPort = bdbPort; + _amqpPort = amqpPort; + _nodeAttributes = nodeAttributes; + } + + private String getName() + { + return _name; + } + + private SpawnBrokerAdmin getAdmin() + { + return _admin; + } + + private int getBdbPort() + { + return _bdbPort; + } + + private int getAmqpPort() + { + return _amqpPort; + } + + private Map<String, Object> getNodeAttributes() + { + return _nodeAttributes; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java new file mode 100644 index 0000000..4ed8b62 --- /dev/null +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java @@ -0,0 +1,33 @@ +package org.apache.qpid.server.store.berkeleydb.replication;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface GroupConfig +{ + int numberOfNodes() default 2; + String groupName() default "test"; +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java new file mode 100644 index 0000000..1c2e6fc --- /dev/null +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.systests.ConnectionBuilder; +import org.apache.qpid.systests.JmsProvider; +import org.apache.qpid.systests.JmsTestBase; +import org.apache.qpid.systests.Utils; +import org.apache.qpid.tests.utils.RunBrokerAdmin; + +@RunBrokerAdmin(type = "BDB-HA") +public class GroupJmsTestBase extends JmsTestBase +{ + private static final int FAILOVER_CYCLECOUNT = 40; + private static final int FAILOVER_CONNECTDELAY = 1000; + static final int SHORT_FAILOVER_CYCLECOUNT = 2; + static final int SHORT_FAILOVER_CONNECTDELAY = 200; + + @Override + public GroupBrokerAdmin getBrokerAdmin() + { + return (GroupBrokerAdmin) super.getBrokerAdmin(); + } + + @Override + public ConnectionBuilder getConnectionBuilder() + { + final ConnectionBuilder connectionBuilder = getJmsProvider().getConnectionBuilder() + .setClientId(getTestName()) + .setFailoverReconnectDelay(FAILOVER_CONNECTDELAY) + .setFailoverReconnectAttempts(FAILOVER_CYCLECOUNT) + .setVirtualHost("test") + .setFailover(true) + .setHost(getBrokerAdmin().getHost()); + int[] ports = getBrokerAdmin().getGroupAmqpPorts(); + for (int i = 0; i < ports.length; i++) + { + int port = ports[i]; + if (i == 0) + { + connectionBuilder.setPort(port); + } + else + { + connectionBuilder.addFailoverPort(port); + } + } + return connectionBuilder; + } + + protected void assertProduceConsume(final Queue queue) throws Exception + { + final Connection connection = getConnectionBuilder().build(); + try + { + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); + } + finally + { + connection.close(); + } + } + + + protected JmsProvider getJmsProvider() + { + return Utils.getJmsProvider(); + } + + protected Queue createTestQueue(final Connection connection) throws JMSException + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + return getJmsProvider().createQueue(session, getTestName()); + } + finally + { + session.close(); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org