Repository: activemq-artemis Updated Branches: refs/heads/2.6.x ae1edf6c6 -> 73b3ebff1
ARTEMIS-1732 AMQP anonymous producer not blocked on max-disk-usage Anonymous senders (those created without a target address) are not blocked when max-disk-usage is reached. The cause is that when such a sender is created on the broker, the broker doesn't check the disk/memory usage and gives out the credit immediately. Squashed with: ----- ARTEMIS-1732 I simplified some of the changes performed at the previous commit. Also I changed GlobalDiskFullTest to actually block the senders. I moved the Runnables from PagingManager into the Util as AtomicRunnable. (cherry picked from commit 0e36e072bdf0c4636623aacbd15912857770c73f) (cherry picked from commit 53e1d601601204dc2aa587fcb3046d5c1d6d026d) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/73b3ebff Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/73b3ebff Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/73b3ebff Branch: refs/heads/2.6.x Commit: 73b3ebff1a8c2aef84d2df90eb671258201b9fa4 Parents: ae1edf6 Author: Howard Gao <[email protected]> Authored: Mon Mar 12 10:33:09 2018 +0800 Committer: Clebert Suconic <[email protected]> Committed: Tue Jul 31 21:40:17 2018 -0400 ---------------------------------------------------------------------- .../artemis/utils/runnables/AtomicRunnable.java | 47 +++++++ .../runnables/AtomicRunnableWithDelegate.java | 32 +++++ .../amqp/broker/AMQPSessionCallback.java | 27 ++-- .../artemis/core/paging/PagingManager.java | 6 + .../core/paging/impl/PagingManagerImpl.java | 39 +++++- .../core/paging/impl/PagingStoreImpl.java | 43 ++---- .../integration/amqp/GlobalDiskFullTest.java | 134 +++++++++++++++++++ .../tests/unit/util/FakePagingManager.java | 5 + 8 files changed, 281 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java new file mode 100644 index 0000000..f1f53ce --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.utils.runnables; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public abstract class AtomicRunnable implements Runnable { + + + public static Runnable checkAtomic(Runnable run) { + if (run instanceof AtomicRunnable) { + return run; + } else { + return new AtomicRunnableWithDelegate(run); + } + } + + private volatile int ran; + + private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE = + AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran"); + + @Override + public void run() { + if (RAN_UPDATE.compareAndSet(this, 0, 1)) { + atomicRun(); + } + } + + public abstract void atomicRun(); +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java new file mode 100644 index 0000000..d1583de --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.runnables; + +public class AtomicRunnableWithDelegate extends AtomicRunnable { + + private final Runnable runnable; + + public AtomicRunnableWithDelegate(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void atomicRun() { + runnable.run(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6461bb2..86c0687 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; +import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import 
org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -576,7 +577,8 @@ public class AMQPSessionCallback implements SessionCallback { final int threshold, final Receiver receiver) { try { - if (address == null) { + PagingManager pagingManager = manager.getServer().getPagingManager(); + Runnable creditRunnable = () -> { connection.lock(); try { receiver.flow(credits); @@ -584,23 +586,14 @@ public class AMQPSessionCallback implements SessionCallback { connection.unlock(); } connection.flush(); - return; + }; + + if (address == null) { + pagingManager.checkMemory(creditRunnable); + } else { + final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); + store.checkMemory(creditRunnable); } - final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(new Runnable() { - @Override - public void run() { - connection.lock(); - try { - if (receiver.getRemoteCredit() <= threshold) { - receiver.flow(credits); - } - } finally { - connection.unlock(); - } - connection.flush(); - } - }); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 4d472e1..5d8461e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -111,4 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository return 0; } + /** + * Use this when you have no reference 
of an address. (anonymous AMQP Producers for example) + * @param runWhenAvailable + */ + void checkMemory(Runnable runWhenAvailable); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index bca70cf..8893984 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -20,9 +20,12 @@ import java.nio.file.FileStore; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; public final class PagingManagerImpl implements PagingManager { @@ -74,6 +78,10 @@ public final class PagingManagerImpl implements PagingManager { private volatile boolean diskFull = false; + private final Executor memoryExecutor; + + private final Queue<Runnable> 
memoryCallback = new ConcurrentLinkedQueue<>(); + private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>(); private ActiveMQScheduledComponent scheduledComponent = null; @@ -102,6 +110,7 @@ public final class PagingManagerImpl implements PagingManager { this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; + this.memoryExecutor = pagingSPI.newExecutor(); } public PagingManagerImpl(final PagingStoreFactory pagingSPI, @@ -154,6 +163,13 @@ public final class PagingManagerImpl implements PagingManager { protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) { + if (!memoryCallback.isEmpty()) { + if (memoryExecutor != null) { + memoryExecutor.execute(this::memoryReleased); + } else { + memoryReleased(); + } + } Iterator<PagingStore> storeIterator = blockedStored.iterator(); while (storeIterator.hasNext()) { PagingStore store = storeIterator.next(); @@ -187,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager { @Override public void under(FileStore store, double usage) { - if (diskFull) { + if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) { ActiveMQServerLogger.LOGGER.diskCapacityRestored(); diskFull = false; checkMemoryRelease(); @@ -206,6 +222,27 @@ public final class PagingManagerImpl implements PagingManager { } @Override + public void checkMemory(final Runnable runWhenAvailable) { + + if (isGlobalFull()) { + memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + return; + } + runWhenAvailable.run(); + } + + + private void memoryReleased() { + Runnable runnable; + + while ((runnable = memoryCallback.poll()) != null) { + runnable.run(); + } + } + + + + @Override public boolean isGlobalFull() { return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize; } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 6a07ffc..06b42a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; /** @@ -641,40 +642,16 @@ public class PagingStoreImpl implements PagingStore { } - private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); + private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); - private class MemoryFreedRunnablesExecutor implements Runnable { + private void memoryReleased() { + Runnable runnable; - @Override - public void run() { - Runnable runnable; - - while ((runnable = onMemoryFreedRunnables.poll()) != null) { - runnable.run(); - } + while ((runnable = onMemoryFreedRunnables.poll()) != null) { + runnable.run(); } } - private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor(); - - // To be used when the memory is oversized either by local settings or global settings on blocking addresses - private static final class OverSizedRunnable implements Runnable { - - private final AtomicBoolean ran = new 
AtomicBoolean(false); - - private final Runnable runnable; - - private OverSizedRunnable(final Runnable runnable) { - this.runnable = runnable; - } - - @Override - public void run() { - if (ran.compareAndSet(false, true)) { - runnable.run(); - } - } - } @Override public boolean checkMemory(final Runnable runWhenAvailable) { @@ -685,9 +662,8 @@ public class PagingStoreImpl implements PagingStore { } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) { - OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable); - onMemoryFreedRunnables.add(ourRunnable); + onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); // We check again to avoid a race condition where the size can come down just after the element // has been added, but the check to execute was done before the element was added @@ -695,7 +671,7 @@ public class PagingStoreImpl implements PagingStore { // MUCH better performance in a highly concurrent environment if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) { // run it now - ourRunnable.run(); + runWhenAvailable.run(); } else { if (usingGlobalMaxSize || pagingManager.isDiskFull()) { pagingManager.addBlockedStore(this); @@ -710,7 +686,6 @@ public class PagingStoreImpl implements PagingStore { blocking.set(true); } } - return true; } } @@ -756,7 +731,7 @@ public class PagingStoreImpl implements PagingStore { public boolean checkReleaseMemory(boolean globalOversized, long newSize) { if (!globalOversized && (newSize <= maxSize || maxSize < 0)) { if (!onMemoryFreedRunnables.isEmpty()) { - executor.execute(memoryFreedRunnablesExecutor); + executor.execute(this::memoryReleased); if (blocking.get()) { ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize); 
blocking.set(false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java new file mode 100644 index 0000000..0e0f86d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.nio.file.FileStore; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GlobalDiskFullTest extends AmqpClientTestSupport { + + @Override + protected void addConfiguration(ActiveMQServer server) { + Configuration serverConfig = server.getConfiguration(); + serverConfig.setDiskScanPeriod(100); + } + + @Test + public void testProducerOnDiskFull() throws Exception { + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); + final CountDownLatch latch = new CountDownLatch(1); + monitor.addCallback(new FileStoreMonitor.Callback() { + + @Override + public void tick(FileStore store, double usage) { + } + + @Override + public void over(FileStore store, double usage) { + latch.countDown(); + } + @Override + public void under(FileStore store, double usage) { + } + }); + + Assert.assertTrue(latch.await(1, TimeUnit.MINUTES)); + + AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + byte[] payload = new byte[1000]; + + + AmqpSender anonSender = 
session.createSender(); + + CountDownLatch sentWithName = new CountDownLatch(1); + CountDownLatch sentAnon = new CountDownLatch(1); + + Thread threadWithName = new Thread() { + @Override + public void run() { + + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + sender.setSendTimeout(-1); + sender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentWithName.countDown(); + } + } + }; + + threadWithName.start(); + + + Thread threadWithAnon = new Thread() { + @Override + public void run() { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + anonSender.setSendTimeout(-1); + message.setAddress(getQueueName()); + anonSender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentAnon.countDown(); + } + } + }; + + threadWithAnon.start(); + + Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS)); + Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500, TimeUnit.MILLISECONDS)); + + monitor.setMaxUsage(100.0); + + Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30, TimeUnit.SECONDS)); + + threadWithName.join(TimeUnit.SECONDS.toMillis(30)); + threadWithAnon.join(TimeUnit.SECONDS.toMillis(30)); + Assert.assertFalse(threadWithName.isAlive()); + Assert.assertFalse(threadWithAnon.isAlive()); + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/73b3ebff/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index d1012a6..94a9d79 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -35,6 +35,11 @@ public final class FakePagingManager implements PagingManager { } @Override + public void checkMemory(Runnable runWhenAvailable) { + + } + + @Override public void addTransaction(final PageTransactionInfo pageTransaction) { }
