gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200650735


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
       }, delay, TimeUnit.SECONDS);
    }
 
-   private void connect() throws Exception {
-      try {
-         if (clientConsumer == null) {
-            synchronized (this) {
-               this.clientSessionFactory = (ClientSessionFactoryInternal) 
upstream.getConnection().clientSessionFactory();
-               this.clientSession = 
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), 
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), 
clientSessionFactory.getServerLocator().getAckBatchSize());
-               this.clientSession.addFailureListener(this);
-               this.clientSession.addMetaData(FEDERATION_NAME, 
federation.getName().toString());
-               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, 
upstream.getName().toString());
-               this.clientSession.start();
-               if (clientSessionCallback != null) {
-                  clientSessionCallback.callback(clientSession);
-               }
-               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
-                  this.clientConsumer = 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
-                  this.clientConsumer.setMessageHandler(this);
-               } else {
-                  throw new ActiveMQNonExistentQueueException("Queue " + 
key.getQueueName() + " does not exist on remote");
+   private synchronized void connect() throws Exception {
+      if (started) {
+         connectionAttemptTimestamp.set(System.currentTimeMillis());
+         try {
+            if (clientConsumer == null) {
+               synchronized (this) {
+                  this.clientSessionFactory = (ClientSessionFactoryInternal) 
upstream.getConnection().clientSessionFactory();
+                  this.clientSession = 
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), 
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), 
clientSessionFactory.getServerLocator().getAckBatchSize());
+                  this.clientSession.addFailureListener(this);
+                  this.clientSession.addMetaData(FEDERATION_NAME, 
federation.getName().toString());
+                  this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, 
upstream.getName().toString());
+                  this.clientSession.start();
+                  if (clientSessionCallback != null) {
+                     clientSessionCallback.callback(clientSession);
+                  }
+                  if (clientSession.queueQuery(key.getQueueName()).isExists()) 
{
+                     this.clientConsumer = 
clientSession.createConsumer(key.getQueueName(), key.getFilterString(), 
key.getPriority(), false);
+                     this.clientConsumer.setMessageHandler(this);
+                  } else {
+                     throw new ActiveMQNonExistentQueueException("Queue " + 
key.getQueueName() + " does not exist on remote");
+                  }
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
+   public synchronized void close() {
+      started = false;

Review Comment:
   Could also check that it was started originally, so that close() is a full 
no-op rather than scheduling a task when there is nothing to do.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new 
FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new 
FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);

Review Comment:
   It's unclear that this should wait up to 30 seconds at 100ms intervals. A 
lower timeout and interval would seem appropriate for what it is checking in 
this case.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new 
FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new 
FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() 
> closed, 5000, 100));

Review Comment:
   Is it necessary to burn 5 seconds waiting for it not to be set to something 
else? Even with a couple of exceptions+retries after an initial connect failure 
it would only get to ~3sec total, though it seems likely the test would 
complete inside the first retry, so 1sec typically.
   
   Verifying somehow (e.g. by incrementing/decrementing a counter?) that there 
are no outstanding connect tasks left to run would seem like a way to ensure, 
more quickly and reliably, that it's not going to continue making new attempts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to