This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ac7b0e85fb ARTEMIS-4113 Fix NPE for backup brokers with connection routers
ac7b0e85fb is described below
commit ac7b0e85fbebaa713072c30d4e159d8f3a402ba3
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Sat Dec 10 08:46:37 2022 +0100
ARTEMIS-4113 Fix NPE for backup brokers with connection routers
The nodeID on backup brokers is available only after they become live.
---
.../core/server/routing/ConnectionRouter.java | 8 ++++
.../server/routing/ConnectionRouterManager.java | 4 +-
.../routing/pools/DiscoveryGroupService.java | 12 ++++--
.../core/server/routing/targets/LocalTarget.java | 6 ++-
.../routing/ConnectionRouterManagerTest.java | 2 -
.../core/server/routing/ConnectionRouterTest.java | 3 --
.../tests/integration/routing/KeyTypeTest.java | 44 ++++++++++++++++++++++
7 files changed, 66 insertions(+), 13 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
index c89e17d646..c0e4335a42 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
@@ -128,6 +128,10 @@ public class ConnectionRouter implements ActiveMQComponent {
@Override
public void start() throws Exception {
+ if (localTarget != null) {
+ localTarget.getTarget().connect();
+ }
+
if (cache != null) {
cache.start();
}
@@ -150,6 +154,10 @@ public class ConnectionRouter implements ActiveMQComponent {
if (cache != null) {
cache.stop();
}
+
+ if (localTarget != null) {
+ localTarget.getTarget().disconnect();
+ }
}
public TargetResult getTarget(Connection connection, String clientID,
String username) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
index e3ac3aba61..1e7c3de5a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import
org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
import
org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
@@ -142,8 +141,7 @@ public final class ConnectionRouterManager implements ActiveMQComponent {
DiscoveryGroupConfiguration discoveryGroupConfiguration =
server.getConfiguration().
getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
- DiscoveryService discoveryService = new DiscoveryGroupService(new
DiscoveryGroup(server.getNodeID().toString(), config.getDiscoveryGroupName(),
- discoveryGroupConfiguration.getRefreshTimeout(),
discoveryGroupConfiguration.getBroadcastEndpointFactory(), null));
+ DiscoveryService discoveryService = new
DiscoveryGroupService(localTarget, discoveryGroupConfiguration);
pool = new DiscoveryPool(targetFactory, scheduledExecutor,
config.getCheckPeriod(), discoveryService);
} else if (config.getStaticConnectors() != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
index b2e7951adb..1e384c9027 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.core.server.routing.pools;
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
+import org.apache.activemq.artemis.core.server.routing.targets.Target;
import java.util.HashMap;
import java.util.List;
@@ -26,16 +28,20 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DiscoveryGroupService extends DiscoveryService implements
DiscoveryListener {
- private final DiscoveryGroup discoveryGroup;
+ private final Target localTarget;
+ private final DiscoveryGroupConfiguration config;
+ private DiscoveryGroup discoveryGroup;
private final Map<String, Entry> entries = new ConcurrentHashMap<>();
- public DiscoveryGroupService(DiscoveryGroup discoveryGroup) {
- this.discoveryGroup = discoveryGroup;
+ public DiscoveryGroupService(Target localTarget,
DiscoveryGroupConfiguration config) {
+ this.localTarget = localTarget;
+ this.config = config;
}
@Override
public void start() throws Exception {
+ discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(),
config.getName(), config.getRefreshTimeout(),
config.getBroadcastEndpointFactory(), null);
discoveryGroup.registerListener(this);
discoveryGroup.start();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
index 9232f9adf0..afa13cc95c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
@@ -25,7 +25,7 @@ public class LocalTarget extends AbstractTarget {
private final ManagementService managementService;
public LocalTarget(TransportConfiguration connector, ActiveMQServer server)
{
- super(connector, server.getNodeID().toString());
+ super(connector, null);
this.server = server;
this.managementService = server.getManagementService();
@@ -43,7 +43,9 @@ public class LocalTarget extends AbstractTarget {
@Override
public void connect() throws Exception {
-
+ if (getNodeID() == null) {
+ setNodeID(server.getNodeID().toString());
+ }
}
@Override
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
index 44bcd05ec8..427eab32ff 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
import java.util.Collections;
-import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import
org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
@@ -46,7 +45,6 @@ public class ConnectionRouterManagerTest {
public void setUp() throws Exception {
mockServer = mock(ActiveMQServer.class);
-
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
underTest = new ConnectionRouterManager(null, mockServer, null);
underTest.start();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
index 6e7aa732c1..17f84e80fc 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.routing;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
@@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@@ -41,7 +39,6 @@ public class ConnectionRouterTest {
@Before
public void setUp() {
ActiveMQServer mockServer = mock(ActiveMQServer.class);
-
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
localTarget = new LocalTarget(null, mockServer);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
index 511e102ef5..d69c1ab6b7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
@@ -16,9 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.routing;
+import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import
org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory;
@@ -47,6 +51,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class KeyTypeTest extends RoutingTestBase {
@@ -121,6 +126,45 @@ public class KeyTypeTest extends RoutingTestBase {
Assert.assertEquals("test", keys.get(0));
}
+ @Override
+ protected boolean isForceUniqueStorageManagerIds() {
+ return false;
+ }
+
+ @Test
+ public void testClientIDKeyOnBackup() throws Exception {
+ setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true,
false);
+ setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues",
MessageLoadBalancingType.OFF, 1, true);
+ setupRouterServerWithCluster(0, KeyType.CLIENT_ID,
FirstElementPolicy.NAME, null, true, null, 1, "cluster0");
+ setupBackupServer(1, 0, false, HAType.SharedNothingReplication, true);
+ UDPBroadcastEndpointFactory endpoint = new
UDPBroadcastEndpointFactory().setGroupAddress(GROUP_ADDRESS).setGroupPort(GROUP_PORT);
+ List<String> connectorInfos =
getServer(1).getConfiguration().getConnectorConfigurations().keySet().stream().collect(Collectors.toList());
+ BroadcastGroupConfiguration bcConfig = new
BroadcastGroupConfiguration().setName("bg1").setBroadcastPeriod(1000).setConnectorInfos(connectorInfos).setEndpointFactory(endpoint);
+ DiscoveryGroupConfiguration dcConfig = new
DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint);
+
getServer(1).getConfiguration().addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(),
dcConfig);
+ setupDiscoveryClusterConnection("cluster0", 1, "dg1", "queues",
MessageLoadBalancingType.OFF, 1, true);
+ setupRouterServerWithCluster(1, KeyType.CLIENT_ID, MOCK_POLICY_NAME,
null, true, null, 1, "cluster0");
+ startServers(0, 1);
+
+ waitForTopology(getServer(0), 1, 1);
+
+ getServer(0).fail(true);
+
+ waitForFailoverTopology(1);
+
+ ConnectionFactory connectionFactory = createFactory(protocol, false,
TransportConstants.DEFAULT_HOST,
+
TransportConstants.DEFAULT_PORT + 1, "test", null, null);
+
+ keys.clear();
+
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ }
+
+ Assert.assertEquals(1, keys.size());
+ Assert.assertEquals("test", keys.get(0));
+ }
+
@Test
public void testSNIHostKey() throws Exception {
String localHostname = "localhost.localdomain";