This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ac7b0e85fb ARTEMIS-4113 Fix NPE for backup brokers with connection 
routers
ac7b0e85fb is described below

commit ac7b0e85fbebaa713072c30d4e159d8f3a402ba3
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Sat Dec 10 08:46:37 2022 +0100

    ARTEMIS-4113 Fix NPE for backup brokers with connection routers
    
    The nodeID on backup brokers is available only after they become live.
---
 .../core/server/routing/ConnectionRouter.java      |  8 ++++
 .../server/routing/ConnectionRouterManager.java    |  4 +-
 .../routing/pools/DiscoveryGroupService.java       | 12 ++++--
 .../core/server/routing/targets/LocalTarget.java   |  6 ++-
 .../routing/ConnectionRouterManagerTest.java       |  2 -
 .../core/server/routing/ConnectionRouterTest.java  |  3 --
 .../tests/integration/routing/KeyTypeTest.java     | 44 ++++++++++++++++++++++
 7 files changed, 66 insertions(+), 13 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
index c89e17d646..c0e4335a42 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
@@ -128,6 +128,10 @@ public class ConnectionRouter implements ActiveMQComponent 
{
 
    @Override
    public void start() throws Exception {
+      if (localTarget != null) {
+         localTarget.getTarget().connect();
+      }
+
       if (cache != null) {
          cache.start();
       }
@@ -150,6 +154,10 @@ public class ConnectionRouter implements ActiveMQComponent 
{
       if (cache != null) {
          cache.stop();
       }
+
+      if (localTarget != null) {
+         localTarget.getTarget().disconnect();
+      }
    }
 
    public TargetResult getTarget(Connection connection, String clientID, 
String username) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
index e3ac3aba61..1e7c3de5a4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
 
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
 import 
org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
 import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
 import 
org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
@@ -142,8 +141,7 @@ public final class ConnectionRouterManager implements 
ActiveMQComponent {
          DiscoveryGroupConfiguration discoveryGroupConfiguration = 
server.getConfiguration().
             
getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
 
-         DiscoveryService discoveryService = new DiscoveryGroupService(new 
DiscoveryGroup(server.getNodeID().toString(), config.getDiscoveryGroupName(),
-            discoveryGroupConfiguration.getRefreshTimeout(), 
discoveryGroupConfiguration.getBroadcastEndpointFactory(), null));
+         DiscoveryService discoveryService = new 
DiscoveryGroupService(localTarget, discoveryGroupConfiguration);
 
          pool = new DiscoveryPool(targetFactory, scheduledExecutor, 
config.getCheckPeriod(), discoveryService);
       } else if (config.getStaticConnectors() != null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
index b2e7951adb..1e384c9027 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.artemis.core.server.routing.pools;
 
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
 import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
 import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
+import org.apache.activemq.artemis.core.server.routing.targets.Target;
 
 import java.util.HashMap;
 import java.util.List;
@@ -26,16 +28,20 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class DiscoveryGroupService extends DiscoveryService implements 
DiscoveryListener {
-   private final DiscoveryGroup discoveryGroup;
+   private final Target localTarget;
+   private final DiscoveryGroupConfiguration config;
+   private DiscoveryGroup discoveryGroup;
 
    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
 
-   public DiscoveryGroupService(DiscoveryGroup discoveryGroup) {
-      this.discoveryGroup = discoveryGroup;
+   public DiscoveryGroupService(Target localTarget, 
DiscoveryGroupConfiguration config) {
+      this.localTarget = localTarget;
+      this.config = config;
    }
 
    @Override
    public void start() throws Exception {
+      discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(), 
config.getName(), config.getRefreshTimeout(), 
config.getBroadcastEndpointFactory(), null);
       discoveryGroup.registerListener(this);
 
       discoveryGroup.start();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
index 9232f9adf0..afa13cc95c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java
@@ -25,7 +25,7 @@ public class LocalTarget extends AbstractTarget {
    private final ManagementService managementService;
 
    public LocalTarget(TransportConfiguration connector, ActiveMQServer server) 
{
-      super(connector, server.getNodeID().toString());
+      super(connector, null);
 
       this.server = server;
       this.managementService = server.getManagementService();
@@ -43,7 +43,9 @@ public class LocalTarget extends AbstractTarget {
 
    @Override
    public void connect() throws Exception {
-
+      if (getNodeID() == null) {
+         setNodeID(server.getNodeID().toString());
+      }
    }
 
    @Override
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
index 44bcd05ec8..427eab32ff 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
 
 import java.util.Collections;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
 import 
org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
 import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
@@ -46,7 +45,6 @@ public class ConnectionRouterManagerTest {
    public void setUp() throws Exception {
 
       mockServer = mock(ActiveMQServer.class);
-      
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
 
       underTest = new ConnectionRouterManager(null, mockServer, null);
       underTest.start();
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
index 6e7aa732c1..17f84e80fc 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.routing;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy;
 import org.apache.activemq.artemis.core.server.routing.policies.Policy;
@@ -29,7 +28,6 @@ import 
org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -41,7 +39,6 @@ public class ConnectionRouterTest {
    @Before
    public void setUp() {
       ActiveMQServer mockServer = mock(ActiveMQServer.class);
-      
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
       localTarget = new LocalTarget(null, mockServer);
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
index 511e102ef5..d69c1ab6b7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java
@@ -16,9 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.routing;
 
+import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import 
org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy;
 import org.apache.activemq.artemis.core.server.routing.policies.Policy;
 import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory;
@@ -47,6 +51,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
 public class KeyTypeTest extends RoutingTestBase {
@@ -121,6 +126,45 @@ public class KeyTypeTest extends RoutingTestBase {
       Assert.assertEquals("test", keys.get(0));
    }
 
+   @Override
+   protected boolean isForceUniqueStorageManagerIds() {
+      return false;
+   }
+
+   @Test
+   public void testClientIDKeyOnBackup() throws Exception {
+      setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, 
false);
+      setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", 
MessageLoadBalancingType.OFF, 1, true);
+      setupRouterServerWithCluster(0, KeyType.CLIENT_ID, 
FirstElementPolicy.NAME, null, true, null, 1, "cluster0");
+      setupBackupServer(1, 0, false, HAType.SharedNothingReplication, true);
+      UDPBroadcastEndpointFactory endpoint = new 
UDPBroadcastEndpointFactory().setGroupAddress(GROUP_ADDRESS).setGroupPort(GROUP_PORT);
+      List<String> connectorInfos = 
getServer(1).getConfiguration().getConnectorConfigurations().keySet().stream().collect(Collectors.toList());
+      BroadcastGroupConfiguration bcConfig = new 
BroadcastGroupConfiguration().setName("bg1").setBroadcastPeriod(1000).setConnectorInfos(connectorInfos).setEndpointFactory(endpoint);
+      DiscoveryGroupConfiguration dcConfig = new 
DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint);
+      
getServer(1).getConfiguration().addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(),
 dcConfig);
+      setupDiscoveryClusterConnection("cluster0", 1, "dg1", "queues", 
MessageLoadBalancingType.OFF, 1, true);
+      setupRouterServerWithCluster(1, KeyType.CLIENT_ID, MOCK_POLICY_NAME, 
null, true, null, 1, "cluster0");
+      startServers(0, 1);
+
+      waitForTopology(getServer(0), 1, 1);
+
+      getServer(0).fail(true);
+
+      waitForFailoverTopology(1);
+
+      ConnectionFactory connectionFactory = createFactory(protocol, false, 
TransportConstants.DEFAULT_HOST,
+                                                          
TransportConstants.DEFAULT_PORT + 1, "test", null, null);
+
+      keys.clear();
+
+      try (Connection connection = connectionFactory.createConnection()) {
+         connection.start();
+      }
+
+      Assert.assertEquals(1, keys.size());
+      Assert.assertEquals("test", keys.get(0));
+   }
+
    @Test
    public void testSNIHostKey() throws Exception {
       String localHostname = "localhost.localdomain";

Reply via email to