ARTEMIS-1732 I simplified some of the changes made in the previous commit.
Also I changed GlobalDiskFullTest to actually block the senders. I moved the Runnables from PagingManager into the Util as AtomicRunnable. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e36e072 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e36e072 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e36e072 Branch: refs/heads/master Commit: 0e36e072bdf0c4636623aacbd15912857770c73f Parents: 53e1d60 Author: Clebert Suconic <[email protected]> Authored: Tue Jul 31 21:08:46 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue Jul 31 21:39:04 2018 -0400 ---------------------------------------------------------------------- .../artemis/utils/runnables/AtomicRunnable.java | 47 ++++++++++++ .../runnables/AtomicRunnableWithDelegate.java | 32 ++++++++ .../amqp/broker/AMQPSessionCallback.java | 10 +-- .../artemis/core/paging/PagingManager.java | 59 ++------------ .../artemis/core/paging/PagingStore.java | 7 +- .../core/paging/impl/PagingManagerImpl.java | 81 ++++++++++---------- .../core/paging/impl/PagingStoreImpl.java | 23 ++++-- .../core/server/ActiveMQServerLogger.java | 13 ---- .../core/server/files/FileStoreMonitor.java | 9 +-- .../core/server/files/FileStoreMonitorTest.java | 10 +++ .../integration/amqp/GlobalDiskFullTest.java | 75 ++++++++++++++---- .../tests/unit/util/FakePagingManager.java | 12 +-- 12 files changed, 232 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java new file mode 100644 index 0000000..f1f53ce --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.utils.runnables; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public abstract class AtomicRunnable implements Runnable { + + + public static Runnable checkAtomic(Runnable run) { + if (run instanceof AtomicRunnable) { + return run; + } else { + return new AtomicRunnableWithDelegate(run); + } + } + + private volatile int ran; + + private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE = + AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran"); + + @Override + public void run() { + if (RAN_UPDATE.compareAndSet(this, 0, 1)) { + atomicRun(); + } + } + + public abstract void atomicRun(); +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java new file mode 100644 index 0000000..d1583de --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnableWithDelegate.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.runnables; + +public class AtomicRunnableWithDelegate extends AtomicRunnable { + + private final Runnable runnable; + + public AtomicRunnableWithDelegate(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void atomicRun() { + runnable.run(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 1f5ccbc..86c0687 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -581,9 +581,7 @@ public class AMQPSessionCallback implements SessionCallback { Runnable creditRunnable = () -> { connection.lock(); try { - if (receiver.getRemoteCredit() <= threshold) { - receiver.flow(credits); - } + receiver.flow(credits); } finally { connection.unlock(); } @@ -592,10 +590,10 @@ public class AMQPSessionCallback implements SessionCallback { if (address == null) { 
pagingManager.checkMemory(creditRunnable); - return; + } else { + final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); + store.checkMemory(creditRunnable); } - final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(creditRunnable); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index c8eb2ec..5d8461e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -17,9 +17,6 @@ package org.apache.activemq.artemis.core.paging; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -82,7 +79,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository void resumeCleanup(); - void addBlockedStore(Blockable store); + void addBlockedStore(PagingStore store); void injectMonitor(FileStoreMonitor monitor) throws Exception; @@ -114,54 +111,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository return 0; } - boolean checkMemory(Runnable runnable); - - // To be used when the memory is oversized either by local settings or global settings on blocking addresses - final class OverSizedRunnable implements Runnable { - - private final AtomicBoolean ran = new 
AtomicBoolean(false); - - private final Runnable runnable; - - public OverSizedRunnable(final Runnable runnable) { - this.runnable = runnable; - } - - @Override - public void run() { - if (ran.compareAndSet(false, true)) { - runnable.run(); - } - } - } - - interface Blockable { - /** - * It will return true if the destination is leaving blocking. - */ - boolean checkReleasedMemory(); - } - - final class MemoryFreedRunnablesExecutor implements Runnable { - - private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); - - public void addRunnable(PagingManager.OverSizedRunnable runnable) { - onMemoryFreedRunnables.add(runnable); - } - - @Override - public void run() { - Runnable runnable; - - while ((runnable = onMemoryFreedRunnables.poll()) != null) { - runnable.run(); - } - } - - public boolean isEmpty() { - return onMemoryFreedRunnables.isEmpty(); - } - } + /** + * Use this when you have no reference of an address. (anonymous AMQP Producers for example) + * @param runWhenAvailable + */ + void checkMemory(Runnable runWhenAvailable); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 27e8c0f..4dd8bf8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; * * @see PagingManager */ -public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, PagingManager.Blockable { +public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener { SimpleString getAddress(); @@ -132,6 +132,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener, boolean isRejectingMessages(); /** + * It will return true if the destination is leaving blocking. + */ + boolean checkReleasedMemory(); + + /** * Write lock the PagingStore. * * @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 878f918..8893984 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -20,8 +20,10 @@ import java.nio.file.FileStore; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -39,6 +41,7 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; public final class PagingManagerImpl implements PagingManager { @@ -57,7 +60,7 @@ public final class 
PagingManagerImpl implements PagingManager { */ private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock(); - private final Set<Blockable> blockedStored = new ConcurrentHashSet<>(); + private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>(); private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>(); @@ -75,13 +78,14 @@ public final class PagingManagerImpl implements PagingManager { private volatile boolean diskFull = false; + private final Executor memoryExecutor; + + private final Queue<Runnable> memoryCallback = new ConcurrentLinkedQueue<>(); + private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>(); private ActiveMQScheduledComponent scheduledComponent = null; - private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor(); - - private final Executor executor; // Static // -------------------------------------------------------------------------------------------------------------------------- @@ -106,7 +110,7 @@ public final class PagingManagerImpl implements PagingManager { this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; - executor = pagingStoreFactory.newExecutor(); + this.memoryExecutor = pagingSPI.newExecutor(); } public PagingManagerImpl(final PagingStoreFactory pagingSPI, @@ -115,7 +119,7 @@ public final class PagingManagerImpl implements PagingManager { } @Override - public void addBlockedStore(Blockable store) { + public void addBlockedStore(PagingStore store) { blockedStored.add(store); } @@ -157,42 +161,18 @@ public final class PagingManagerImpl implements PagingManager { return globalSizeBytes.get(); } - @Override - public boolean checkMemory(final Runnable runWhenAvailable) { - if (isGlobalFull()) { - OverSizedRunnable ourRunnable = new 
OverSizedRunnable(runWhenAvailable); - - memoryFreedRunnablesExecutor.addRunnable(ourRunnable); - addBlockedStore(() -> { - if (!isGlobalFull()) { - if (!memoryFreedRunnablesExecutor.isEmpty()) { - executor.execute(memoryFreedRunnablesExecutor); - ActiveMQServerLogger.LOGGER.unblockingGlobalMessageProduction(getGlobalSize()); - return true; - } - } - return false; - }); - - if (isDiskFull()) { - ActiveMQServerLogger.LOGGER.blockingGlobalDiskFull(); - } else { - ActiveMQServerLogger.LOGGER.blockingGlobalMessageProduction(getGlobalSize()); - } - - return true; - } - - runWhenAvailable.run(); - - return true; - } - protected void checkMemoryRelease() { if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) { - Iterator<Blockable> storeIterator = blockedStored.iterator(); + if (!memoryCallback.isEmpty()) { + if (memoryExecutor != null) { + memoryExecutor.execute(this::memoryReleased); + } else { + memoryReleased(); + } + } + Iterator<PagingStore> storeIterator = blockedStored.iterator(); while (storeIterator.hasNext()) { - Blockable store = storeIterator.next(); + PagingStore store = storeIterator.next(); if (store.checkReleasedMemory()) { storeIterator.remove(); } @@ -223,7 +203,7 @@ public final class PagingManagerImpl implements PagingManager { @Override public void under(FileStore store, double usage) { - if (diskFull) { + if (diskFull || !blockedStored.isEmpty() || !memoryCallback.isEmpty()) { ActiveMQServerLogger.LOGGER.diskCapacityRestored(); diskFull = false; checkMemoryRelease(); @@ -242,6 +222,27 @@ public final class PagingManagerImpl implements PagingManager { } @Override + public void checkMemory(final Runnable runWhenAvailable) { + + if (isGlobalFull()) { + memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + return; + } + runWhenAvailable.run(); + } + + + private void memoryReleased() { + Runnable runnable; + + while ((runnable = memoryCallback.poll()) != null) { + runnable.run(); + } + } + + + + 
@Override public boolean isGlobalFull() { return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 74212ce..5f0d3c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -23,7 +23,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,6 +62,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.jboss.logging.Logger; /** @@ -639,7 +642,16 @@ public class PagingStoreImpl implements PagingStore { } - private final PagingManager.MemoryFreedRunnablesExecutor memoryFreedRunnablesExecutor = new PagingManager.MemoryFreedRunnablesExecutor(); + private final Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>(); + + private void memoryReleased() { + Runnable runnable; + + while ((runnable = onMemoryFreedRunnables.poll()) != null) { + runnable.run(); + } + } 
+ @Override public boolean checkMemory(final Runnable runWhenAvailable) { @@ -650,9 +662,8 @@ public class PagingStoreImpl implements PagingStore { } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) { - PagingManager.OverSizedRunnable ourRunnable = new PagingManager.OverSizedRunnable(runWhenAvailable); - memoryFreedRunnablesExecutor.addRunnable(ourRunnable); + onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); // We check again to avoid a race condition where the size can come down just after the element // has been added, but the check to execute was done before the element was added @@ -660,7 +671,7 @@ public class PagingStoreImpl implements PagingStore { // MUCH better performance in a highly concurrent environment if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) { // run it now - ourRunnable.run(); + runWhenAvailable.run(); } else { if (usingGlobalMaxSize || pagingManager.isDiskFull()) { pagingManager.addBlockedStore(this); @@ -719,8 +730,8 @@ public class PagingStoreImpl implements PagingStore { public boolean checkReleaseMemory(boolean globalOversized, long newSize) { if (!globalOversized && (newSize <= maxSize || maxSize < 0)) { - if (!memoryFreedRunnablesExecutor.isEmpty()) { - executor.execute(memoryFreedRunnablesExecutor); + if (!onMemoryFreedRunnables.isEmpty()) { + executor.execute(this::memoryReleased); if (blocking.get()) { ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize); blocking.set(false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 96fffe5..b10d652 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1950,17 +1950,4 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224095, value = "Error updating Consumer Count: {0}", format = Message.Format.MESSAGE_FORMAT) void consumerCountError(String reason); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 224096, value = "Disk Full! Blocking message production. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT) - void blockingGlobalDiskFull(); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 224097, value = "Blocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT) - void blockingGlobalMessageProduction(long globalSize); - - @LogMessage(level = Logger.Level.INFO) - @Message(id = 224098, value = "Unblocking message production; size is currently: {0} bytes;", format = Message.Format.MESSAGE_FORMAT) - void unblockingGlobalMessageProduction(long globalSize); - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index 957661c..ad59117 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -150,14 +150,11 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { public interface Callback { - default void tick(FileStore store, double usage) { - } + void tick(FileStore store, double usage); - default void over(FileStore store, double usage) { - } + void over(FileStore store, double usage); - default void under(FileStore store, double usage) { - } + void under(FileStore store, double usage); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java index e4f27c3..b91d3de 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java @@ -137,6 +137,16 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { System.out.println("TickS::" + usage); latch.countDown(); } + + @Override + public void over(FileStore store, double usage) { + + } + + @Override + public void under(FileStore store, double usage) { + + } }); storeMonitor.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java index d664013..0e0f86d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -25,6 +25,7 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; import org.junit.Test; import java.net.URI; @@ -45,6 +46,11 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport { FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); final CountDownLatch latch = new CountDownLatch(1); monitor.addCallback(new FileStoreMonitor.Callback() { + + @Override + public void tick(FileStore store, double usage) { + } + @Override public void over(FileStore store, double usage) { latch.countDown(); @@ -53,7 +59,8 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport { public void under(FileStore store, double usage) { } }); - latch.await(2, TimeUnit.SECONDS); + + Assert.assertTrue(latch.await(1, TimeUnit.MINUTES)); AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); AmqpConnection connection = addConnection(client.connect()); @@ -61,27 +68,65 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport { try { AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender(getQueueName()); - final AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[1000]; - message.setBytes(payload); - - sender.setSendTimeout(1000); - sender.send(message); - org.apache.activemq.artemis.core.server.Queue queueView = getProxyToQueue(getQueueName()); - 
assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount()); AmqpSender anonSender = session.createSender(); - final AmqpMessage message1 = new AmqpMessage(); - message1.setBytes(payload); - message1.setAddress(getQueueName()); - anonSender.setSendTimeout(1000); - anonSender.send(message1); + CountDownLatch sentWithName = new CountDownLatch(1); + CountDownLatch sentAnon = new CountDownLatch(1); + + Thread threadWithName = new Thread() { + @Override + public void run() { + + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + sender.setSendTimeout(-1); + sender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentWithName.countDown(); + } + } + }; + + threadWithName.start(); + + + Thread threadWithAnon = new Thread() { + @Override + public void run() { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + anonSender.setSendTimeout(-1); + message.setAddress(getQueueName()); + anonSender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentAnon.countDown(); + } + } + }; + + threadWithAnon.start(); + + Assert.assertFalse("Thread sender should be blocked", sentWithName.await(500, TimeUnit.MILLISECONDS)); + Assert.assertFalse("Thread sender anonymous should be blocked", sentAnon.await(500, TimeUnit.MILLISECONDS)); + + monitor.setMaxUsage(100.0); - queueView = getProxyToQueue(getQueueName()); - assertEquals("shouldn't receive any messages", 0, queueView.getMessageCount()); + Assert.assertTrue("Thread sender should be released", sentWithName.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("Thread sender anonymous should be released", sentAnon.await(30, TimeUnit.SECONDS)); + threadWithName.join(TimeUnit.SECONDS.toMillis(30)); + threadWithAnon.join(TimeUnit.SECONDS.toMillis(30)); + Assert.assertFalse(threadWithName.isAlive()); + Assert.assertFalse(threadWithAnon.isAlive()); } finally { connection.close(); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0e36e072/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index 3431655..94a9d79 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -30,7 +30,12 @@ import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; public final class FakePagingManager implements PagingManager { @Override - public void addBlockedStore(Blockable store) { + public void addBlockedStore(PagingStore store) { + + } + + @Override + public void checkMemory(Runnable runWhenAvailable) { } @@ -115,11 +120,6 @@ public final class FakePagingManager implements PagingManager { return false; } - @Override - public boolean checkMemory(Runnable runnable) { - return false; - } - /* * (non-Javadoc) * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
