This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a25650ca3e ARTEMIS-5428 Hardening of the broker around PagingWrites /
Shutdown
a25650ca3e is described below
commit a25650ca3ec1ba524748f992bd2a5bf315136e49
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 14 16:43:26 2025 -0400
ARTEMIS-5428 Hardening of the broker around PagingWrites / Shutdown
While the broker is being stopped and the system is paging, I
recently found a few issues, which I am addressing here:
- When the server is stopped, the IDGenerator could be used after
the snapshot is generated, which could lead to data loss in case of a
regular shutdown while the system is under heavy load
- A delayed Transaction could be completed after exceptions
(markAsRollbackOnly or rollback should clear any pending state)
- The Acceptor notification was being called after the IDGenerator is
stopped (found this by accident after stopping the IDGenerator).
---
.../artemis/utils/actors/ArtemisExecutor.java | 4 +
.../core/protocol/openwire/OpenWireConnection.java | 4 +
.../core/management/impl/AcceptorControlImpl.java | 1 +
.../cursor/impl/PageSubscriptionCounterImpl.java | 3 +
.../artemis/core/paging/impl/PageTimedWriter.java | 122 +++++--
.../artemis/core/paging/impl/PagingStoreImpl.java | 13 +-
.../artemis/core/persistence/StorageManager.java | 14 +
.../journal/AbstractJournalStorageManager.java | 14 +-
.../impl/journal/BatchingIDGenerator.java | 59 +++-
.../impl/journal/JournalStorageManager.java | 33 +-
.../core/postoffice/impl/PostOfficeImpl.java | 1 +
.../core/remoting/impl/invm/InVMAcceptor.java | 11 +-
.../core/remoting/impl/netty/NettyAcceptor.java | 75 ++--
.../core/remoting/server/RemotingService.java | 5 +
.../remoting/server/impl/RemotingServiceImpl.java | 55 ++-
.../artemis/core/server/ActiveMQMessageBundle.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 46 ++-
.../core/transaction/impl/TransactionImpl.java | 31 +-
.../artemis/spi/core/remoting/Acceptor.java | 2 +
.../paging/impl/PagingManagerImplAccessor.java | 11 +
.../paging/impl/PagingManagerTestAccessor.java | 38 --
...lAccessor.java => PagingStoreImplAccessor.java} | 18 +-
.../integration/bridge/BridgeSimulationTest.java | 194 ++++++++++
.../integration/management/QueueControlTest.java | 4 +-
.../integration/paging/MaxMessagesPagingTest.java | 6 +-
.../transaction/impl/TransactionImplAccessor.java} | 16 +-
.../core/paging/impl/PageTimedWriterUnitTest.java | 392 ++++++++++++++++++---
.../persistence/impl/BatchIDGeneratorUnitTest.java | 12 +-
.../server/impl/fake/FakeAcceptorFactory.java | 5 +
29 files changed, 964 insertions(+), 229 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 43bbd9cf0c..f878af49a0 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -41,6 +41,10 @@ public interface ArtemisExecutor extends Executor {
return 0;
}
+ default boolean inHandler() {
+ return false;
+ }
+
/**
* To be used to flush an executor from a different thread.
* <b>WARNING</b>: Do not call this within the executor. That would be
stoopid ;)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index cd46df13bc..732606594b 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -690,6 +690,10 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
private void disconnect(ActiveMQException me, String reason, boolean fail) {
+ ThresholdActor<Command> localActor = openWireActor;
+ if (localActor != null) {
+ localActor.shutdown();
+ }
if (context == null || destroyed) {
return;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
index dd1ebd5045..8c1648cd88 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
@@ -136,6 +136,7 @@ public class AcceptorControlImpl extends AbstractControl
implements AcceptorCont
}
clearIO();
try {
+ acceptor.notifyStop();
acceptor.stop();
} finally {
blockOnIO();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 807bd811ae..b267f4232e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -248,6 +248,9 @@ public class PageSubscriptionCounterImpl extends
BasePagingCounter {
try (ArtemisCloseable lock = storage.closeableReadLock()) {
synchronized (this) {
if (recordID >= 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Deleting page counter with recordID={}, using
TX={}", this.recordID, tx.getID());
+ }
storage.deletePageCounter(tx.getID(), this.recordID);
tx.setContainsPersistent();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index 4725c2d7ae..a6f584f677 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -25,7 +25,9 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -34,6 +36,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
+import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +59,7 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
private final int maxCredits;
private static final AtomicIntegerFieldUpdater<PageTimedWriter>
pendingTasksUpdater =
AtomicIntegerFieldUpdater.newUpdater(PageTimedWriter.class, "pendingTasks");
+ private final ReusableLatch pendingProcessings = new ReusableLatch(0);
public boolean hasPendingIO() {
return pendingTasksUpdater.get(this) > 0;
@@ -94,9 +98,15 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
}
@Override
- public synchronized void stop() {
- super.stop();
- processMessages();
+ public void stop() {
+ synchronized (this) {
+ super.stop();
+ }
+ try {
+ pendingProcessings.await(30, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
}
/**
@@ -113,16 +123,21 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
Transaction tx,
RouteContextList listCtx) {
- if (!isStarted()) {
- throw new IllegalStateException("PageWriter Service is stopped");
- }
+ logger.trace("Adding paged message {} to paged writer", message);
+
int credits = Math.min(message.getEncodeSize() +
PageReadWriter.SIZE_RECORD, maxCredits);
writeCredits.acquireUninterruptibly(credits);
- if (tx != null) {
- // this will delay the commit record until the portion of this task
has been completed
- tx.delay();
- }
synchronized (this) {
+ if (!isStarted()) {
+ writeCredits.release(credits);
+ throw new IllegalStateException("PageWriter Service is stopped");
+ }
+
+ if (tx != null) {
+ // this will delay the commit record until the portion of this
task has been completed
+ tx.delay();
+ }
+
final boolean replicated = storageManager.isReplicated();
PageEvent event = new PageEvent(context, message, tx, listCtx,
credits, replicated);
context.storeLineUp();
@@ -132,13 +147,16 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
this.pageEvents.add(event);
delay();
}
-
}
- private synchronized PageEvent[] extractPendingEvents() {
+ private synchronized PageEvent[] extractPendingEvents() {
+ if (!isStarted()) {
+ return null;
+ }
if (pageEvents.isEmpty()) {
return null;
}
+ pendingProcessings.countUp();
PageEvent[] pendingsWrites = new PageEvent[pageEvents.size()];
pendingsWrites = pageEvents.toArray(pendingsWrites);
pageEvents.clear();
@@ -147,6 +165,10 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
@Override
public void run() {
+ if (!isStarted()) {
+ return;
+ }
+
ArtemisCloseable closeable = storageManager.closeableReadLock(true);
if (closeable == null) {
logger.trace("Delaying PagedTimedWriter as it's currently locked");
@@ -161,51 +183,87 @@ public class PageTimedWriter extends
ActiveMQScheduledComponent {
}
protected void processMessages() {
- PageEvent[] pendingEvents = extractPendingEvents();
- if (pendingEvents == null) {
- return;
+ PageEvent[] pendingEvents;
+ boolean wasStarted;
+ synchronized (this) {
+ pendingEvents = extractPendingEvents();
+ if (pendingEvents == null) {
+ return;
+ }
+ wasStarted = isStarted();
}
+
OperationContext beforeContext = OperationContextImpl.getContext();
try {
- boolean requireSync = false;
- for (PageEvent event : pendingEvents) {
- OperationContextImpl.setContext(event.context);
- pagingStore.directWritePage(event.message, false,
event.replicated);
+ if (wasStarted) {
+ boolean requireSync = false;
+ for (PageEvent event : pendingEvents) {
+ OperationContextImpl.setContext(event.context);
+ logger.trace("writing message {}", event.message);
+ pagingStore.directWritePage(event.message, false,
event.replicated);
- if (event.tx != null || syncNonTX) {
- requireSync = true;
+ if (event.tx != null || syncNonTX) {
+ requireSync = true;
+ }
+ }
+ if (requireSync) {
+ logger.trace("performing sync");
+ performSync();
+ }
+ for (PageEvent event : pendingEvents) {
+ if (event.tx != null) {
+ event.tx.delayDone();
+ }
+ }
+ logger.trace("Completing events");
+ for (PageEvent event : pendingEvents) {
+ event.context.done();
}
}
- if (requireSync) {
- performSync();
- }
+ } catch (Throwable e) {
+ logger.warn("Captured Exception {}", e.getMessage(), e);
+ ActiveMQException amqException = new
ActiveMQIllegalStateException(e.getMessage());
+ amqException.initCause(e);
+
for (PageEvent event : pendingEvents) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Error processing Message {}, tx={} ",
event.message, event.tx);
+ }
if (event.tx != null) {
- event.tx.delayDone();
+ if (logger.isTraceEnabled()) {
+ logger.trace("tx.markRollbackOnly on TX {}",
event.tx.getID());
+ }
+ event.tx.markAsRollbackOnly(amqException);
}
}
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
// In case of failure, The context should propagate an exception to
the client
// We send an exception to the client even on the case of a failure
// to avoid possible locks and the client not getting the exception
back
executor.execute(() -> {
+ logger.trace("onError processing for callback", e);
// The onError has to be called from a separate executor
// because this PagedWriter will be holding the lock on the
storage manager
// and this might lead to a deadlock
for (PageEvent event : pendingEvents) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("onError {}, error={}", event.message,
e.getMessage());
+ }
event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(),
e.getClass() + " during ioSync for paging on " + pagingStore.getStoreName() +
": " + e.getMessage());
}
});
} finally {
- for (PageEvent event : pendingEvents) {
- event.context.done();
- pendingTasksUpdater.decrementAndGet(this);
- writeCredits.release(event.credits);
+ try {
+ for (PageEvent event : pendingEvents) {
+ pendingTasksUpdater.decrementAndGet(this);
+ writeCredits.release(event.credits);
+ }
+ OperationContextImpl.setContext(beforeContext);
+ } catch (Throwable t) {
+ logger.debug(t.getMessage(), t);
}
- OperationContextImpl.setContext(beforeContext);
+ pendingProcessings.countDown();
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 0ff67d3244..3507fd3759 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -97,7 +97,7 @@ public class PagingStoreImpl implements PagingStore {
private final PagingStoreFactory storeFactory;
// this is used to batch and sync into paging asynchronously
- private final PageTimedWriter timedWriter;
+ private PageTimedWriter timedWriter;
private long maxSize;
@@ -232,6 +232,11 @@ public class PagingStoreImpl implements PagingStore {
return localWriter;
}
+ // for tests, used through an accessor
+ protected void replacePagedTimedWriter(PageTimedWriter writer) {
+ this.timedWriter = writer;
+ }
+
private void overSized() {
full = true;
}
@@ -1383,7 +1388,7 @@ public class PagingStoreImpl implements PagingStore {
return true;
}
- void directWritePage(PagedMessage pagedMessage, boolean lineUp, boolean
originalReplicated) throws Exception {
+ protected void directWritePage(PagedMessage pagedMessage, boolean lineUp,
boolean originalReplicated) throws Exception {
int bytesToWrite = pagedMessage.getEncodeSize() +
PageReadWriter.SIZE_RECORD;
currentPageSize += bytesToWrite;
@@ -1398,6 +1403,10 @@ public class PagingStoreImpl implements PagingStore {
// doing this will give us a possibility of recovering the page counters
final Page page = currentPage;
+ if (!page.isOpen()) {
+ page.open(false);
+ }
+
page.write(pagedMessage, lineUp, originalReplicated);
if (logger.isTraceEnabled()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 53ff9c92ff..fd8404f3b7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -65,6 +66,7 @@ import
org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.IDGenerator;
@@ -96,6 +98,14 @@ public interface StorageManager extends MapStorageManager,
IDGenerator, ActiveMQ
default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception
{
}
+ default void writeLock() {
+
+ }
+
+ default void writeUnlock() {
+
+ }
+
default SequentialFileFactory getJournalSequentialFileFactory() {
return null;
}
@@ -129,6 +139,10 @@ public interface StorageManager extends MapStorageManager,
IDGenerator, ActiveMQ
*/
void stop(boolean ioCriticalError, boolean sendFailover) throws Exception;
+ default Set<RemotingConnection> getUsedConnections() {
+ return Collections.emptySet();
+ }
+
// Message related operations
void pageClosed(SimpleString address, long pageNumber);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7eeb0e0613..32aae12d74 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -455,14 +455,22 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
}
}
+ @Override
+ public void writeLock() {
+ storageManagerLock.writeLock().lock();
+ }
+
+ @Override
+ public void writeUnlock() {
+ storageManagerLock.writeLock().unlock();
+ }
+
@Override
public ArtemisCloseable closeableReadLock(boolean tryLock) {
if (reentrant.get()) {
return dummyCloseable;
}
- reentrant.set(true);
-
CriticalCloseable measure = measureCritical(CRITICAL_STORE);
if (tryLock) {
@@ -473,6 +481,8 @@ public abstract class AbstractJournalStorageManager extends
CriticalComponentImp
storageManagerLock.readLock().lock();
}
+ reentrant.set(true);
+
if (CriticalMeasure.isDummy(measure)) {
// The next statement could have been called like this:
// return storageManagerLock.readLock()::unlock;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
index 3e7a2bb30d..553cb4050d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
@@ -24,12 +24,15 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* An ID generator that allocates a batch of IDs of size {@link
#checkpointSize} and records the ID in the journal only
@@ -47,10 +50,14 @@ public final class BatchingIDGenerator implements
IDGenerator {
private volatile long nextID;
+ private boolean started = true;
+
private final StorageManager storageManager;
private List<Long> cleanupRecords = null;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
public BatchingIDGenerator(final long start, final long checkpointSize,
final StorageManager storageManager) {
counter = new AtomicLong(start);
@@ -62,6 +69,19 @@ public final class BatchingIDGenerator implements
IDGenerator {
this.storageManager = storageManager;
}
+ public void stop() {
+ lock.writeLock().lock();
+ try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Stopping generator", new Exception("Trace"));
+ }
+ persistCurrentID();
+ started = false;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
public void persistCurrentID() {
final long recordID = counter.incrementAndGet();
storeID(recordID, recordID);
@@ -86,15 +106,23 @@ public final class BatchingIDGenerator implements
IDGenerator {
}
public void loadState(final long journalID, final ActiveMQBuffer buffer) {
- addCleanupRecord(journalID);
- IDCounterEncoding encoding = new IDCounterEncoding();
+ lock.writeLock().lock();
+ try {
+ addCleanupRecord(journalID);
+ IDCounterEncoding encoding = new IDCounterEncoding();
- encoding.decode(buffer);
+ encoding.decode(buffer);
- // Keep nextID and counter the same, the next generateID will update the
checkpoint
- nextID = encoding.id + 1;
+ // Keep nextID and counter the same, the next generateID will update
the checkpoint
+ nextID = encoding.id + 1;
- counter.set(nextID);
+ counter.set(nextID);
+
+ // if we are loading we are restarting it
+ started = true;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
// for testcases
@@ -107,12 +135,23 @@ public final class BatchingIDGenerator implements
IDGenerator {
@Override
public long generateID() {
- long id = counter.getAndIncrement();
+ lock.readLock().lock();
+ try {
+ if (!started) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("BatchIDGenerator is not supposed to be used", new
Exception("trace"));
+ }
+ throw ActiveMQMessageBundle.BUNDLE.idGeneratorStopped();
+ }
+ long id = counter.getAndIncrement();
- if (id >= nextID) {
- saveCheckPoint(id);
+ if (id >= nextID) {
+ saveCheckPoint(id);
+ }
+ return id;
+ } finally {
+ lock.readLock().unlock();
}
- return id;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 54d0624c93..d863ed310e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -20,8 +20,10 @@ package
org.apache.activemq.artemis.core.persistence.impl.journal;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -64,6 +66,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
@@ -119,6 +122,17 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
super(config, analyzer, executorFactory, null, ioExecutors,
criticalErrorListener);
}
+ @Override
+ public Set<RemotingConnection> getUsedConnections() {
+ if (replicator == null) {
+ return Collections.emptySet();
+ } else {
+ HashSet<RemotingConnection> usedConnections = new HashSet<>();
+ usedConnections.add(replicator.getBackupTransportConnection());
+ return usedConnections;
+ }
+ }
+
@Override
public SequentialFileFactory getJournalSequentialFileFactory() {
return journalFF;
@@ -271,9 +285,9 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
if (!ioCriticalError) {
performCachedLargeMessageDeletes();
- // Must call close to make sure last id is persisted
+ // Must call stop to make sure last id is persisted
if (journalLoaded && idGenerator != null)
- idGenerator.persistCurrentID();
+ idGenerator.stop();
}
final CountDownLatch latch = new CountDownLatch(1);
@@ -637,16 +651,11 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
originalBindingsJournal.replicationSyncPreserveOldFiles();
originalMessageJournal.replicationSyncPreserveOldFiles();
- pagingManager.lock();
- try {
- pagingManager.disableCleanup();
- messageFiles = prepareJournalForCopy(originalMessageJournal,
JournalContent.MESSAGES, nodeID, autoFailBack);
- bindingsFiles =
prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID,
autoFailBack);
- pageFilesToSync = getPageInformationForSync(pagingManager);
- pendingLargeMessages = recoverPendingLargeMessages();
- } finally {
- pagingManager.unlock();
- }
+ pagingManager.disableCleanup();
+ messageFiles = prepareJournalForCopy(originalMessageJournal,
JournalContent.MESSAGES, nodeID, autoFailBack);
+ bindingsFiles = prepareJournalForCopy(originalBindingsJournal,
JournalContent.BINDINGS, nodeID, autoFailBack);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ pendingLargeMessages = recoverPendingLargeMessages();
} finally {
originalMessageJournal.synchronizationUnlock();
originalBindingsJournal.synchronizationUnlock();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c442501bed..e45e9dfbf9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1906,6 +1906,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
// if the message is being sent from the bridge, we just ignore the
duplicate id, and use the internal one
final DuplicateIDCache cacheBridge =
getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction()))
{
+ logger.trace("Message {} hit a bridge duplicate", message);
context.getTransaction().rollback();
message.usageDown(); // this will cause large message delete
return DuplicateCheckResult.DuplicateNotStartedTX;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
index 4e7279777b..0d258081af 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
@@ -165,6 +165,13 @@ public final class InVMAcceptor extends AbstractAcceptor {
connections.clear();
+ started = false;
+
+ paused = false;
+ }
+
+ @Override
+ public void notifyStop() {
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.of("factory"),
SimpleString.of(InVMAcceptorFactory.class.getName()));
@@ -176,10 +183,6 @@ public final class InVMAcceptor extends AbstractAcceptor {
ActiveMQServerLogger.LOGGER.failedToSendNotification(e);
}
}
-
- started = false;
-
- paused = false;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 80879c2484..6a089813ec 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -748,47 +748,63 @@ public class NettyAcceptor extends AbstractAcceptor {
callback.run();
return;
}
+ try {
- if (protocolHandler != null) {
- protocolHandler.close();
- }
+ if (protocolHandler != null) {
+ protocolHandler.close();
+ }
- if (batchFlusherFuture != null) {
- batchFlusherFuture.cancel(false);
+ if (batchFlusherFuture != null) {
+ batchFlusherFuture.cancel(false);
- flusher.cancel();
+ flusher.cancel();
- flusher = null;
+ flusher = null;
- batchFlusherFuture = null;
- }
+ batchFlusherFuture = null;
+ }
- // serverChannelGroup has been unbound in pause()
- if (serverChannelGroup != null) {
- serverChannelGroup.close().awaitUninterruptibly();
- }
+ // serverChannelGroup has been unbound in pause()
+ if (serverChannelGroup != null) {
+ serverChannelGroup.close().awaitUninterruptibly();
+ }
- if (channelGroup != null) {
- ChannelGroupFuture future =
channelGroup.close().awaitUninterruptibly();
+ if (channelGroup != null) {
+ ChannelGroupFuture future =
channelGroup.close().awaitUninterruptibly();
- if (!future.isSuccess()) {
- ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
- for (Channel channel : future.group()) {
- if (channel.isActive()) {
- ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel,
channel.remoteAddress());
+ if (!future.isSuccess()) {
+ ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
+ for (Channel channel : future.group()) {
+ if (channel.isActive()) {
+
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel,
channel.remoteAddress());
+ }
}
}
}
- }
- channelClazz = null;
+ channelClazz = null;
- for (Connection connection : connections.values()) {
- listener.connectionDestroyed(connection.getID(), true);
- }
+ for (Connection connection : connections.values()) {
+ listener.connectionDestroyed(connection.getID(), true);
+ }
- connections.clear();
+ connections.clear();
+
+ paused = false;
+ } finally {
+ if (eventLoopGroup == null) {
+ callback.run();
+ } else {
+ // Shutdown the EventLoopGroup if no new task was added for 100ms
or if
+ // 3000ms elapsed.
+ eventLoopGroup.shutdownGracefully(quietPeriod, shutdownTimeout,
TimeUnit.MILLISECONDS).addListener(f -> callback.run());
+ eventLoopGroup = null;
+ }
+ }
+ }
+ @Override
+ public void notifyStop() {
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.of("factory"),
SimpleString.of(NettyAcceptorFactory.class.getName()));
@@ -801,13 +817,6 @@ public class NettyAcceptor extends AbstractAcceptor {
ActiveMQServerLogger.LOGGER.failedToSendNotification(e);
}
}
-
- paused = false;
-
- // Shutdown the EventLoopGroup if no new task was added for 100ms or if
- // 3000ms elapsed.
- eventLoopGroup.shutdownGracefully(quietPeriod, shutdownTimeout,
TimeUnit.MILLISECONDS).addListener(f -> callback.run());
- eventLoopGroup = null;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 3ffae9576a..652c89fd86 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -75,6 +75,11 @@ public interface RemotingService {
boolean removeOutgoingInterceptor(BaseInterceptor interceptor);
+ void notifyStop();
+
+ /** The Prepare stop will close all the connections however it will use the
one used by storage manager */
+ void prepareStop(boolean criticalError, Set<RemotingConnection>
ignoreConnections) throws Exception;
+
void stop(boolean criticalError) throws Exception;
void start() throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index a2ec85ddcd..0ac17307c7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -372,17 +372,22 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
}
@Override
- public void stop(final boolean criticalError) throws Exception {
- if (!started) {
- return;
- }
- SSLContextFactoryProvider.getSSLContextFactory().clearSSLContexts();
- OpenSSLContextFactory openSSLContextFactory =
OpenSSLContextFactoryProvider.getOpenSSLContextFactory();
- if (openSSLContextFactory != null) {
- openSSLContextFactory.clearSslContexts();
+ public void notifyStop() {
+
+ // Ask each acceptor to send its stop notification; a failure on one acceptor
is logged and does not prevent the remaining acceptors from being notified
+ for (Acceptor acceptor : acceptors.values()) {
+ logger.debug("send stop notifications on acceptor {}", acceptor);
+
+ try {
+ acceptor.notifyStop();
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+ }
}
+ }
- failureCheckAndFlushThread.close(criticalError);
+ @Override
+ public void prepareStop(boolean criticalError, Set<RemotingConnection>
ignoreList) throws Exception {
// We need to stop them accepting first so no new connections are
accepted after we send the disconnect message
for (Acceptor acceptor : acceptors.values()) {
@@ -396,7 +401,7 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
}
- logger.debug("Sending disconnect on client connections");
+ logger.info("Sending disconnect on client connections");
Set<ConnectionEntry> connectionEntries = new
HashSet<>(connections.values());
@@ -405,10 +410,34 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
for (ConnectionEntry entry : connectionEntries) {
RemotingConnection conn = entry.connection;
- logger.trace("Sending connection.disconnection packet to {}", conn);
+ if (ignoreList.contains(conn)) {
+ logger.debug("ignoring connection {} during the close", conn);
+ } else {
+ logger.debug("Sending disconnect on connection {} from server {}",
conn.getID(), server);
+ conn.disconnect(criticalError);
+ }
+ }
- conn.disconnect(criticalError);
+ }
+
+ @Override
+ public void stop(final boolean criticalError) throws Exception {
+ if (!started) {
+ return;
}
+ SSLContextFactoryProvider.getSSLContextFactory().clearSSLContexts();
+ OpenSSLContextFactory openSSLContextFactory =
OpenSSLContextFactoryProvider.getOpenSSLContextFactory();
+ if (openSSLContextFactory != null) {
+ openSSLContextFactory.clearSslContexts();
+ }
+
+ failureCheckAndFlushThread.close(criticalError);
+
+ // ActiveMQServerImpl already calls prepareStop
+ // however we call this again here for two reasons:
+ // I - ActiveMQServer might have ignored the one connection for
Replication
+ // II - this method could be called in other places for Embedding or
testing and the semantic must be kept the same
+ prepareStop(criticalError, Collections.emptySet());
CountDownLatch acceptorCountDownLatch = new
CountDownLatch(acceptors.size());
for (Acceptor acceptor : acceptors.values()) {
@@ -586,7 +615,7 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
AuditLogger.createdConnection(connection.getProtocolConnection() ==
null ? null : connection.getProtocolConnection().getProtocolName(),
connection.getID(), connection.getRemoteAddress());
}
if (logger.isDebugEnabled()) {
- logger.debug("Adding connection {}, we now have {}",
connection.getID(), connections.size());
+ logger.debug("Adding connection {}, we now have {} on server {}",
connection.getID(), connections.size(), server);
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 10783a770f..014616d8fa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -531,4 +531,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 229256, value = "{} must be a positive power of 2 (actual
value: {})")
IllegalArgumentException positivePowerOfTwo(String name, Number val);
+
+ @Message(id = 229257, value = "IDGenerator has been stopped")
+ RuntimeException idGeneratorStopped();
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 082adebc7a..1ca6ffe6aa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1341,37 +1341,71 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
}
}
+ final RemotingService remotingService = this.remotingService;
+
+ if (remotingService != null) {
+ // The notification must be sent with the StorageManager still up
+ // this is because NotificationService will use
storageManager.generateID
+ remotingService.notifyStop();
+ }
+
+ // We start the preparation for the broker shutdown by first stopping
acceptors, and removing connections
+ // this is to avoid Exceptions while the broker is stopping as much as
possible
+ // by first stopping any pending connections before stopping the storage.
+ // if we stopped the journal before remoting, more exceptions could be
sent to the client.
+ try {
+ if (remotingService != null) {
+ // it will close all connections except the ones used by
replication
+ remotingService.prepareStop(criticalIOError, storageManager !=
null ? storageManager.getUsedConnections() : Collections.emptySet());
+ }
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
t);
+ }
+
+ stopComponent(pagingManager);
+
if (!criticalIOError && pagingManager != null) {
pagingManager.counterSnapshot();
}
- stopComponent(pagingManager);
+ final ManagementService managementService = this.managementService;
- if (storageManager != null)
+ // we have to disable management service before stopping the storage
+ // otherwise management would send notifications eventually
+ if (managementService != null) {
+ try {
+ managementService.enableNotifications(false);
+ } catch (Throwable t) {
+
ActiveMQServerLogger.LOGGER.errorStoppingComponent(managementService.getClass().getName(),
t);
+ }
+ }
+
+ if (storageManager != null) {
try {
storageManager.stop(criticalIOError, failoverOnServerShutdown);
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(storageManager.getClass().getName(),
t);
}
+ }
// We stop remotingService before otherwise we may lock the system in
case of a critical IO
// error shutdown
- final RemotingService remotingService = this.remotingService;
- if (remotingService != null)
+ if (remotingService != null) {
try {
remotingService.stop(criticalIOError);
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
t);
}
+ }
// Stop the management service after the remoting service to ensure all
acceptors are deregistered with JMX
- final ManagementService managementService = this.managementService;
- if (managementService != null)
+ if (managementService != null) {
try {
managementService.unregisterServer();
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(managementService.getClass().getName(),
t);
}
+ }
stopComponent(managementService);
stopComponent(resourceManager);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 83bf5616c2..a245865fc7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -399,6 +399,19 @@ public class TransactionImpl implements Transaction {
}
}
+ class NonPersistentDelay extends DelayedRunnable {
+ NonPersistentDelay(long id) {
+ super(id);
+ }
+
+ // a non persistent delay means a transaction with non persistent data
was delayed
+ // no commit was generated but we still need to wait for completions and
delays before we move on
+ @Override
+ protected void actualRun() throws Exception {
+ }
+ }
+
+
class DelayedCommit extends DelayedRunnable {
DelayedCommit(long id) {
super(id);
@@ -414,7 +427,6 @@ public class TransactionImpl implements Transaction {
}
}
-
class DelayedPrepare extends DelayedRunnable {
long id;
Xid xid;
@@ -445,6 +457,10 @@ public class TransactionImpl implements Transaction {
storageManager.commit(id);
}
}
+ } else {
+ if (delayed > 0) {
+ delayedRunnable = new NonPersistentDelay(id);
+ }
}
state = State.COMMITTED;
@@ -478,6 +494,8 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::rollback::{}", this);
synchronized (timeoutLock) {
+ // if a prepare/commit was scheduled while the transaction was delayed, it needs to be
cancelled
+ delayedRunnable = null;
if (state == State.ROLLEDBACK) {
// I don't think this could happen, but just in case
logger.debug("TransactionImpl::rollback::{} is being ignored",
this);
@@ -500,6 +518,10 @@ public class TransactionImpl implements Transaction {
private void internalRollback() throws Exception {
logger.trace("TransactionImpl::internalRollback {}", this);
+ // even though rollback already sets this as null
+ // I'm setting this again for other cases where internalRollback is
called
+ delayedRunnable = null;
+
beforeRollback();
try {
@@ -591,6 +613,9 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::{} marking rollbackOnly for {},
msg={}", this, exception.toString(), exception.getMessage());
}
+ // cancelling any delayed commit or prepare
+ delayedRunnable = null;
+
if (isEffective()) {
if (logger.isDebugEnabled()) {
logger.debug("Trying to mark transaction {} xid={} as
rollbackOnly but it was already effective (prepared, committed or
rolledback!)", id, xid);
@@ -629,6 +654,10 @@ public class TransactionImpl implements Transaction {
}
}
+ public int getPendingDelay() {
+ return delayed;
+ }
+
@Override
public synchronized void addOperation(final TransactionOperation operation)
{
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
index 0446cbaac8..2f0b08e7de 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
@@ -42,6 +42,8 @@ public interface Acceptor extends ActiveMQComponent {
*/
void pause();
+ void notifyStop();
+
/**
* This will update the list of interceptors for each ProtocolManager
inside the acceptor.
*/
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
index 4951a0f214..f8fda0ca7d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
@@ -16,8 +16,19 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.utils.SizeAwareMetric;
+
public class PagingManagerImplAccessor {
public static void setDiskFull(PagingManagerImpl pagingManager, boolean
diskFull) {
pagingManager.setDiskFull(diskFull);
}
+
+ public static void resetMaxSize(PagingManager pagingManager, long maxSize,
long maxElements) {
+ ((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements);
+ }
+
+ public static SizeAwareMetric globalSizeAwareMetric(PagingManager
pagingManager) {
+ return ((PagingManagerImpl)pagingManager).getSizeAwareMetric();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
deleted file mode 100644
index b0f213f316..0000000000
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.paging.impl;
-
-import org.apache.activemq.artemis.core.paging.PagingManager;
-import org.apache.activemq.artemis.utils.SizeAwareMetric;
-
-/**
- * Use this class to access things that are meant on test only
- */
-public class PagingManagerTestAccessor {
-
- public static void resetMaxSize(PagingManager pagingManager, long maxSize,
long maxElements) {
- ((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements);
- }
-
- public static SizeAwareMetric globalSizeAwareMetric(PagingManager
pagingManager) {
- return ((PagingManagerImpl)pagingManager).getSizeAwareMetric();
- }
-
- public static String debugMessages(Page page) throws Exception {
- return page.debugMessages();
- }
-}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
similarity index 58%
copy from
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
copy to
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
index 4951a0f214..3265e3e242 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.activemq.artemis.core.paging.impl;
-public class PagingManagerImplAccessor {
- public static void setDiskFull(PagingManagerImpl pagingManager, boolean
diskFull) {
- pagingManager.setDiskFull(diskFull);
+import org.apache.activemq.artemis.core.paging.PagingStore;
+
+public class PagingStoreImplAccessor {
+
+ public static void replacePageTimedWriter(PagingStore store,
PageTimedWriter newWriter) {
+ final PagingStoreImpl storeImpl = (PagingStoreImpl) store;
+ storeImpl.replacePagedTimedWriter(newWriter);
}
+
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
new file mode 100644
index 0000000000..613253a218
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.bridge;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.paging.impl.PageTimedWriter;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImplAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * this test will simulate what a bridge sender would do with auto completes
+ */
+public class BridgeSimulationTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private ActiveMQServer server;
+
+ private final SimpleString address = SimpleString.of("address");
+
+ private final SimpleString queueName = SimpleString.of("queue");
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server = createServer(true);
+ server.start();
+ }
+
+ @Test
+ public void testSendAcknowledgements() throws Exception {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setConfirmationWindowSize(100 * 1024);
+
+ try (ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession(null, null, false, true, true, false,
1)) {
+
+ Queue queue =
server.createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+ ClientProducer prod = session.createProducer(address);
+
+ PagingStoreImpl pagingStore = (PagingStoreImpl)
queue.getPagingStore();
+
+ CountDownLatch allowRunning = new CountDownLatch(1);
+ CountDownLatch enteredSync = new CountDownLatch(1);
+ CountDownLatch allowSync = new CountDownLatch(1);
+ AtomicInteger doneSync = new AtomicInteger(0);
+
+ runAfter(allowRunning::countDown);
+ runAfter(allowSync::countDown);
+
+ PageTimedWriter newWriter = new PageTimedWriter(100_000,
server.getStorageManager(), pagingStore, server.getScheduledPool(),
server.getExecutorFactory().getExecutor(), true, 100) {
+ @Override
+ public void run() {
+ logger.info("newWriter waiting to run");
+ try {
+ allowRunning.await();
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ Thread.currentThread().interrupt();
+
+ }
+ logger.info("newWriter running");
+ super.run();
+ logger.info("newWriter ran");
+ }
+
+ @Override
+ protected void performSync() throws Exception {
+ logger.info("newWriter waiting to perform sync");
+ enteredSync.countDown();
+ super.performSync();
+ try {
+ allowSync.await();
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ }
+ doneSync.incrementAndGet();
+ logger.info("newWriter done with sync");
+ }
+ };
+
+ runAfter(newWriter::stop);
+
+ PageTimedWriter olderWriter = pagingStore.getPageTimedWriter();
+ olderWriter.stop();
+
+ newWriter.start();
+
+ PagingStoreImplAccessor.replacePageTimedWriter(pagingStore,
newWriter);
+
+ CountDownLatch ackDone = new CountDownLatch(1);
+
+ pagingStore.startPaging();
+
+ SendAcknowledgementHandler handler = new SendAcknowledgementHandler()
{
+ @Override
+ public void sendAcknowledged(Message message) {
+ logger.info("ACK Confirmation from {}", message);
+ ackDone.countDown();
+ }
+ };
+
+ session.setSendAcknowledgementHandler(handler);
+
+ ClientMessage message = session.createMessage(true);
+ message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS,
ByteBuffer.allocate(8).putLong(queue.getID()).array());
+ message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID,
RandomUtil.randomBytes(10));
+ prod.send(message);
+
+ logger.info("Sent..");
+
+ assertEquals(0, queue.getMessageCount());
+ assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+ allowRunning.countDown();
+ assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+ assertEquals(0, queue.getMessageCount());
+ assertTrue(enteredSync.await(10, TimeUnit.SECONDS));
+ assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+ assertEquals(0, queue.getMessageCount());
+ verifyReceive(locator, false);
+ allowSync.countDown();
+ assertTrue(ackDone.await(1000, TimeUnit.MILLISECONDS));
+ Wait.assertEquals(1L, queue::getMessageCount, 5000, 100);
+ verifyReceive(locator, true);
+ }
+
+ }
+
+ private void verifyReceive(ServerLocator locator, boolean receive) throws
Exception {
+ try (ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true)) {
+ ClientConsumer consumer = session.createConsumer(queueName);
+ session.start();
+ if (receive) {
+ if (consumer.receive(5_000) == null) {
+ logger.warn("No message received");
+ fail("no message received");
+ }
+ } else {
+ if (consumer.receiveImmediate() != null) {
+ logger.warn("message was received");
+ fail("message was received");
+ }
+ }
+ }
+
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 149fd6e27c..012b169bee 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -78,7 +78,7 @@ import
org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -3151,7 +3151,7 @@ public class QueueControlTest extends ManagementTestBase {
final int MESSAGE_SIZE = 1024 * 3; // 3k
- PagingManagerTestAccessor.resetMaxSize(server.getPagingManager(), 10240,
0);
+ PagingManagerImplAccessor.resetMaxSize(server.getPagingManager(), 10240,
0);
clearDataRecreateServerDirs();
SimpleString address = RandomUtil.randomUUIDSimpleString();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
index e2d0c9a404..6a0dcdd264 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
@@ -46,7 +46,7 @@ import
org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -150,7 +150,7 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase
{
Wait.assertTrue(queue.getPagingStore()::isPaging);
- SizeAwareMetric globalSizeMetric =
PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
+ SizeAwareMetric globalSizeMetric =
PagingManagerImplAccessor.globalSizeAwareMetric(server.getPagingManager());
// this is validating the test is actually validating paging after over
elements
assertTrue(globalSizeMetric.isOverElements());
@@ -213,7 +213,7 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase
{
}
}
- SizeAwareMetric globalSizeMetric =
PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
+ SizeAwareMetric globalSizeMetric =
PagingManagerImplAccessor.globalSizeAwareMetric(server.getPagingManager());
// this is validating the test is actually validating paging after over
elements
assertTrue(globalSizeMetric.isOverElements());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
similarity index 60%
copy from
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
copy to
tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
index 4951a0f214..9d9d66f88c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.paging.impl;
-public class PagingManagerImplAccessor {
- public static void setDiskFull(PagingManagerImpl pagingManager, boolean
diskFull) {
- pagingManager.setDiskFull(diskFull);
+package org.apache.activemq.artemis.core.transaction.impl;
+
+public class TransactionImplAccessor {
+
+ public static void setContainsPersistent(TransactionImpl tx, boolean
persistent) {
+ tx.containsPersistent = persistent;
}
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index 9067ef2d57..be8ce5531a 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -19,15 +19,20 @@ package
org.apache.activemq.artemis.tests.unit.core.paging.impl;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@@ -49,14 +54,17 @@ import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import
org.apache.activemq.artemis.core.transaction.impl.TransactionImplAccessor;
import
org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ArtemisTestCase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -119,14 +127,16 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
AtomicBoolean returnSynchronizing = new AtomicBoolean(false);
ReplicationManager mockReplicationManager;
+ Consumer<PagedMessage> directWriteInterceptor = null;
+ Runnable ioSyncInterceptor = null;
private class MockableJournalStorageManager extends JournalStorageManager {
MockableJournalStorageManager(Configuration config,
- Journal bindingsJournal,
- Journal messagesJournal,
- ExecutorFactory executorFactory,
- ExecutorFactory ioExecutors) {
+ Journal bindingsJournal,
+ Journal messagesJournal,
+ ExecutorFactory executorFactory,
+ ExecutorFactory ioExecutors) {
super(config, Mockito.mock(CriticalAnalyzer.class), executorFactory,
ioExecutors);
this.bindingsJournal = bindingsJournal;
this.messageJournal = messagesJournal;
@@ -144,7 +154,11 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
@Override
- public void pageWrite(final SimpleString address, final PagedMessage
message, final long pageNumber, boolean storageUp, boolean
originallyReplicated) {
+ public void pageWrite(final SimpleString address,
+ final PagedMessage message,
+ final long pageNumber,
+ boolean storageUp,
+ boolean originallyReplicated) {
super.pageWrite(address, message, pageNumber, storageUp,
originallyReplicated);
pageWrites.incrementAndGet();
}
@@ -173,7 +187,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
return numberOfPreparesMessageJournal.incrementAndGet();
}
-
@BeforeEach
public void setupMocks() throws Exception {
configuration = new ConfigurationImpl();
@@ -227,12 +240,11 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
AddressSettings settings = new
AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100,
mockPagingManager, realJournalStorageManager, inMemoryFileFactory,
- mockPageStoreFactory, ADDRESS, settings,
executorFactory.getExecutor(), false) {
+ pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100,
mockPagingManager, realJournalStorageManager, inMemoryFileFactory,
mockPageStoreFactory, ADDRESS, settings, executorFactory.getExecutor(), false) {
@Override
protected PageTimedWriter
createPageTimedWriter(ScheduledExecutorService scheduledExecutor, long
syncTimeout) {
- PageTimedWriter timer = new PageTimedWriter(100_000,
realJournalStorageManager, this, scheduledExecutorService,
executorFactory.getExecutor(), true, 100) {
+ PageTimedWriter timer = new PageTimedWriter(100_000,
realJournalStorageManager, this, scheduledExecutorService, getExecutor(), true,
100) {
@Override
public void run() {
try {
@@ -260,7 +272,28 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
timer.start();
return timer;
+ }
+
+ @Override
+ protected void directWritePage(PagedMessage pagedMessage,
+ boolean lineUp,
+ boolean originalReplicated) throws
Exception {
+ if (directWriteInterceptor != null) { // optional per-test hook, invoked before the real write
+ directWriteInterceptor.accept(pagedMessage);
+ }
+
+ if (!pageStore.getExecutor().inHandler()) { // diagnostic only: the write still proceeds below
+ logger.warn("directWritePage called outside of the page store executor", new Exception("trace"));
+ }
+ super.directWritePage(pagedMessage, lineUp, originalReplicated);
+ }
+ @Override
+ public void ioSync() throws Exception { // test override: runs the optional interceptor, then the real sync
+ if (ioSyncInterceptor != null) {
+ ioSyncInterceptor.run(); // may throw; testSimulateFlow installs one that throws to simulate a failing sync
+ }
+ super.ioSync();
}
};
@@ -273,6 +306,9 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
Queue mockQueue = Mockito.mock(Queue.class);
Mockito.when(mockQueue.getID()).thenReturn(1L);
routeContextList.addAckedQueue(mockQueue);
+
+ OperationContextImpl.clearContext();
+ runAfter(OperationContextImpl::clearContext);
}
// a test to validate if the Mocks are correctly setup
@@ -291,7 +327,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
tx.commit();
assertEquals(1, count.get(), "tx.commit is not correctly wired on
mocking");
-
realJournalStorageManager.afterCompleteOperations(new IOCallback() {
@Override
public void done() {
@@ -325,7 +360,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
PagedMessage createPagedMessage() {
- return new PagedMessageImpl(createMessage(), new long[] {1}, -1);
+ return new PagedMessageImpl(createMessage(), new long[]{1}, -1);
}
@Test
@@ -438,39 +473,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
assertEquals(numberOfMessages, pageWrites.get());
}
- @Test
- public void testVerifyWritesAfterStop() throws Exception {
- int numberOfMessages = 100;
- CountDownLatch latch = new CountDownLatch(numberOfMessages);
-
- allowRunning.countDown();
- useReplication.set(false);
- allowSync.setCount(1);
-
- for (int i = 0; i < numberOfMessages; i++) {
- TransactionImpl newTX = new
TransactionImpl(realJournalStorageManager);
- newTX.setContainsPersistent();
- newTX.addOperation(new TransactionOperationAbstract() {
- @Override
- public void afterCommit(Transaction tx) {
- super.afterCommit(tx);
- latch.countDown();
- }
- });
- pageStore.page(createMessage(), newTX, routeContextList);
- newTX.commit();
- }
-
- assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
- allowSync.countDown();
- // issuing a stop should finish whatever was pending before
- // instead of sending it to the limbo
- pageStore.stop();
- assertTrue(latch.await(10, TimeUnit.SECONDS));
-
- assertEquals(numberOfMessages, pageWrites.get());
- }
-
@Test
public void testVerifyTimedWriterIsStopped() throws Exception {
allowRunning.countDown();
@@ -507,7 +509,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
Wait.assertEquals(1, numberOfCommitsMessageJournal::get, 5000, 100);
}
-
@Test
public void testTXCompletionPrepare() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -538,7 +539,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
Wait.assertEquals(1, numberOfPreparesMessageJournal::get, 5000, 100);
}
-
// add a task while replicating, process it when no longer replicating
(disconnect a node scenario)
@Test
public void testDisableReplica() throws Exception {
@@ -594,14 +594,6 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
@Test
public void testTXCompletion() throws Exception {
- ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(10);
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- runAfter(scheduledExecutorService::shutdownNow);
- runAfter(executorService::shutdownNow);
- runAfter(() -> OperationContextImpl.clearContext());
-
- OrderedExecutorFactory executorFactory = new
OrderedExecutorFactory(executorService);
-
OperationContextImpl.clearContext();
OperationContext context =
OperationContextImpl.getContext(executorFactory);
@@ -628,4 +620,294 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
}
-}
+ @Test
+ public void testDelayNonPersistent() throws Exception {
+ internalDelay(false); // false: the transaction's containsPersistent flag is set to false
+ }
+
+ @Test
+ public void testDelayPersistent() throws Exception {
+ internalDelay(true); // true: the transaction's containsPersistent flag is set to true
+ }
+
+ private void internalDelay(boolean persistent) throws Exception { // shared body of testDelayPersistent / testDelayNonPersistent
+ TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+
+ final AtomicInteger afterRollback = new AtomicInteger(0);
+ final AtomicInteger afterCommit = new AtomicInteger(0);
+
+ tx.addOperation(new TransactionOperation() { // only afterCommit / afterRollback are observed; the other callbacks are empty
+ @Override
+ public void beforePrepare(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterPrepare(Transaction tx) {
+
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ afterCommit.incrementAndGet();
+ }
+
+ @Override
+ public void beforeRollback(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ afterRollback.incrementAndGet();
+ }
+
+ @Override
+ public List<MessageReference> getRelatedMessageReferences() {
+ return null;
+ }
+
+ @Override
+ public List<MessageReference> getListOnConsumer(long consumerID) {
+ return null;
+ }
+ });
+ TransactionImplAccessor.setContainsPersistent(tx, persistent);
+ tx.delay();
+ tx.commit();
+ assertEquals(0, afterCommit.get()); // commit() must not fire afterCommit while the delay is still outstanding
+ assertEquals(0, afterRollback.get());
+ tx.delayDone();
+ Wait.assertEquals(1, afterCommit::get, 5000, 100); // afterCommit is expected to have run once delayDone() was called
+ }
+
+ @Test
+ public void testRollbackCancelDelay() throws Exception {
+ testRollback(true); // NOTE(review): true selects markAsRollbackOnly in testRollback, same as testMarkRollbackCancelDelay; the tx.rollback() branch is never exercised -- confirm whether false was intended
+ }
+
+ @Test
+ public void testMarkRollbackCancelDelay() throws Exception {
+ testRollback(true); // true: testRollback calls tx.markAsRollbackOnly(...) instead of tx.rollback()
+ }
+
+ private void testRollback(boolean rollback) throws Exception { // NOTE: rollback == true takes the markAsRollbackOnly branch; false calls tx.rollback()
+ TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+
+ final AtomicInteger afterRollback = new AtomicInteger(0);
+ final AtomicInteger afterCommit = new AtomicInteger(0);
+
+ tx.addOperation(new TransactionOperation() { // only afterCommit / afterRollback are observed; the other callbacks are empty
+ @Override
+ public void beforePrepare(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterPrepare(Transaction tx) {
+
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ afterCommit.incrementAndGet();
+ }
+
+ @Override
+ public void beforeRollback(Transaction tx) throws Exception {
+
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ afterRollback.incrementAndGet();
+ }
+
+ @Override
+ public List<MessageReference> getRelatedMessageReferences() {
+ return null;
+ }
+
+ @Override
+ public List<MessageReference> getListOnConsumer(long consumerID) {
+ return null;
+ }
+ });
+ TransactionImplAccessor.setContainsPersistent(tx, true);
+ tx.delay();
+ tx.commit();
+ assertEquals(0, afterCommit.get()); // nothing may complete while the delay is outstanding
+ assertEquals(0, afterRollback.get());
+ if (rollback) {
+ tx.markAsRollbackOnly(new ActiveMQException("duh!"));
+ } else {
+ tx.rollback();
+ }
+ tx.delayDone(); // afterCommit must stay at 0 even though the delay is now released
+ Wait.assertEquals(0, afterCommit::get, 5000, 100);
+
+ if (!rollback) { // when rollback == false this is a second tx.rollback() call (the first is in the else branch above)
+ tx.rollback();
+ }
+
+ Wait.assertEquals(0, afterCommit::get, 5000, 100); // same check as above, repeated after the optional second rollback
+ Wait.assertEquals(0, afterRollback::get, 5000, 100);
+ }
+
+ @Test
+ public void testSimulateFlow() throws Exception {
+ AtomicBoolean notSupposedToWrite = new AtomicBoolean(false); // true while the storage write lock is held; a directWritePage seen then counts as an error
+
+ AtomicInteger errors = new AtomicInteger(0);
+
+ int sleepTime = 1; // milliseconds, passed to Thread.sleep in the stop/start loop
+ int totalTime = 500; // milliseconds the producer loop below keeps sending
+ AtomicBoolean pageStoreThrowsExceptions = new AtomicBoolean(false); // when true, both interceptors throw to simulate failures
+
+ LinkedHashSet<String> interceptedWrite = new LinkedHashSet<>(); // testId values seen by directWritePage
+ LinkedHashSet<String> sentWrite = new LinkedHashSet<>(); // testId values accepted by timer.addTask
+
+ directWriteInterceptor = m -> {
+ if (pageStoreThrowsExceptions.get()) {
+ throw new NullPointerException("simulating a NPE on directWrite");
+ }
+ String messageID = m.getMessage().getStringProperty("testId");
+ if (messageID == null) {
+ logger.warn("no messageID defined on message");
+ errors.incrementAndGet();
+ }
+ if (notSupposedToWrite.get()) {
+ logger.warn("Not supposed to write message {}",
m.getMessage().getStringProperty("testId"));
+ errors.incrementAndGet();
+ }
+ interceptedWrite.add(m.getMessage().getStringProperty("testId"));
+ };
+ ioSyncInterceptor = () -> {
+ if (pageStoreThrowsExceptions.get()) {
+ throw new NullPointerException("simulating a NPE on ioSync");
+ }
+ };
+
+ allowRunning.countDown();
+ // dedicated single-thread executor for the stop/start loop below, so the test does not interfere with the paging executors
+ ExecutorService testExecutor = Executors.newFixedThreadPool(1);
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ runAfter(() -> running.set(false));
+ runAfter(testExecutor::shutdownNow);
+
+ CountDownLatch runLatch = new CountDownLatch(1); // released when the stop/start loop exits
+ CyclicBarrier flagStart = new CyclicBarrier(2); // lines up the loop thread with the test thread before sending starts
+
+ // stop.. start.. stop ... start..
+ testExecutor.execute(() -> {
+ try {
+ flagStart.await(10, TimeUnit.SECONDS);
+ while (running.get()) {
+ Thread.sleep(sleepTime);
+ logger.debug("Forcing page");
+
+ // this is simulating what would happen on replication, we lock
the storage
+ // and force another page
+ // also no writing should happen while we hold the lock
+ realJournalStorageManager.writeLock();
+ try {
+ notSupposedToWrite.set(true);
+ Thread.sleep(sleepTime);
+ pageStore.forceAnotherPage(false);
+ notSupposedToWrite.set(false);
+ } finally {
+ realJournalStorageManager.writeUnlock();
+ }
+ Thread.sleep(sleepTime);
+ logger.debug("stopping");
+ timer.stop();
+ pageStoreThrowsExceptions.set(true); // failures are only simulated while the timer is stopped
+ Thread.sleep(sleepTime);
+ pageStoreThrowsExceptions.set(false);
+ logger.debug("starting");
+ timer.start();
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ runLatch.countDown();
+ }
+ });
+
+ flagStart.await(10, TimeUnit.SECONDS);
+
+ long timeout = System.currentTimeMillis() + totalTime;
+
+ RouteContextList routeContextListMocked =
Mockito.mock(RouteContextList.class);
+
+ CountDownLatch committed = new CountDownLatch(1);
+
+ TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+ tx.afterStore(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ committed.countDown();
+ }
+ });
+ TransactionImplAccessor.setContainsPersistent(tx, true);
+
+ int sentNumber = 0;
+ while (timeout > System.currentTimeMillis()) {
+ try {
+ PagedMessage message = createPagedMessage();
+ message.getMessage().putStringProperty("testId",
String.valueOf(sentNumber));
+ timer.addTask(context, message, tx, routeContextListMocked);
+ sentWrite.add(String.valueOf(sentNumber)); // recorded only when addTask did not throw
+ sentNumber++;
+ if (sentNumber % 1000 == 0) {
+ logger.info("Sent {}", sentNumber);
+ }
+ } catch (IllegalStateException notStarted) { // presumably thrown by addTask while the timer is stopped; the same sentNumber is retried
+ logger.debug("Retrying {}", sentNumber);
+ // ok
+ }
+ }
+
+ running.set(false);
+ assertTrue(runLatch.await(10, TimeUnit.SECONDS));
+ assertTrue(timer.isStarted()); // the loop always ends an iteration with timer.start()
+ timer.delay(); // calling one more delay as the last one done could
still be missing
+ assertEquals(0, errors.get());
+
+ // not supposed to throw any exception
+ Wait.assertEquals(0, tx::getPendingDelay, 5000, 100);
+ tx.commit();
+ assertTrue(committed.await(10, TimeUnit.SECONDS));
+
+ int interceptorOriginalSize = interceptedWrite.size();
+ int sentOriginalSize = sentWrite.size();
+
+ interceptedWrite.forEach(s -> { // whatever remains in sentWrite afterwards was sent but never written
+ sentWrite.remove(s);
+ });
+ sentWrite.forEach(m -> {
+ logger.warn("message {} missed", m);
+ });
+
+ assertEquals(interceptorOriginalSize, sentOriginalSize);
+
+ assertEquals(0, sentWrite.size());
+
+ assertEquals(interceptorOriginalSize, sentOriginalSize); // NOTE(review): duplicate of the assertion a few lines above
+ assertEquals(sentNumber, interceptorOriginalSize);
+
+ }
+
+}
\ No newline at end of file
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
index bb232ded08..c889c46075 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
@@ -18,6 +18,7 @@ package
org.apache.activemq.artemis.tests.unit.core.persistence.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
@@ -67,9 +68,9 @@ public class BatchIDGeneratorUnitTest extends
ActiveMQTestBase {
assertTrue(id4 > id3 && id4 < 2000);
- batch.persistCurrentID();
-
+ batch.stop();
journal.stop();
+ validateStoppedGenerator(batch);
batch = new BatchingIDGenerator(0, 1000,
getJournalStorageManager(journal));
loadIDs(journal, batch);
@@ -100,8 +101,9 @@ public class BatchIDGeneratorUnitTest extends
ActiveMQTestBase {
lastId = id;
}
- batch.persistCurrentID();
+ batch.stop();
journal.stop();
+ validateStoppedGenerator(batch);
batch = new BatchingIDGenerator(0, 1000,
getJournalStorageManager(journal));
loadIDs(journal, batch);
@@ -117,6 +119,10 @@ public class BatchIDGeneratorUnitTest extends
ActiveMQTestBase {
}
+ private void validateStoppedGenerator(BatchingIDGenerator stoppedGenerator)
{
+ assertThrowsExactly(RuntimeException.class,
stoppedGenerator::generateID); // callers invoke this after batch.stop(); generateID() must then throw exactly RuntimeException, not a subclass
+ }
+
protected void loadIDs(final Journal journal, final BatchingIDGenerator
batch) throws Exception {
List<RecordInfo> records = new ArrayList<>();
List<PreparedTransactionInfo> tx = new ArrayList<>();
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
index b2305ec1f8..07a7a50c00 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
@@ -59,6 +59,11 @@ public class FakeAcceptorFactory implements AcceptorFactory {
}
+ @Override
+ public void notifyStop() { // no-op: this test fake does nothing on a stop notification
+
+ }
+
@Override
public void updateInterceptors(List<BaseInterceptor>
incomingInterceptors,
List<BaseInterceptor>
outgoingInterceptors) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact