[
https://issues.apache.org/jira/browse/ARTEMIS-4286?focusedWorklogId=862007&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862007
]
ASF GitHub Bot logged work on ARTEMIS-4286:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/May/23 16:06
Start Date: 22/May/23 16:06
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200650735
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
}, delay, TimeUnit.SECONDS);
}
- private void connect() throws Exception {
- try {
- if (clientConsumer == null) {
- synchronized (this) {
- this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
- this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
- this.clientSession.addFailureListener(this);
- this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
- this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
- this.clientSession.start();
- if (clientSessionCallback != null) {
- clientSessionCallback.callback(clientSession);
- }
- if (clientSession.queueQuery(key.getQueueName()).isExists()) {
- this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
- this.clientConsumer.setMessageHandler(this);
- } else {
- throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+ private synchronized void connect() throws Exception {
+ if (started) {
+ connectionAttemptTimestamp.set(System.currentTimeMillis());
+ try {
+ if (clientConsumer == null) {
+ synchronized (this) {
+ this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
+ this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
+ this.clientSession.addFailureListener(this);
+ this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
+ this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
+ this.clientSession.start();
+ if (clientSessionCallback != null) {
+ clientSessionCallback.callback(clientSession);
+ }
+ if (clientSession.queueQuery(key.getQueueName()).isExists()) {
+ this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
+ this.clientConsumer.setMessageHandler(this);
+ } else {
+ throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+ }
}
}
- }
- } catch (Exception e) {
- try {
- if (clientSessionFactory != null) {
- clientSessionFactory.cleanup();
+ } catch (Exception e) {
+ try {
+ if (clientSessionFactory != null) {
+ clientSessionFactory.cleanup();
+ }
+ disconnect();
+ } catch (ActiveMQException ignored) {
}
- disconnect();
- } catch (ActiveMQException ignored) {
+ throw e;
}
- throw e;
}
}
@Override
- public void close() {
+ public synchronized void close() {
+ started = false;
Review Comment:
close() could also check that the consumer was actually started, so that it
is a complete no-op rather than scheduling a task when there is nothing to do.
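A minimal sketch of that guard, for illustration only. The class below is a hypothetical stand-in rather than the real FederatedQueueConsumerImpl, and the executor and the `scheduledDisconnects` counter are assumptions added so the behaviour can be observed:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the consumer; not the real FederatedQueueConsumerImpl.
class GuardedCloseSketch {
   private final Executor executor;
   // Counts scheduled disconnect tasks so the behaviour can be observed.
   private final AtomicInteger scheduledDisconnects = new AtomicInteger();
   private boolean started;

   GuardedCloseSketch(Executor executor) {
      this.executor = executor;
   }

   synchronized void start() {
      started = true;
   }

   synchronized void close() {
      if (!started) {
         // Never started, or already closed: nothing to tear down.
         return;
      }
      started = false;
      scheduledDisconnects.incrementAndGet();
      executor.execute(() -> {
         // Placeholder for the real disconnect work.
      });
   }

   int getScheduledDisconnects() {
      return scheduledDisconnects.get();
   }
}
```

With this shape, calling close() before start(), or calling it twice, schedules nothing the second time.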
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+ @Test
+ public void testClose() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+ server.start();
+ Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+ federation.start();
+ FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+ consumer.start();
+ Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
Review Comment:
It's unclear that this needs to wait up to 30 seconds at 100ms intervals. A
lower timeout and interval would seem more appropriate for what it is checking
in this case.
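To illustrate bounded polling with a tighter budget, here is a self-contained sketch. The `waitFor` helper below is a local stand-in written for this example (it only mirrors the shape of the three-argument `Wait.waitFor(condition, timeout, interval)` call used elsewhere in this test), and the 2000 ms / 10 ms values are illustrative assumptions, not recommendations from the PR:

```java
import java.util.function.BooleanSupplier;

// Hypothetical local helper; not the Artemis Wait utility.
class ShortWaitSketch {
   // Polls the condition until it holds or timeoutMillis elapses.
   // Returns true if the condition held, false on timeout or interruption.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
      long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() >= deadline) {
            return false;
         }
         try {
            Thread.sleep(intervalMillis);
         } catch (InterruptedException e) {
            // Preserve the interrupt status and give up waiting.
            Thread.currentThread().interrupt();
            return false;
         }
      }
      return true;
   }
}
```

In the test above, the equivalent change would be passing an explicit, smaller timeout and interval to the wait on `getConnectionAttemptTimestamp() > 0`.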
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+ @Test
+ public void testClose() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+ server.start();
+ Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+ federation.start();
+ FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+ consumer.start();
+ Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+ consumer.close();
+ long closed = System.currentTimeMillis();
+ assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > closed, 5000, 100));
Review Comment:
Is it necessary to burn 5 seconds waiting for it not to be set to something
else? Even with a couple of exceptions and retries after an initial connect
failure it would only get to ~3 sec total, though it seems likely the test
would complete inside the first retry, so typically 1 sec.
Verifying somehow (incrementing/decrementing a counter?) that no outstanding
connect tasks remain would seem like a quicker and more reliable way to ensure
that it's not going to continue making new attempts.
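One possible shape for such a counter, as a rough sketch under stated assumptions. The class, `pendingConnects`, and the always-failing connect attempt are hypothetical and not taken from the PR; the point is only that each task is counted before it is scheduled and uncounted when it finishes:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names are illustrative and not taken from the PR.
class ConnectTaskCounterSketch {
   private final ScheduledExecutorService scheduler;
   private final AtomicInteger pendingConnects = new AtomicInteger();
   private volatile boolean started = true;

   ConnectTaskCounterSketch(ScheduledExecutorService scheduler) {
      this.scheduler = scheduler;
   }

   void scheduleConnect(long delayMillis) {
      // Count the task before handing it over so it is never missed.
      pendingConnects.incrementAndGet();
      scheduler.schedule(() -> {
         try {
            if (started) {
               // Simulate a failed attempt: the retry increments the
               // counter before this task decrements it.
               scheduleConnect(delayMillis);
            }
         } finally {
            pendingConnects.decrementAndGet();
         }
      }, delayMillis, TimeUnit.MILLISECONDS);
   }

   void close() {
      started = false;
   }

   int getPendingConnects() {
      return pendingConnects.get();
   }
}
```

A test could then call close() and wait for `getPendingConnects()` to reach zero, which should finish as soon as the last scheduled task has run instead of after a fixed 5 second window.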
Issue Time Tracking
-------------------
Worklog Id: (was: 862007)
Time Spent: 40m (was: 0.5h)
> Sometimes federated consumer won't stop
> ---------------------------------------
>
> Key: ARTEMIS-4286
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4286
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Justin Bertram
> Assignee: Justin Bertram
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> If a federated consumer is in the process of connecting when it's closed it
> won't actually close. It will keep attempting to connect and once it actually
> connects it will operate as if it was never closed.