[
https://issues.apache.org/jira/browse/ARTEMIS-4286?focusedWorklogId=862173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862173
]
ASF GitHub Bot logged work on ARTEMIS-4286:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/May/23 16:37
Start Date: 23/May/23 16:37
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202581258
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
}
}
- }
- } catch (Exception e) {
- try {
- if (clientSessionFactory != null) {
- clientSessionFactory.cleanup();
+ } catch (Exception e) {
+ try {
+ if (clientSessionFactory != null) {
+ clientSessionFactory.cleanup();
+ }
+ disconnect();
Review Comment:
Can cleanup() throw? If so, should disconnect() perhaps be in a finally block, just in case?
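A minimal sketch of the finally variant, assuming cleanup() can throw an unchecked exception (whether the real method can is exactly the open question above). The `Factory` interface, the `calls` list and `cleanupAndDisconnect` are hypothetical stand-ins for illustration, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupFinallySketch {
    // Hypothetical stand-in for a session factory whose cleanup() may fail.
    interface Factory {
        void cleanup();
    }

    // Records which steps ran, so the ordering can be inspected.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for the consumer's disconnect().
    static void disconnect() {
        calls.add("disconnect");
    }

    // Runs cleanup() and then disconnect(); disconnect() still runs
    // when cleanup() throws, and the exception keeps propagating.
    static void cleanupAndDisconnect(Factory factory) {
        try {
            if (factory != null) {
                factory.cleanup();
            }
        } finally {
            disconnect();
        }
    }
}
```

With this shape a failing cleanup() no longer skips disconnect(); the original exception still reaches the caller unless disconnect() itself throws.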
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements
FederatedQueueConsumer, Sessi
private final int intialConnectDelayMultiplier = 2;
private final int intialConnectDelayMax = 30;
private final ClientSessionCallback clientSessionCallback;
+ private boolean started = false;
+ private ScheduledFuture currentConnectTask;
Review Comment:
It's set and used from different threads, so it should probably be volatile.
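Roughly what that could look like, as a sketch only. The class below is a hypothetical, stripped-down stand-in (the scheduler, the one-hour delay and the empty connect() body are assumptions); the point is just that fields written in start()/close() and read on the scheduler thread are declared volatile:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class VolatileStateSketch {
    // Written under the lock in start()/close(), but read without it
    // on the scheduler thread, hence volatile.
    private volatile boolean started = false;
    private volatile ScheduledFuture<?> currentConnectTask;

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public synchronized void start() {
        if (!started) {
            started = true;
            // Assumed delay; long enough that the task never fires in a demo.
            currentConnectTask = scheduler.schedule(this::connect, 1, TimeUnit.HOURS);
        }
    }

    // Runs on the scheduler thread and reads 'started' without the lock.
    private void connect() {
        if (!started) {
            return;
        }
        // A real implementation would attempt the remote connection here.
    }

    public synchronized void close() {
        if (started) {
            started = false;
            currentConnectTask.cancel(false);
        }
        scheduler.shutdownNow();
    }

    public boolean isStarted() {
        return started;
    }

    public ScheduledFuture<?> getCurrentConnectTask() {
        return currentConnectTask;
    }
}
```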
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+ @Test
+ public void testClose() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+ server.start();
+ Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+ federation.start();
+ FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+ consumer.start();
+ Wait.assertTrue(() -> consumer.getCurrentConnectTask() != null, 2000, 20);
Review Comment:
As it's being set before start() returns, the test could probably just verify that
it was null before and not-null after.
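A sketch of that direct check, using a hypothetical `ConsumerStub` whose start() assigns the task synchronously (an assumption mirroring the observation above, not the real FederatedQueueConsumerImpl):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class NullBeforeNotNullAfterSketch {
    // Hypothetical stub: start() assigns the task before it returns.
    static class ConsumerStub {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        private volatile ScheduledFuture<?> currentConnectTask;

        synchronized void start() {
            currentConnectTask = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
        }

        ScheduledFuture<?> getCurrentConnectTask() {
            return currentConnectTask;
        }

        void stop() {
            scheduler.shutdownNow();
        }
    }

    // Returns {wasNullBeforeStart, wasNonNullAfterStart}; no polling needed
    // because the assignment happens inside start().
    static boolean[] observeAroundStart() {
        ConsumerStub consumer = new ConsumerStub();
        try {
            boolean nullBefore = consumer.getCurrentConnectTask() == null;
            consumer.start();
            boolean nonNullAfter = consumer.getCurrentConnectTask() != null;
            return new boolean[] {nullBefore, nonNullAfter};
        } finally {
            consumer.stop();
        }
    }
}
```

In the JUnit test itself this would presumably reduce to an assertNull before consumer.start() and an assertNotNull after it, in place of the Wait.assertTrue polling.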
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
throw new ActiveMQNonExistentQueueException("Queue " +
key.getQueueName() + " does not exist on remote");
}
}
- }
- } catch (Exception e) {
- try {
- if (clientSessionFactory != null) {
- clientSessionFactory.cleanup();
+ } catch (Exception e) {
+ try {
+ if (clientSessionFactory != null) {
+ clientSessionFactory.cleanup();
+ }
+ disconnect();
+ } catch (ActiveMQException ignored) {
}
- disconnect();
- } catch (ActiveMQException ignored) {
+ throw e;
}
- throw e;
}
}
@Override
- public void close() {
- scheduleDisconnect(0);
+ public synchronized void close() {
+ if (started) {
+ started = false;
+ currentConnectTask.cancel(true);
Review Comment:
Not sure it should bother trying to interrupt a running connect? I'm wondering if
that could make it throw, and then handle it by scheduling another connect,
hehe. Since this goes on to schedule a disconnect anyway, it doesn't seem like it
would necessarily gain much by trying to interrupt?
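To illustrate the difference with plain java.util.concurrent (a sketch, not the PR's code): the blocking latch wait below is a hypothetical stand-in for an in-flight connect. With cancel(false) a task that is already running is left alone and finishes normally; with cancel(true) its thread is interrupted and it takes the exception path:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CancelInterruptSketch {
    // Returns "completed" if the running task finished normally,
    // "interrupted" if cancel() interrupted it.
    static String runAndCancel(boolean mayInterruptIfRunning) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("not run");
        try {
            Future<?> task = executor.submit(() -> {
                running.countDown();
                try {
                    release.await(); // stands in for a blocking connect attempt
                    outcome.set("completed");
                } catch (InterruptedException e) {
                    outcome.set("interrupted");
                } finally {
                    finished.countDown();
                }
            });
            running.await(); // make sure the task is already running
            task.cancel(mayInterruptIfRunning);
            if (!mayInterruptIfRunning) {
                release.countDown(); // let the uninterrupted task finish
            }
            finished.await(5, TimeUnit.SECONDS);
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }
}
```

If the connect's exception handler reacts to the interrupt by scheduling a retry, cancel(true) could end up triggering the very reconnect that close() is trying to prevent, which is the concern raised above.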
Issue Time Tracking
-------------------
Worklog Id: (was: 862173)
Time Spent: 2h (was: 1h 50m)
> Sometimes federated consumer won't stop
> ---------------------------------------
>
> Key: ARTEMIS-4286
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4286
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Justin Bertram
> Assignee: Justin Bertram
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> If a federated consumer is in the process of connecting when it's closed it
> won't actually close. It will keep attempting to connect and once it actually
> connects it will operate as if it was never closed.