[ 
https://issues.apache.org/jira/browse/ARTEMIS-4286?focusedWorklogId=862007&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862007
 ]

ASF GitHub Bot logged work on ARTEMIS-4286:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/23 16:06
            Start Date: 22/May/23 16:06
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200650735


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
       }, delay, TimeUnit.SECONDS);
    }
 
-   private void connect() throws Exception {
-      try {
-         if (clientConsumer == null) {
-            synchronized (this) {
-               this.clientSessionFactory = (ClientSessionFactoryInternal) 
upstream.getConnection().clientSessionFactory();
-               this.clientSession = 
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), 
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), 
clientSessionFactory.getServerLocator().getAckBatchSize());
-               this.clientSession.addFailureListener(this);
-               this.clientSession.addMetaData(FEDERATION_NAME, 
federation.getName().toString());
-               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, 
upstream.getName().toString());
-               this.clientSession.start();
-               if (clientSessionCallback != null) {
-                  clientSessionCallback.callback(clientSession);
-               }
-               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
-                  this.clientConsumer = 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
-                  this.clientConsumer.setMessageHandler(this);
-               } else {
-                  throw new ActiveMQNonExistentQueueException("Queue " + 
key.getQueueName() + " does not exist on remote");
+   private synchronized void connect() throws Exception {
+      if (started) {
+         connectionAttemptTimestamp.set(System.currentTimeMillis());
+         try {
+            if (clientConsumer == null) {
+               synchronized (this) {
+                  this.clientSessionFactory = (ClientSessionFactoryInternal) 
upstream.getConnection().clientSessionFactory();
+                  this.clientSession = 
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), 
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), 
clientSessionFactory.getServerLocator().getAckBatchSize());
+                  this.clientSession.addFailureListener(this);
+                  this.clientSession.addMetaData(FEDERATION_NAME, 
federation.getName().toString());
+                  this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, 
upstream.getName().toString());
+                  this.clientSession.start();
+                  if (clientSessionCallback != null) {
+                     clientSessionCallback.callback(clientSession);
+                  }
+                  if (clientSession.queueQuery(key.getQueueName()).isExists()) 
{
+                     this.clientConsumer = 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
+                     this.clientConsumer.setMessageHandler(this);
+                  } else {
+                     throw new ActiveMQNonExistentQueueException("Queue " + 
key.getQueueName() + " does not exist on remote");
+                  }
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
+   public synchronized void close() {
+      started = false;

Review Comment:
   Could also check that it was started originally, so that close fully noops 
rather than scheduling a task if there is nothing to do.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new 
FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new 
FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);

Review Comment:
   Its unclear this should wait up to 30 seconds at 100ms intervals. A lower 
timeout and interval would seem appropriate for what it is checking in this 
case.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new 
FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new 
FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() 
> closed, 5000, 100));

Review Comment:
   Is it necessary to burn 5 seconds waiting for it not to be set to something 
else? Even with a couple of exceptions+retries after an initial connect failure 
it would only get to ~3sec total, though it seems likely the test would 
complete inside the first retry, so 1sec typically.
   
   Verifying somehow (inc/dec a counter?) that there become no outstanding 
connect tasks to run would seem like a way to ensure more quickly and reliably 
that its not going to continue making new attempts.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 862007)
    Time Spent: 40m  (was: 0.5h)

> Sometimes federated consumer won't stop
> ---------------------------------------
>
>                 Key: ARTEMIS-4286
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4286
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> If a federated consumer is in the process of connecting when it's closed it 
> won't actually close. It will keep attempting to connect and once it actually 
> connects it will operate as if it was never closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to