Repository: activemq-artemis Updated Branches: refs/heads/ARTEMIS-780 2972e092e -> 6303fcb43
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java new file mode 100644 index 0000000..c4c2214 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.persistence; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class SyncSendTest extends ActiveMQTestBase { + + private static long totalRecordTime = -1; + private static final int RECORDS = 300; + private static final int MEASURE_RECORDS = 100; + private static final int WRAMP_UP = 100; + + @Parameterized.Parameters(name = "storage={0}, protocol={1}") + public static Collection getParameters() { + Object[] storages = new Object[]{"libaio", "nio", "null"}; + Object[] protocols = new Object[]{"core", "openwire", "amqp"}; + + ArrayList<Object[]> objects = new ArrayList<>(); + for (Object s : storages) { + if (s.equals("libaio") && !LibaioContext.isLoaded()) { + continue; + } + for (Object p : protocols) { + objects.add(new Object[]{s, p}); + } + } + + return objects; + } + + private final String storage; + private final String protocol; + + public SyncSendTest(String storage, String protocol) { + this.storage = storage; + this.protocol = protocol; + } + + ActiveMQServer server; + JMSServerManagerImpl jms; + + @Override + public void setUp() throws Exception { + super.setUp(); + + if (storage.equals("null")) { + server = createServer(false, true); + } else { + server = createServer(true, true); + } + + jms = new JMSServerManagerImpl(server); + + if (storage.equals("libaio")) { + server.getConfiguration().setJournalType(JournalType.ASYNCIO); + } else { + server.getConfiguration().setJournalType(JournalType.NIO); + + } + jms.start(); + } + + private long getTimePerSync() throws Exception { + + if (storage.equals("null")) { + return 0; + } + if (totalRecordTime < 0) { + File measureFile = temporaryFolder.newFile(); + + System.out.println("File::" + measureFile); + + RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw"); + FileChannel channel = rfile.getChannel(); + + channel.position(0); + + ByteBuffer buffer = ByteBuffer.allocate(10); + buffer.put(new byte[10]); + buffer.position(0); + + Assert.assertEquals(10, channel.write(buffer)); + channel.force(true); + + long time = System.nanoTime(); + + for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) { + if (i == WRAMP_UP) { + time = System.nanoTime(); + } + channel.position(0); + buffer.position(0); + buffer.putInt(i); + buffer.position(0); + Assert.assertEquals(10, channel.write(buffer)); + channel.force(false); + } + + long timeEnd = System.nanoTime(); + + totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS; + + System.out.println("total time = " + totalRecordTime); + + } + return totalRecordTime; + + } + + @Test + public void testSendConsumeAudoACK() throws Exception { + + long recordTime = getTimePerSync(); + + jms.createQueue(true, "queue", null, true, null); + + ConnectionFactory factory = newCF(); + + Connection connection = factory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue; + if (protocol.equals("amqp")) { + queue = session.createQueue("jms.queue.queue"); + } else { + queue = session.createQueue("queue"); + } + MessageProducer producer = session.createProducer(queue); + + long start = System.nanoTime(); + + for (int i = 0; i < (RECORDS + WRAMP_UP); i++) { + if (i == WRAMP_UP) { + start = System.nanoTime(); // wramp up + } + producer.send(session.createMessage()); + } + + long end = System.nanoTime(); + + System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start)); + System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime)); + + if ((end - start) < recordTime) { + Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!"); + } + + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < (RECORDS + WRAMP_UP); i++) { + if (i == WRAMP_UP) { + start = System.nanoTime(); // wramp up + } + Message msg = consumer.receive(5000); + Assert.assertNotNull(msg); + } + + end = System.nanoTime(); + + System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start)); + System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime)); + + // There's no way to sync on ack for AMQP + if (!protocol.equals("amqp") && (end - start) < recordTime) { + Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!"); + } + } finally { + connection.close(); + } + + } + + // this will set ack as synchronous, to make sure we make proper measures against the sync on disk + private ConnectionFactory newCF() { + if (protocol.equals("core")) { + ConnectionFactory factory = new ActiveMQConnectionFactory(); + ((ActiveMQConnectionFactory) factory).setBlockOnAcknowledge(true); + return factory; + } else if (protocol.equals("amqp")) { + final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + factory.setForceAsyncAcks(true); + return factory; + } else { + org.apache.activemq.ActiveMQConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true"); + cf.setSendAcksAsync(false); + return cf; + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java index 84a3ecc..c445a86 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.vertx.java.core.Handler; import org.vertx.java.core.Vertx; @@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; /** * This class tests the basics of ActiveMQ - * vertx integration + * vertx inte + * gration */ +@Ignore public class ActiveMQVertxUnitTest extends ActiveMQTestBase { private PlatformManager vertxManager; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index d0676ee..0316945 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -60,6 +60,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { } @Override + public SequentialFileFactory setDatasync(boolean enabled) { + return null; + } + + @Override + public boolean isDatasync() { + return false; + } + + @Override public int getMaxIO() { return 1; }
