ARTEMIS-829 Removing messages re-encoding https://issues.apache.org/jira/browse/ARTEMIS-829
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e0021252 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e0021252 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e0021252 Branch: refs/heads/master Commit: e0021252ee94dcafe664520e080d5a6e13e3350f Parents: 4b5cbb8 Author: Clebert Suconic <[email protected]> Authored: Mon Oct 24 18:20:20 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Oct 28 16:54:58 2016 -0400 ---------------------------------------------------------------------- .../core/client/impl/ClientProducerImpl.java | 2 - .../core/client/impl/ClientSessionImpl.java | 26 +-- .../core/client/impl/ClientSessionInternal.java | 2 - .../core/impl/ActiveMQSessionContext.java | 10 +- .../spi/core/remoting/SessionContext.java | 3 +- .../client/HornetQClientSessionContext.java | 5 +- .../tests/extras/byteman/PagingLeakTest.java | 14 +- .../tests/integration/client/ProducerTest.java | 6 +- .../cluster/failover/BackupSyncJournalTest.java | 2 +- .../journal/NIOJournalCompactTest.java | 180 ++++++++++--------- .../journal/ValidateTransactionHealthTest.java | 25 +-- .../tests/integration/paging/PagingTest.java | 12 +- 12 files changed, 140 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index fddd4de..1dfbe72 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -284,8 +284,6 @@ public class ClientProducerImpl implements ClientProducerInternal { theCredits.acquireCredits(creditSize); - session.checkDefaultAddress(sendingAddress); - sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index de45066..fd6355a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -135,8 +135,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private volatile boolean mayAttemptToFailover = true; - private volatile SimpleString defaultAddress; - /** * Current XID. this will be used in case of failover */ @@ -957,7 +955,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // want // to recreate the session, we just want to unblock the blocking call if (!inClose && mayAttemptToFailover) { - sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); + sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) { @@ -1036,27 +1034,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi @Override public void setAddress(final Message message, final SimpleString address) { - if (defaultAddress == null) { - logger.tracef("setAddress() Setting default address as %s", address); + logger.tracef("setAddress() Setting default address as %s", address); - message.setAddress(address); - } else { - if (!address.equals(defaultAddress)) { - logger.tracef("setAddress() setting non default address %s on message", address); - message.setAddress(address); - } else { - logger.trace("setAddress() being set as null"); - message.setAddress(null); - } - } - } - - @Override - public void checkDefaultAddress(SimpleString address) { - if (defaultAddress == null) { - logger.tracef("checkDefaultAddress(%s)", address); - defaultAddress = address; - } + message.setAddress(address); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index ed636bd..4e06068 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -93,8 +93,6 @@ public interface ClientSessionInternal extends ClientSession { */ void setAddress(Message message, SimpleString address); - void checkDefaultAddress(SimpleString address); - void setPacketSize(int packetSize); void resetIfNeeded() throws ActiveMQException; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index c72e19b..56c7135 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -629,9 +629,8 @@ public class ActiveMQSessionContext extends SessionContext { final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks, - final boolean preAcknowledge, - final SimpleString defaultAddress) throws ActiveMQException { - Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); + final boolean preAcknowledge) throws ActiveMQException { + Packet createRequest = newCreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); boolean retry; do { try { @@ -662,9 +661,8 @@ public class ActiveMQSessionContext extends SessionContext { boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, - SimpleString defaultAddress) { - return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); + boolean preAcknowledge) { + return new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 175360c..1f15cc6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -250,8 +250,7 @@ public abstract class SessionContext { final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks, - final boolean preAcknowledge, - final SimpleString defaultAddress) throws ActiveMQException; + final boolean preAcknowledge) throws ActiveMQException; public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index d932274..caa94a1 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -63,9 +63,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, - SimpleString defaultAddress) { - return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); + boolean preAcknowledge) { + return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), null); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java index f9744d2..4ffd2bd 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PagingLeakTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -92,10 +93,13 @@ public class PagingLeakTest extends ActiveMQTestBase { positions.clear(); - timeout = System.currentTimeMillis() + 5000; - while (pagePosInstances.get() != 0 && timeout > System.currentTimeMillis()) { - forceGC(); - } + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + forceGC(); + return pagePosInstances.get() == 0; + } + }, 5000, 100); // This is just to validate the rules are correctly applied on byteman assertEquals("You have changed something on PagePositionImpl in such way that these byteman rules are no longer working", 0, pagePosInstances.get()); @@ -110,7 +114,7 @@ public class PagingLeakTest extends ActiveMQTestBase { server.start(); - AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + AddressSettings settings = new AddressSettings().setPageSizeBytes(2 * 1024).setMaxSizeBytes(10 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); server.getAddressSettingsRepository().addMatch("#", settings); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java index c409d5f..d7af4b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerTest.java @@ -104,12 +104,12 @@ public class ProducerTest extends ActiveMQTestBase { ClientProducer producer = session.createProducer(); for (int i = 0; i < 62; i++) { - if (i == 61) { + if (i == 30) { // the point where the send would block latch.countDown(); } ClientMessage msg = session.createMessage(false); - msg.getBodyBuffer().writeBytes(new byte[1024]); + msg.getBodyBuffer().writeBytes(new byte[2048]); producer.send(QUEUE, msg); } } catch (Exception e) { @@ -119,7 +119,7 @@ public class ProducerTest extends ActiveMQTestBase { }; t.start(); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(latch.await(10, TimeUnit.SECONDS)); session.close(); t.join(5000); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java index 20ddae3..b51ff8a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -93,7 +93,7 @@ public class BackupSyncJournalTest extends FailoverTestBase { @Test public void testReserveFileIdValuesOnBackup() throws Exception { - final int totalRounds = 50; + final int totalRounds = 5; createProducerSendSomeMessages(); JournalImpl messageJournal = getMessageJournalFromServer(liveServer); for (int i = 0; i < totalRounds; i++) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index ee1ac11..2dd38ae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -730,7 +730,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase { @Test public void testCompactAddAndUpdateFollowedByADelete() throws Exception { - setup(2, 60 * 1024, false); SimpleIDGenerator idGen = new SimpleIDGenerator(1000); @@ -779,7 +778,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase { createJournal(); startJournal(); loadAndCheck(); - } @Test @@ -1610,8 +1608,9 @@ public class NIOJournalCompactTest extends JournalImplTestBase { } + @Test - public void testStressDeletesNoSync() throws Exception { + public void testStressDeletesNoSync() throws Throwable { Configuration config = createBasicConfig().setJournalFileSize(100 * 1024).setJournalSyncNonTransactional(false).setJournalSyncTransactional(false).setJournalCompactMinFiles(0).setJournalCompactPercentage(0); final AtomicInteger errors = new AtomicInteger(0); @@ -1629,114 +1628,129 @@ public class NIOJournalCompactTest extends JournalImplTestBase { final JournalStorageManager storage = new JournalStorageManager(config, factory); storage.start(); - storage.loadInternalOnly(); - - ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false); - final LinkedList<Long> survivingMsgs = new LinkedList<>(); - Runnable producerRunnable = new Runnable() { - @Override - public void run() { - try { - while (running.get()) { - final long[] values = new long[100]; - long tx = seqGenerator.incrementAndGet(); + try { + storage.loadInternalOnly(); - OperationContextImpl ctx = new OperationContextImpl(executor); - storage.setContext(ctx); + ((JournalImpl) storage.getMessageJournal()).setAutoReclaim(false); + final LinkedList<Long> survivingMsgs = new LinkedList<>(); - for (int i = 0; i < 100; i++) { - long id = seqGenerator.incrementAndGet(); - values[i] = id; + Runnable producerRunnable = new Runnable() { + @Override + public void run() { + try { + while (running.get()) { + final long[] values = new long[100]; + long tx = seqGenerator.incrementAndGet(); - ServerMessageImpl message = new ServerMessageImpl(id, 100); + OperationContextImpl ctx = new OperationContextImpl(executor); + storage.setContext(ctx); - message.getBodyBuffer().writeBytes(new byte[1024]); + for (int i = 0; i < 100; i++) { + long id = seqGenerator.incrementAndGet(); + values[i] = id; - storage.storeMessageTransactional(tx, message); - } - ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100); + ServerMessageImpl message = new ServerMessageImpl(id, 100); - survivingMsgs.add(message.getMessageID()); + message.getBodyBuffer().writeBytes(new byte[1024]); - // This one will stay here forever - storage.storeMessage(message); - - storage.commit(tx); - - ctx.executeOnCompletion(new IOCallback() { - @Override - public void onError(int errorCode, String errorMessage) { + storage.storeMessageTransactional(tx, message); } - - @Override - public void done() { - deleteExecutor.execute(new Runnable() { - @Override - public void run() { - try { - for (long messageID : values) { - storage.deleteMessage(messageID); + ServerMessageImpl message = new ServerMessageImpl(seqGenerator.incrementAndGet(), 100); + + survivingMsgs.add(message.getMessageID()); + + // This one will stay here forever + storage.storeMessage(message); + + storage.commit(tx); + + ctx.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + deleteExecutor.execute(new Runnable() { + @Override + public void run() { + try { + for (long messageID : values) { + storage.deleteMessage(messageID); + } + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Exception e) { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - }); - } - }); + } + }); + } + }); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); } - } - }; - - Runnable compressRunnable = new Runnable() { - @Override - public void run() { - try { - while (running.get()) { - Thread.sleep(500); - System.out.println("Compacting"); - ((JournalImpl) storage.getMessageJournal()).testCompact(); - ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus(); + }; + + Runnable compressRunnable = new Runnable() { + @Override + public void run() { + try { + while (running.get()) { + Thread.sleep(500); + System.out.println("Compacting"); + ((JournalImpl) storage.getMessageJournal()).testCompact(); + ((JournalImpl) storage.getMessageJournal()).checkReclaimStatus(); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); } - } catch (Throwable e) { - e.printStackTrace(); - errors.incrementAndGet(); + } + }; - } - }; + Thread producerThread = new Thread(producerRunnable); + producerThread.start(); - Thread producerThread = new Thread(producerRunnable); - producerThread.start(); + Thread compactorThread = new Thread(compressRunnable); + compactorThread.start(); - Thread compactorThread = new Thread(compressRunnable); - compactorThread.start(); + Thread.sleep(1000); - Thread.sleep(1000); + running.set(false); - running.set(false); + producerThread.join(); - producerThread.join(); + compactorThread.join(); - compactorThread.join(); + deleteExecutor.shutdown(); - storage.stop(); + assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); - executor.shutdown(); + executor.shutdown(); - assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS)); + assertTrue("executor terminated", executor.awaitTermination(10, TimeUnit.SECONDS)); - deleteExecutor.shutdown(); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + try { + storage.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + + executor.shutdownNow(); + deleteExecutor.shutdownNow(); + } - assertTrue("delete executor terminated", deleteExecutor.awaitTermination(30, TimeUnit.SECONDS)); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 1972863..2d3df3e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -144,18 +144,21 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir); journal.start(); - Loader loadTest = new Loader(numberOfRecords); - journal.load(loadTest); - Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds); - Assert.assertEquals(0, loadTest.numberOfPreparedTransactions); - Assert.assertEquals(0, loadTest.numberOfUpdates); - Assert.assertEquals(0, loadTest.numberOfDeletes); - - journal.stop(); - - if (loadTest.ex != null) { - throw loadTest.ex; + try { + Loader loadTest = new Loader(numberOfRecords); + journal.load(loadTest); + Assert.assertEquals(numberOfRecords * numberOfThreads, loadTest.numberOfAdds); + Assert.assertEquals(0, loadTest.numberOfPreparedTransactions); + Assert.assertEquals(0, loadTest.numberOfUpdates); + Assert.assertEquals(0, loadTest.numberOfDeletes); + + if (loadTest.ex != null) { + throw loadTest.ex; + } + } finally { + journal.stop(); } + } // Inner classes ------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e0021252/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 6f0bdc1..00c0bdf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -3335,7 +3335,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = null; for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3360,7 +3360,7 @@ public class PagingTest extends ActiveMQTestBase { Assert.assertEquals(0, server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3385,7 +3385,7 @@ public class PagingTest extends ActiveMQTestBase { producer = session.createProducer(PagingTest.ADDRESS); for (int i = 0; i < numberOfMessages; i++) { - byte[] body = new byte[1024]; + byte[] body = new byte[2048]; message = session.createMessage(true); message.getBodyBuffer().writeBytes(body); @@ -3841,7 +3841,7 @@ public class PagingTest extends ActiveMQTestBase { Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024); - server = createServer(true, config, 512 * 1024, 1024 * 1024); + server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2); server.start(); @@ -4745,7 +4745,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); - int biggerMessageSize = 1024; + int biggerMessageSize = 2048; byte[] body = new byte[biggerMessageSize]; ByteBuffer bb = ByteBuffer.wrap(body); for (int j = 1; j <= biggerMessageSize; j++) { @@ -4817,7 +4817,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); - int biggerMessageSize = 1024; + int biggerMessageSize = 2048; byte[] body = new byte[biggerMessageSize]; ByteBuffer bb = ByteBuffer.wrap(body); for (int j = 1; j <= biggerMessageSize; j++) {
