[
https://issues.apache.org/jira/browse/ARTEMIS-4286?focusedWorklogId=862020&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862020
]
ASF GitHub Bot logged work on ARTEMIS-4286:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/May/23 17:23
Start Date: 22/May/23 17:23
Worklog Time Spent: 10m
Work Description: jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200811548
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
}, delay, TimeUnit.SECONDS);
}
- private void connect() throws Exception {
- try {
- if (clientConsumer == null) {
- synchronized (this) {
- this.clientSessionFactory = (ClientSessionFactoryInternal)
upstream.getConnection().clientSessionFactory();
- this.clientSession =
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(),
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(),
clientSessionFactory.getServerLocator().getAckBatchSize());
- this.clientSession.addFailureListener(this);
- this.clientSession.addMetaData(FEDERATION_NAME,
federation.getName().toString());
- this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME,
upstream.getName().toString());
- this.clientSession.start();
- if (clientSessionCallback != null) {
- clientSessionCallback.callback(clientSession);
- }
- if (clientSession.queueQuery(key.getQueueName()).isExists()) {
- this.clientConsumer =
clientSession.createConsumer(key.getQueueName(), key.getFilterString(),
key.getPriority(), false);
- this.clientConsumer.setMessageHandler(this);
- } else {
- throw new ActiveMQNonExistentQueueException("Queue " +
key.getQueueName() + " does not exist on remote");
+ private synchronized void connect() throws Exception {
+ if (started) {
+ connectionAttemptTimestamp.set(System.currentTimeMillis());
+ try {
+ if (clientConsumer == null) {
+ synchronized (this) {
+ this.clientSessionFactory = (ClientSessionFactoryInternal)
upstream.getConnection().clientSessionFactory();
+ this.clientSession =
clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(),
false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(),
clientSessionFactory.getServerLocator().getAckBatchSize());
+ this.clientSession.addFailureListener(this);
+ this.clientSession.addMetaData(FEDERATION_NAME,
federation.getName().toString());
+ this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME,
upstream.getName().toString());
+ this.clientSession.start();
+ if (clientSessionCallback != null) {
+ clientSessionCallback.callback(clientSession);
+ }
+ if (clientSession.queueQuery(key.getQueueName()).isExists())
{
+ this.clientConsumer =
clientSession.createConsumer(key.getQueueName(), key.getFilterString(),
key.getPriority(), false);
+ this.clientConsumer.setMessageHandler(this);
+ } else {
+ throw new ActiveMQNonExistentQueueException("Queue " +
key.getQueueName() + " does not exist on remote");
+ }
}
}
- }
- } catch (Exception e) {
- try {
- if (clientSessionFactory != null) {
- clientSessionFactory.cleanup();
+ } catch (Exception e) {
+ try {
+ if (clientSessionFactory != null) {
+ clientSessionFactory.cleanup();
+ }
+ disconnect();
+ } catch (ActiveMQException ignored) {
}
- disconnect();
- } catch (ActiveMQException ignored) {
+ throw e;
}
- throw e;
}
}
@Override
- public void close() {
+ public synchronized void close() {
+ started = false;
Review Comment:
I added checks to both `start` & `close`.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import
org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+ @Test
+ public void testClose() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+ server.start();
+ Federation federation = new Federation(server, new
FederationConfiguration().setName(RandomUtil.randomString()));
+ federation.start();
+ FederatedQueueConsumerImpl consumer = new
FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+ consumer.start();
+ Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
Review Comment:
Fixed.
Issue Time Tracking
-------------------
Worklog Id: (was: 862020)
Time Spent: 1h (was: 50m)
> Sometimes federated consumer won't stop
> ---------------------------------------
>
> Key: ARTEMIS-4286
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4286
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Justin Bertram
> Assignee: Justin Bertram
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> If a federated consumer is in the process of connecting when it's closed it
> won't actually close. It will keep attempting to connect and once it actually
> connects it will operate as if it was never closed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)