http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java deleted file mode 100644 index 0c982dc..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/SyncSpeedTest.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; - -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; - -/** - * A SyncSpeedTest - * - * This class just provides some diagnostics on how fast your disk can sync - * Useful when determining performance issues - */ -public class SyncSpeedTest -{ - public static void main(final String[] args) - { - try - { - new SyncSpeedTest().testScaleAIO(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - protected SequentialFileFactory fileFactory; - - public boolean AIO = true; - - protected void setupFactory() - { - if (AIO) - { - fileFactory = new AIOSequentialFileFactory(new File("."), 0, 0, false, null); - } - else - { - fileFactory = new NIOSequentialFileFactory(new File("."), false, 0, 0, false, null); - } - } - - protected SequentialFile createSequentialFile(final String fileName) - { - if (AIO) - { - return new AIOSequentialFile(fileFactory, - 0, - 0, - new File("."), - fileName, - 100000, - null, - null, - Executors.newSingleThreadExecutor()); - } - else - { - return new NIOSequentialFile(fileFactory, new File("."), fileName, 1000, null); - } - } - - public void run2() throws Exception - { - setupFactory(); - - int recordSize = 128 * 1024; - - while (true) - { - System.out.println("** record size is " + recordSize); - - int warmup = 500; - - int its = 500; - - int fileSize = (its + warmup) * recordSize; - - SequentialFile file = createSequentialFile("sync-speed-test.dat"); - - if (file.exists()) - { 
- file.delete(); - } - - file.open(); - - file.fill(0, fileSize, (byte)'X'); - - if (!AIO) - { - file.sync(); - } - - ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h'); - - long start = 0; - - for (int i = 0; i < its + warmup; i++) - { - if (i == warmup) - { - start = System.currentTimeMillis(); - } - - bb1.rewind(); - - file.writeDirect(bb1, true); - } - - long end = System.currentTimeMillis(); - - double rate = 1000 * (double)its / (end - start); - - double throughput = recordSize * rate; - - System.out.println("Rate of " + rate + " syncs per sec"); - System.out.println("Throughput " + throughput + " bytes per sec"); - System.out.println("*************"); - - recordSize *= 2; - } - } - - public void run() throws Exception - { - int recordSize = 256; - - while (true) - { - System.out.println("** record size is " + recordSize); - - int warmup = 500; - - int its = 500; - - int fileSize = (its + warmup) * recordSize; - - File file = new File("sync-speed-test.dat"); - - if (file.exists()) - { - if (!file.delete()) - { - ActiveMQJournalLogger.LOGGER.errorDeletingFile(file); - } - } - - boolean created = file.createNewFile(); - if (!created) - throw new IOException("could not create file " + file); - - RandomAccessFile rfile = new RandomAccessFile(file, "rw"); - - FileChannel channel = rfile.getChannel(); - - ByteBuffer bb = generateBuffer(fileSize, (byte)'x'); - - write(bb, channel, fileSize); - - channel.force(true); - - channel.position(0); - - ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h'); - - long start = 0; - - for (int i = 0; i < its + warmup; i++) - { - if (i == warmup) - { - start = System.currentTimeMillis(); - } - - bb1.flip(); - channel.write(bb1); - channel.force(false); - } - - long end = System.currentTimeMillis(); - - double rate = 1000 * (double)its / (end - start); - - double throughput = recordSize * rate; - - System.out.println("Rate of " + rate + " syncs per sec"); - System.out.println("Throughput " + throughput + " bytes per sec"); - 
- recordSize *= 2; - } - } - - public void testScaleAIO() throws Exception - { - setupFactory(); - - final int recordSize = 1024; - - System.out.println("** record size is " + recordSize); - - final int its = 10; - - for (int numThreads = 1; numThreads <= 10; numThreads++) - { - - int fileSize = its * recordSize * numThreads; - - final SequentialFile file = createSequentialFile("sync-speed-test.dat"); - - if (file.exists()) - { - file.delete(); - } - - file.open(); - - file.fill(0, fileSize, (byte)'X'); - - if (!AIO) - { - file.sync(); - } - - final CountDownLatch latch = new CountDownLatch(its * numThreads); - - class MyIOAsyncTask implements IOAsyncTask - { - public void done() - { - latch.countDown(); - } - - public void onError(final int errorCode, final String errorMessage) - { - - } - } - - final MyIOAsyncTask task = new MyIOAsyncTask(); - - class MyRunner implements Runnable - { - private final ByteBuffer bb1; - - MyRunner() - { - bb1 = generateBuffer(recordSize, (byte)'h'); - } - - public void run() - { - for (int i = 0; i < its; i++) - { - bb1.rewind(); - - file.writeDirect(bb1, true, task); - // try - // { - // file.writeDirect(bb1, true); - // } - // catch (Exception e) - // { - // e.printStackTrace(); - // } - } - } - } - - Set<Thread> threads = new HashSet<Thread>(); - - for (int i = 0; i < numThreads; i++) - { - MyRunner runner = new MyRunner(); - - Thread t = new Thread(runner); - - threads.add(t); - } - - long start = System.currentTimeMillis(); - - for (Thread t : threads) - { - ActiveMQJournalLogger.LOGGER.startingThread(); - t.start(); - } - - for (Thread t : threads) - { - t.join(); - } - - latch.await(); - - long end = System.currentTimeMillis(); - - double rate = 1000 * (double)its * numThreads / (end - start); - - double throughput = recordSize * rate; - - System.out.println("For " + numThreads + " threads:"); - System.out.println("Rate of " + rate + " records per sec"); - System.out.println("Throughput " + throughput + " bytes per sec"); - 
System.out.println("*************"); - } - } - - private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception - { - buffer.flip(); - - channel.write(buffer); - } - - private ByteBuffer generateBuffer(final int size, final byte ch) - { - ByteBuffer bb = ByteBuffer.allocateDirect(size); - - for (int i = 0; i < size; i++) - { - bb.put(ch); - } - - return bb; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java deleted file mode 100644 index 45d4b62..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBuffer.java +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; - -public class TimedBuffer -{ - // Constants ----------------------------------------------------- - - // The number of tries on sleep before switching to spin - public static final int MAX_CHECKS_ON_SLEEP = 20; - - // Attributes ---------------------------------------------------- - - private TimedBufferObserver bufferObserver; - - // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread - // in spinning and checking the time - and using up CPU in the process - this semaphore is used to - // prevent that - private final Semaphore spinLimiter = new Semaphore(1); - - private CheckTimer timerRunnable = new CheckTimer(); - - private final int bufferSize; - - private final ActiveMQBuffer buffer; - - private int bufferLimit = 0; - - private List<IOAsyncTask> callbacks; - - private volatile int timeout; - - // used to measure sync requests. 
When a sync is requested, it shouldn't take more than timeout to happen - private volatile boolean pendingSync = false; - - private Thread timerThread; - - private volatile boolean started; - - // We use this flag to prevent flush occurring between calling checkSize and addBytes - // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer - // can get in an inconsistent state - private boolean delayFlush; - - // for logging write rates - - private final boolean logRates; - - private final AtomicLong bytesFlushed = new AtomicLong(0); - - private final AtomicLong flushesDone = new AtomicLong(0); - - private Timer logRatesTimer; - - private TimerTask logRatesTimerTask; - - private boolean useSleep = true; - - // no need to be volatile as every access is synchronized - private boolean spinning = false; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - public TimedBuffer(final int size, final int timeout, final boolean logRates) - { - bufferSize = size; - - this.logRates = logRates; - - if (logRates) - { - logRatesTimer = new Timer(true); - } - // Setting the interval for nano-sleeps - - buffer = ActiveMQBuffers.fixedBuffer(bufferSize); - - buffer.clear(); - - bufferLimit = 0; - - callbacks = new ArrayList<IOAsyncTask>(); - - this.timeout = timeout; - } - - // for Debug purposes - public synchronized boolean isUseSleep() - { - return useSleep; - } - - public synchronized void setUseSleep(boolean useSleep) - { - this.useSleep = useSleep; - } - - public synchronized void start() - { - if (started) - { - return; - } - - // Need to start with the spin limiter acquired - try - { - spinLimiter.acquire(); - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - - timerRunnable = new CheckTimer(); - - timerThread = new Thread(timerRunnable, 
"activemq-buffer-timeout"); - - timerThread.start(); - - if (logRates) - { - logRatesTimerTask = new LogRatesTimerTask(); - - logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000); - } - - started = true; - } - - public void stop() - { - if (!started) - { - return; - } - - flush(); - - bufferObserver = null; - - timerRunnable.close(); - - spinLimiter.release(); - - if (logRates) - { - logRatesTimerTask.cancel(); - } - - while (timerThread.isAlive()) - { - try - { - timerThread.join(); - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - } - - started = false; - } - - public synchronized void setObserver(final TimedBufferObserver observer) - { - if (bufferObserver != null) - { - flush(); - } - - bufferObserver = observer; - } - - /** - * Verify if the size fits the buffer - * - * @param sizeChecked - */ - public synchronized boolean checkSize(final int sizeChecked) - { - if (!started) - { - throw new IllegalStateException("TimedBuffer is not started"); - } - - if (sizeChecked > bufferSize) - { - throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + - ") on the journal"); - } - - if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) - { - // Either there is not enough space left in the buffer for the sized record - // Or a flush has just been performed and we need to re-calcualate bufferLimit - - flush(); - - delayFlush = true; - - final int remainingInFile = bufferObserver.getRemainingBytes(); - - if (sizeChecked > remainingInFile) - { - return false; - } - else - { - // There is enough space in the file for this size - - // Need to re-calculate buffer limit - - bufferLimit = Math.min(remainingInFile, bufferSize); - - return true; - } - } - else - { - delayFlush = true; - - return true; - } - } - - public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback) - { - addBytes(new 
ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback); - } - - public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) - { - if (!started) - { - throw new IllegalStateException("TimedBuffer is not started"); - } - - delayFlush = false; - - bytes.encode(buffer); - - callbacks.add(callback); - - if (sync) - { - pendingSync = true; - - startSpin(); - } - - } - - public void flush() - { - flush(false); - } - - /** - * force means the Journal is moving to a new file. Any pending write need to be done immediately - * or data could be lost - */ - public void flush(final boolean force) - { - synchronized (this) - { - if (!started) - { - throw new IllegalStateException("TimedBuffer is not started"); - } - - if ((force || !delayFlush) && buffer.writerIndex() > 0) - { - int pos = buffer.writerIndex(); - - if (logRates) - { - bytesFlushed.addAndGet(pos); - } - - ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); - - // Putting a byteArray on a native buffer is much faster, since it will do in a single native call. 
- // Using bufferToFlush.put(buffer) would make several append calls for each byte - // We also transfer the content of this buffer to the native file's buffer - - bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos); - - bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); - - stopSpin(); - - pendingSync = false; - - // swap the instance as the previous callback list is being used asynchronously - callbacks = new LinkedList<IOAsyncTask>(); - - buffer.clear(); - - bufferLimit = 0; - - flushesDone.incrementAndGet(); - } - } - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - - private class LogRatesTimerTask extends TimerTask - { - private boolean closed; - - private long lastExecution; - - private long lastBytesFlushed; - - private long lastFlushesDone; - - @Override - public synchronized void run() - { - if (!closed) - { - long now = System.currentTimeMillis(); - - long bytesF = bytesFlushed.get(); - long flushesD = flushesDone.get(); - - if (lastExecution != 0) - { - double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); - ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); - double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); - ActiveMQJournalLogger.LOGGER.flushRate(flushRate); - } - - lastExecution = now; - - lastBytesFlushed = bytesF; - - lastFlushesDone = flushesD; - } - } - - @Override - public synchronized boolean cancel() - { - closed = true; - - return super.cancel(); - } - } - - private class CheckTimer implements Runnable - { - private volatile boolean closed = false; - - int checks = 0; - int failedChecks = 0; - long timeBefore = 0; - - final int sleepMillis = timeout / 1000000; // truncates - final int 
sleepNanos = timeout % 1000000; - - - public void run() - { - long lastFlushTime = 0; - - while (!closed) - { - // We flush on the timer if there are pending syncs there and we've waited at least one - // timeout since the time of the last flush. - // Effectively flushing "resets" the timer - // On the timeout verification, notice that we ignore the timeout check if we are using sleep - - if (pendingSync) - { - if (isUseSleep()) - { - // if using sleep, we will always flush - flush(); - lastFlushTime = System.nanoTime(); - } - else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) - { - // if not using flush we will spin and do the time checks manually - flush(); - lastFlushTime = System.nanoTime(); - } - - } - - sleepIfPossible(); - - try - { - spinLimiter.acquire(); - - Thread.yield(); - - spinLimiter.release(); - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - } - } - - /** - * We will attempt to use sleep only if the system supports nano-sleep - * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. - * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin - */ - private void sleepIfPossible() - { - if (isUseSleep()) - { - if (checks < MAX_CHECKS_ON_SLEEP) - { - timeBefore = System.nanoTime(); - } - - try - { - sleep(sleepMillis, sleepNanos); - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - catch (Exception e) - { - setUseSleep(false); - ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e); - } - - if (checks < MAX_CHECKS_ON_SLEEP) - { - long realTimeSleep = System.nanoTime() - timeBefore; - - // I'm letting the real time to be up to 50% than the requested sleep. 
- if (realTimeSleep > timeout * 1.5) - { - failedChecks++; - } - - if (++checks >= MAX_CHECKS_ON_SLEEP) - { - if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) - { - ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts"); - setUseSleep(false); - } - } - } - } - } - - public void close() - { - closed = true; - } - } - - /** - * Sub classes (tests basically) can use this to override how the sleep is being done - * - * @param sleepMillis - * @param sleepNanos - * @throws InterruptedException - */ - protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException - { - Thread.sleep(sleepMillis, sleepNanos); - } - - /** - * Sub classes (tests basically) can use this to override disabling spinning - */ - protected void stopSpin() - { - if (spinning) - { - try - { - // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning - // when the buffer is inactive - spinLimiter.acquire(); - } - catch (InterruptedException e) - { - throw new ActiveMQInterruptedException(e); - } - - spinning = false; - } - } - - - /** - * Sub classes (tests basically) can use this to override disabling spinning - */ - protected void startSpin() - { - if (!spinning) - { - spinLimiter.release(); - - spinning = true; - } - } - - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java deleted file mode 100644 index f219f08..0000000 --- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TimedBufferObserver.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal.impl; - -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.activemq.artemis.core.journal.IOAsyncTask; - -public interface TimedBufferObserver -{ - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOAsyncTask> callbacks); - - /** Return the number of remaining bytes that still fit on the observer (file) */ - int getRemainingBytes(); - - ByteBuffer newBuffer(int size, int limit); - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner 
classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java index 4035202..140927e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/TransactionCallback.java @@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.utils.ReusableLatch; -public class TransactionCallback implements IOAsyncTask +public class TransactionCallback implements IOCallback { private final ReusableLatch countLatch = new ReusableLatch(); @@ -33,7 +33,7 @@ public class TransactionCallback implements IOAsyncTask private int done = 0; - private volatile IOAsyncTask delegateCompletion; + private volatile IOCallback delegateCompletion; public void countUp() { @@ -46,7 +46,7 @@ public class TransactionCallback implements IOAsyncTask countLatch.countDown(); if (++done == up.get() && delegateCompletion != null) { - final IOAsyncTask delegateToCall = delegateCompletion; + final IOCallback delegateToCall = delegateCompletion; // We need to set the delegateCompletion to null first or blocking commits could miss a callback // What would affect mainly tests delegateCompletion = null; @@ -81,7 +81,7 @@ public class TransactionCallback implements IOAsyncTask /** * @return the delegateCompletion 
*/ - public IOAsyncTask getDelegateCompletion() + public IOCallback getDelegateCompletion() { return delegateCompletion; } @@ -89,7 +89,7 @@ public class TransactionCallback implements IOAsyncTask /** * @param delegateCompletion the delegateCompletion to set */ - public void setDelegateCompletion(final IOAsyncTask delegateCompletion) + public void setDelegateCompletion(final IOCallback delegateCompletion) { this.delegateCompletion = delegateCompletion; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java new file mode 100644 index 0000000..82d4502 --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/CallbackOrderTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.io.aio; + +import java.io.File; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.io.IOCallback; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** This will emulate callbacks out of order from libaio*/ +public class CallbackOrderTest +{ + + @Rule + public TemporaryFolder temporaryFolder; + + public CallbackOrderTest() + { + File parent = new File("./target"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + /** This method will make sure callbacks will come back in order even when out of order from libaio */ + @Test + public void testCallbackOutOfOrder() throws Exception + { + AIOSequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + AIOSequentialFile file = (AIOSequentialFile)factory.createSequentialFile("test.bin"); + + final AtomicInteger count = new AtomicInteger(0); + + IOCallback callback = new IOCallback() + { + @Override + public void done() + { + count.incrementAndGet(); + } + + @Override + public void onError(int errorCode, String errorMessage) + { + + } + }; + + ArrayList<AIOSequentialFileFactory.AIOSequentialCallback> list = new ArrayList<>(); + + // We will repeat the test a few times, increasing N + // to increase possibility of issues due to reuse of callbacks + for (int n = 1; n < 100; n++) + { + System.out.println("n = " + n); + int N = n; + count.set(0); + list.clear(); + for (int i = 0; i < N; i++) + { + list.add(file.getCallback(callback, null)); + } + + + for (int i = N - 1; i >= 0; i--) + { + list.get(i).done(); + } + + Assert.assertEquals(N, count.get()); + Assert.assertEquals(0, file.pendingCallbackList.size()); + Assert.assertTrue(file.pendingCallbackList.isEmpty()); + } + + factory.stop(); + + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java ---------------------------------------------------------------------- diff --git a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java index 78154b6..bae871e 100644 --- a/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java +++ b/artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ActiveMQCreatePlugin.java @@ -250,6 +250,7 @@ public class ActiveMQCreatePlugin extends AbstractMojo add(listCommands, "--failover-on-shutdown"); } + add(listCommands, "--no-sync-test"); add(listCommands, "--verbose"); add(listCommands, instance.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/bin/libartemis-native-32.so ---------------------------------------------------------------------- diff --git a/artemis-native/bin/libartemis-native-32.so b/artemis-native/bin/libartemis-native-32.so index 7178069..df4b560 100755 Binary files a/artemis-native/bin/libartemis-native-32.so and b/artemis-native/bin/libartemis-native-32.so differ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/bin/libartemis-native-64.so ---------------------------------------------------------------------- diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so index 1c4983c..aec757a 100755 Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-native/pom.xml b/artemis-native/pom.xml 
index abd5f0d..0206166 100644 --- a/artemis-native/pom.xml +++ b/artemis-native/pom.xml @@ -32,6 +32,30 @@ <artifactId>artemis-commons</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.jboss.logmanager</groupId> + <artifactId>jboss-logmanager</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOController.cpp ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/AIOController.cpp b/artemis-native/src/main/c/AIOController.cpp deleted file mode 100644 index a61bf04..0000000 --- a/artemis-native/src/main/c/AIOController.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -#include <string> -#include "AIOController.h" -#include "JavaUtilities.h" -#include "JAIODatatypes.h" - -AIOController::AIOController(std::string fileName, int maxIO) : logger(0), fileOutput(fileName, this, maxIO) -{ -} - -void AIOController::log(THREAD_CONTEXT threadContext, short level, const char * message) -{ - jmethodID methodID = 0; - - switch (level) - { - case 0: methodID = loggerError; break; - case 1: methodID = loggerWarn; break; - case 2: methodID = loggerInfo; break; - case 3: methodID = loggerDebug; break; - default: methodID = loggerDebug; break; - } - -#ifdef DEBUG - fprintf (stderr,"Callig log methodID=%ld, message=%s, logger=%ld, threadContext = %ld\n", (long) methodID, message, (long) logger, (long) threadContext); fflush(stderr); -#endif - threadContext->CallVoidMethod(logger,methodID,threadContext->NewStringUTF(message)); -} - - -void AIOController::destroy(THREAD_CONTEXT context) -{ - if (logger != 0) - { - context->DeleteGlobalRef(logger); - } -} - -/* - * level = 0-error, 1-warn, 2-info, 3-debug - */ - - -AIOController::~AIOController() -{ -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOController.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/AIOController.h b/artemis-native/src/main/c/AIOController.h deleted file mode 100644 index 913565f..0000000 --- a/artemis-native/src/main/c/AIOController.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef AIOCONTROLLER_H_ -#define AIOCONTROLLER_H_ -#include <jni.h> -#include <string> -#include "JAIODatatypes.h" -#include "AsyncFile.h" - -class AIOController -{ -public: - jmethodID done; - jmethodID error; - - jobject logger; - - jmethodID loggerError; - jmethodID loggerWarn; - jmethodID loggerDebug; - jmethodID loggerInfo; - - /* - * level = 0-error, 1-warn, 2-info, 3-debug - */ - void log(THREAD_CONTEXT threadContext, short level, const char * message); - - AsyncFile fileOutput; - - void destroy(THREAD_CONTEXT context); - - AIOController(std::string fileName, int maxIO); - virtual ~AIOController(); -}; -#endif /*AIOCONTROLLER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AIOException.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/AIOException.h b/artemis-native/src/main/c/AIOException.h deleted file mode 100644 index 98745a2..0000000 --- a/artemis-native/src/main/c/AIOException.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - -#ifndef AIOEXCEPTION_H_ -#define AIOEXCEPTION_H_ - -#include <exception> -#include <string> - - -#define NATIVE_ERROR_INTERNAL 200 -#define NATIVE_ERROR_INVALID_BUFFER 201 -#define NATIVE_ERROR_NOT_ALIGNED 202 -#define NATIVE_ERROR_CANT_INITIALIZE_AIO 203 -#define NATIVE_ERROR_CANT_RELEASE_AIO 204 -#define NATIVE_ERROR_CANT_OPEN_CLOSE_FILE 205 -#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206 -#define NATIVE_ERROR_PREALLOCATE_FILE 208 -#define NATIVE_ERROR_ALLOCATE_MEMORY 209 -#define NATIVE_ERROR_IO 006 -#define NATIVE_ERROR_AIO_FULL 211 - - -class AIOException : public std::exception -{ -private: - int errorCode; - std::string message; -public: - AIOException(int _errorCode, std::string _message) throw() : errorCode(_errorCode), message(_message) - { - errorCode = _errorCode; - message = _message; - } - - AIOException(int _errorCode, const char * _message) throw () - { - message = std::string(_message); - errorCode = _errorCode; - } - - virtual ~AIOException() throw() - { - - } - - int inline getErrorCode() - { - return errorCode; - } - - const char* what() const throw() - { - return message.data(); - } - -}; - -#endif /*AIOEXCEPTION_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AsyncFile.cpp ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/AsyncFile.cpp b/artemis-native/src/main/c/AsyncFile.cpp deleted file mode 100644 index 2385a0d..0000000 --- a/artemis-native/src/main/c/AsyncFile.cpp +++ /dev/null @@ -1,348 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif - - -#include <stdlib.h> -#include <list> -#include <iostream> -#include <sstream> -#include <memory.h> -#include <errno.h> -#include <libaio.h> -#include <fcntl.h> -#include <unistd.h> -#include <sys/stat.h> -#include "AsyncFile.h" -#include "AIOController.h" -#include "AIOException.h" -#include "pthread.h" -#include "LockClass.h" -#include "CallbackAdapter.h" -#include "LockClass.h" - -//#define DEBUG - -#define WAIT_FOR_SPOT 10000 -#define TRIES_BEFORE_WARN 0 -#define TRIES_BEFORE_ERROR 500 - - -std::string io_error(int rc) -{ - std::stringstream buffer; - - if (rc == -ENOSYS) - buffer << "AIO not in this kernel"; - else - buffer << "Error:= " << strerror((int)-rc); - - return buffer.str(); -} - - -AsyncFile::AsyncFile(std::string & _fileName, AIOController * _controller, int _maxIO) : aioContext(0), events(0), fileHandle(0), controller(_controller), pollerRunning(0) -{ - ::pthread_mutex_init(&fileMutex,0); - ::pthread_mutex_init(&pollerMutex,0); - - maxIO = _maxIO; - fileName = _fileName; - if (io_queue_init(maxIO, &aioContext)) - { - throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO 
Handlers"); - } - - fileHandle = ::open(fileName.data(), O_RDWR | O_CREAT | O_DIRECT, 0666); - if (fileHandle < 0) - { - io_queue_release(aioContext); - throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file"); - } - -#ifdef DEBUG - fprintf (stderr,"File Handle %d", fileHandle); -#endif - - events = (struct io_event *)malloc (maxIO * sizeof (struct io_event)); - - if (events == 0) - { - throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents"); - } - -} - -AsyncFile::~AsyncFile() -{ - if (io_queue_release(aioContext)) - { - throw AIOException(NATIVE_ERROR_CANT_RELEASE_AIO,"Can't release aio"); - } - if (::close(fileHandle)) - { - throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE,"Can't close file"); - } - free(events); - ::pthread_mutex_destroy(&fileMutex); - ::pthread_mutex_destroy(&pollerMutex); -} - -int isException (THREAD_CONTEXT threadContext) -{ - return JNI_ENV(threadContext)->ExceptionOccurred() != 0; -} - -void AsyncFile::pollEvents(THREAD_CONTEXT threadContext) -{ - - LockClass lock(&pollerMutex); - pollerRunning=1; - - - while (pollerRunning) - { - if (isException(threadContext)) - { - return; - } - int result = io_getevents(this->aioContext, 1, maxIO, events, 0); - - -#ifdef DEBUG - fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr); -#endif - - if (result > 0) - { - -#ifdef DEBUG - fprintf (stdout, "Received %d events\n", result); - fflush(stdout); -#endif - } - - for (int i=0; i<result; i++) - { - - struct iocb * iocbp = events[i].obj; - - if (iocbp->data == (void *) -1) - { - pollerRunning = 0; -#ifdef DEBUG - controller->log(threadContext, 2, "Received poller request to stop"); -#endif - } - else - { - CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data; - - long result = events[i].res; - if (result < 0) - { - std::string strerror = io_error((int)result); - adapter->onError(threadContext, result, strerror); - } - else - { - adapter->done(threadContext); - } - } - - delete 
iocbp; - } - } -#ifdef DEBUG - controller->log(threadContext, 2, "Poller finished execution"); -#endif -} - - -void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar) -{ - - if (size % ALIGNMENT != 0) - { - throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512"); - } - - void * preAllocBuffer = 0; - if (posix_memalign(&preAllocBuffer, 512, size)) - { - throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign"); - } - - memset(preAllocBuffer, fillChar, size); - - - if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file"); - - for (int i=0; i<blocks; i++) - { - if (::write(fileHandle, preAllocBuffer, size)<0) - { - throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file"); - } - } - - if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file"); - - free (preAllocBuffer); -} - - -/** Write directly to the file without using libaio queue */ -void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer) -{ - if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file"); - - if (::write(fileHandle, buffer, size)<0) - { - throw AIOException (NATIVE_ERROR_IO, "Error writing file"); - } - - if (::fsync(fileHandle) < 0) - { - throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file"); - } - - -} - - -void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter) -{ - - struct iocb * iocb = new struct iocb(); - ::io_prep_pwrite(iocb, fileHandle, buffer, size, position); - iocb->data = (void *) adapter; - - int tries = 0; - int result = 0; - - while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) - { -#ifdef DEBUG - fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries); 
-#endif - tries ++; - if (tries > TRIES_BEFORE_WARN) - { -#ifdef DEBUG - fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries); -#endif - controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times"); - } - - if (tries > TRIES_BEFORE_ERROR) - { -#ifdef DEBUG - fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries); -#endif - throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit"); - } - ::usleep(WAIT_FOR_SPOT); - } - - if (result<0) - { - std::stringstream str; - str<< "Problem on submit block, errorCode=" << result; - throw AIOException (NATIVE_ERROR_IO, str.str()); - } -} - -void AsyncFile::read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter) -{ - - struct iocb * iocb = new struct iocb(); - ::io_prep_pread(iocb, fileHandle, buffer, size, position); - iocb->data = (void *) adapter; - - int tries = 0; - int result = 0; - - while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) - { -#ifdef DEBUG - fprintf (stderr, "Retrying block as iocb was full (retry=%d)\n", tries); -#endif - tries ++; - if (tries > TRIES_BEFORE_WARN) - { -#ifdef DEBUG - fprintf (stderr, "Warning level on retries, informing logger (retry=%d)\n", tries); -#endif - controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times"); - } - - if (tries > TRIES_BEFORE_ERROR) - { -#ifdef DEBUG - fprintf (stderr, "Error level on retries, throwing exception (retry=%d)\n", tries); -#endif - throw AIOException(NATIVE_ERROR_AIO_FULL, "Too many retries (500) waiting for a valid iocb block, please increase MAX_IO limit"); - } - ::usleep(WAIT_FOR_SPOT); - } - - if (result<0) - { - std::stringstream str; - str<< "Problem on submit block, errorCode=" << result; - throw AIOException (NATIVE_ERROR_IO, str.str()); - } -} - 
-long AsyncFile::getSize() -{ - struct stat statBuffer; - - if (fstat(fileHandle, &statBuffer) < 0) - { - return -1l; - } - return statBuffer.st_size; -} - - -void AsyncFile::stopPoller(THREAD_CONTEXT threadContext) -{ - pollerRunning = 0; - - - struct iocb * iocb = new struct iocb(); - ::io_prep_pwrite(iocb, fileHandle, 0, 0, 0); - iocb->data = (void *) -1; - - int result = 0; - - while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN)) - { - fprintf(stderr, "Couldn't send request to stop poller, trying again"); - controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again"); - ::usleep(WAIT_FOR_SPOT); - } - - // Waiting the Poller to finish (by giving up the lock) - LockClass lock(&pollerMutex); -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/AsyncFile.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/AsyncFile.h b/artemis-native/src/main/c/AsyncFile.h deleted file mode 100644 index 71281c9..0000000 --- a/artemis-native/src/main/c/AsyncFile.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef FILEOUTPUT_H_ -#define FILEOUTPUT_H_ - -#include <string> -#include <libaio.h> -#include <stdlib.h> -#include <pthread.h> -#include "JAIODatatypes.h" -#include "AIOException.h" - -class AIOController; - -class CallbackAdapter; - -/** Author: Clebert Suconic at Redhat dot com*/ -class AsyncFile -{ -private: - io_context_t aioContext; - struct io_event *events; - int fileHandle; - std::string fileName; - - pthread_mutex_t fileMutex; - pthread_mutex_t pollerMutex; - - AIOController * controller; - - bool pollerRunning; - - int maxIO; - -public: - AsyncFile(std::string & _fileName, AIOController * controller, int maxIO); - virtual ~AsyncFile(); - - void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter); - - /** Write directly to the file without using libaio queue */ - void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer); - - void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter); - - int getHandle() - { - return fileHandle; - } - - long getSize(); - - inline void * newBuffer(int size) - { - void * buffer = 0; - if (::posix_memalign(&buffer, 512, size)) - { - throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign"); - } - return buffer; - - } - - inline void destroyBuffer(void * buffer) - { - ::free(buffer); - } - - - // Finishes the polling thread (if any) and return - void stopPoller(THREAD_CONTEXT threadContext); - void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar); - - void pollEvents(THREAD_CONTEXT threadContext); - -}; - -#endif /*FILEOUTPUT_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/CMakeLists.txt b/artemis-native/src/main/c/CMakeLists.txt index 
0a45fd6..beef8da 100644 --- a/artemis-native/src/main/c/CMakeLists.txt +++ b/artemis-native/src/main/c/CMakeLists.txt @@ -30,35 +30,21 @@ endif() # you may want to remove this next line for debugging # -O3 would make inline debug hard +#ADD_DEFINITIONS("-O3 -Wall -z execstack") ADD_DEFINITIONS("-O3 -Wall") -#ADD_DEFINITIONS("-fdump-tree-all -Wall -pg -g -mstack-protector-guard=guard") +#ADD_DEFINITIONS("-fdump-tree-all -Wall -pg -g") find_library(LIBAIO NAMES aio) INCLUDE_DIRECTORIES(. ${JNI_INCLUDE_DIRS}) ADD_CUSTOM_COMMAND( - OUTPUT org_apache_activemq_artemis_core_libaio_Native.h - COMMAND javah -cp ../java/ org.apache.activemq.artemis.core.libaio.Native - DEPENDS ../java/org/apache/activemq/artemis/core/libaio/Native.java + OUTPUT org_apache_activemq_artemis_jlibaio_LibaioContext.h + COMMAND javah -cp ../java/ org.apache.activemq.artemis.jlibaio.LibaioContext + DEPENDS ../java/org/apache/activemq/artemis/jlibaio/LibaioContext.java ) -ADD_LIBRARY(artemis-native SHARED - AIOController.cpp - AIOController.h - AIOException.h - AsyncFile.cpp - AsyncFile.h - CallbackAdapter.h - JAIODatatypes.h - JavaUtilities.cpp - JavaUtilities.h - JNI_AsynchronousFileImpl.cpp - JNICallbackAdapter.cpp - JNICallbackAdapter.h - LockClass.h - Version.h - org_apache_activemq_artemis_core_libaio_Native.h) +ADD_LIBRARY(artemis-native SHARED org_apache_activemq_artemis_jlibaio_LibaioContext.c org_apache_activemq_artemis_jlibaio_LibaioContext.h exception_helper.h) target_link_libraries(artemis-native aio) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/CallbackAdapter.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/CallbackAdapter.h b/artemis-native/src/main/c/CallbackAdapter.h deleted file mode 100644 index e8b67da..0000000 --- a/artemis-native/src/main/c/CallbackAdapter.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - 
* contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef BUFFERADAPTER_H_ -#define BUFFERADAPTER_H_ - -#include <iostream> - -#include "JAIODatatypes.h" - -class CallbackAdapter -{ -private: - -public: - CallbackAdapter() - { - - } - virtual ~CallbackAdapter() - { - - } - - virtual void done(THREAD_CONTEXT ) = 0; - virtual void onError(THREAD_CONTEXT , long , std::string )=0; -}; -#endif /*BUFFERADAPTER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JAIODatatypes.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JAIODatatypes.h b/artemis-native/src/main/c/JAIODatatypes.h deleted file mode 100644 index b611c2b..0000000 --- a/artemis-native/src/main/c/JAIODatatypes.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef JAIODATATYPES_H_ -#define JAIODATATYPES_H_ - -#include <jni.h> - -#define THREAD_CONTEXT JNIEnv *& -#define JNI_ENV(pointer) pointer -#define ALIGNMENT 512 - - -#endif /*JAIODATATYPES_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNICallbackAdapter.cpp ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JNICallbackAdapter.cpp b/artemis-native/src/main/c/JNICallbackAdapter.cpp deleted file mode 100644 index 0f4cef4..0000000 --- a/artemis-native/src/main/c/JNICallbackAdapter.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <jni.h> -#include "JNICallbackAdapter.h" -#include <iostream> -#include "JavaUtilities.h" - -jobject nullObj = NULL; - -JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter() -{ - controller = _controller; - - sequence = _sequence; - - callback = _callback; - - fileController = _fileController; - - bufferReference = _bufferReference; - - isRead = _isRead; - -} - -JNICallbackAdapter::~JNICallbackAdapter() -{ -} - -void JNICallbackAdapter::done(THREAD_CONTEXT threadContext) -{ - JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, sequence, isRead ? nullObj : bufferReference); - - release(threadContext); -} - -void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode, std::string error) -{ - controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it"); - - jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data()); - - JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError); - - release(threadContext); -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNICallbackAdapter.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JNICallbackAdapter.h b/artemis-native/src/main/c/JNICallbackAdapter.h deleted file mode 100644 index 5d32620..0000000 --- a/artemis-native/src/main/c/JNICallbackAdapter.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef JNIBUFFERADAPTER_H_ -#define JNIBUFFERADAPTER_H_ - -#include <iostream> - -#include "CallbackAdapter.h" -#include "AIOController.h" -#include "JAIODatatypes.h" - - -class JNICallbackAdapter : public CallbackAdapter -{ -private: - - AIOController * controller; - - jobject callback; - - jobject fileController; - - jobject bufferReference; - - jlong sequence; - - // Is this a read operation - short isRead; - - void release(THREAD_CONTEXT threadContext) - { - JNI_ENV(threadContext)->DeleteGlobalRef(callback); - JNI_ENV(threadContext)->DeleteGlobalRef(fileController); - JNI_ENV(threadContext)->DeleteGlobalRef(bufferReference); - delete this; - return; - } - - -public: - // _ob must be a global Reference (use createGloblReferente before calling the constructor) - JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead); - virtual ~JNICallbackAdapter(); - - void done(THREAD_CONTEXT threadContext); - - void onError(THREAD_CONTEXT , long , std::string ); - - -}; -#endif /*JNIBUFFERADAPTER_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp 
b/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp deleted file mode 100644 index 0334a7c..0000000 --- a/artemis-native/src/main/c/JNI_AsynchronousFileImpl.cpp +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <jni.h> -#include <stdlib.h> -#include <iostream> -#include <stdio.h> -#include <fcntl.h> -#include <unistd.h> -#include <string> -#include <time.h> -#include <sys/file.h> - -#include "org_apache_activemq_artemis_core_libaio_Native.h" - - -#include "JavaUtilities.h" -#include "AIOController.h" -#include "JNICallbackAdapter.h" -#include "AIOException.h" -#include "Version.h" - - -// This value is set here globally, to avoid passing stuff on stack between java and the native layer on every sleep call -struct timespec nanoTime; - -inline AIOController * getController(JNIEnv *env, jobject & controllerAddress) -{ - return (AIOController *) env->GetDirectBufferAddress(controllerAddress); -} - -/* Inaccessible static: log */ -/* Inaccessible static: totalMaxIO */ -/* Inaccessible static: loaded */ -/* Inaccessible static: EXPECTED_NATIVE_VERSION */ -/* - * Class: org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl - * Method: openFile - * Signature: (Ljava/lang/String;)I - */ -JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_openFile - (JNIEnv * env , jclass , jstring jstrFileName) -{ - std::string fileName = convertJavaString(env, jstrFileName); - - return open(fileName.data(), O_RDWR | O_CREAT, 0666); -} - -/* - * Class: org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl - * Method: closeFile - * Signature: (I)V - */ -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_closeFile - (JNIEnv * , jclass , jint handle) -{ - close(handle); -} - -/* - * Class: org.apache.activemq.artemis_core_asyncio_impl_AsynchronousFileImpl - * Method: flock - * Signature: (I)Z - */ -JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_flock - (JNIEnv * , jclass , jint handle) -{ - return flock(handle, LOCK_EX | LOCK_NB) == 0; -} - - - -/* - * Class: org_jboss_jaio_libaioimpl_LibAIOController - * Method: init - * Signature: (Ljava/lang/String;Ljava/lang/Class;)J - */ 
-JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_init - (JNIEnv * env, jclass, jclass controllerClazz, jstring jstrFileName, jint maxIO, jobject logger) -{ - AIOController * controller = 0; - try - { - std::string fileName = convertJavaString(env, jstrFileName); - - controller = new AIOController(fileName, (int) maxIO); - controller->done = env->GetMethodID(controllerClazz,"callbackDone","(Lorg/apache/activemq/artemis/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V"); - if (!controller->done) - { - throwException (env, -1, "can't get callbackDone method"); - return 0; - } - - controller->error = env->GetMethodID(controllerClazz, "callbackError", "(Lorg/apache/activemq/artemis/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V"); - if (!controller->done) - { - throwException (env, -1, "can't get callbackError method"); - return 0; - } - - jclass loggerClass = env->GetObjectClass(logger); - - if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V"))) return 0; - if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V"))) return 0; - if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V"))) return 0; - if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V"))) return 0; - - controller->logger = env->NewGlobalRef(logger); - - return env->NewDirectByteBuffer(controller, 0); - } - catch (AIOException& e){ - if (controller != 0) - { - delete controller; - } - throwException(env, e.getErrorCode(), e.what()); - return 0; - } -} - -/** -* objThis here is passed as a parameter at the java layer. 
It used to be a JNI this and now it's a java static method - where the intended reference is now passed as an argument -*/ -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_read - (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - void * buffer = env->GetDirectBufferAddress(jbuffer); - - if (buffer == 0) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); - return; - } - - if (((long)buffer) % 512) - { - throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA"); - return; - } - - CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true); - - controller->fileOutput.read(env, position, (size_t)size, buffer, adapter); - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - - -// Fast memset on buffer -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_resetBuffer - (JNIEnv *env, jclass, jobject jbuffer, jint size) -{ - void * buffer = env->GetDirectBufferAddress(jbuffer); - - if (buffer == 0) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); - return; - } - - memset(buffer, 0, (size_t)size); - -} - -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_destroyBuffer - (JNIEnv * env, jclass, jobject jbuffer) -{ - if (jbuffer == 0) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Null Buffer"); - return; - } - void * buffer = env->GetDirectBufferAddress(jbuffer); - free(buffer); -} - -JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_newNativeBuffer - (JNIEnv * env, jclass, 
jlong size) -{ - try - { - - if (size % ALIGNMENT) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512"); - return 0; - } - - - // This will allocate a buffer, aligned by 512. - // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory - void * buffer = 0; - if (::posix_memalign(&buffer, 512, size)) - { - throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign"); - return 0; - } - - memset(buffer, 0, (size_t)size); - - jobject jbuffer = env->NewDirectByteBuffer(buffer, size); - return jbuffer; - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - return 0; - } -} - -/** -* objThis here is passed as a parameter at the java layer. It used to be a JNI this and now it's a java static method - where the intended reference is now passed as an argument -*/ -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_write - (JNIEnv *env, jclass, jobject objThis, jobject controllerAddress, jlong sequence, jlong position, jlong size, jobject jbuffer, jobject callback) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - void * buffer = env->GetDirectBufferAddress(jbuffer); - - if (buffer == 0) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); - return; - } - - - CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false); - - controller->fileOutput.write(env, position, (size_t)size, buffer, adapter); - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_writeInternal - (JNIEnv * env, jclass, jobject controllerAddress, jlong positionToWrite, jlong size, 
jobject jbuffer) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - void * buffer = env->GetDirectBufferAddress(jbuffer); - - if (buffer == 0) - { - throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer"); - return; - } - - controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer); - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - - -JNIEXPORT void Java_org_apache_activemq_artemis_core_libaio_Native_internalPollEvents - (JNIEnv *env, jclass, jobject controllerAddress) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - controller->fileOutput.pollEvents(env); - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_stopPoller - (JNIEnv *env, jclass, jobject controllerAddress) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - controller->fileOutput.stopPoller(env); - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_closeInternal - (JNIEnv *env, jclass, jobject controllerAddress) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - controller->destroy(env); - delete controller; - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - } -} - - -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_fill - (JNIEnv * env, jclass, jobject controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - - controller->fileOutput.preAllocate(env, position, blocks, size, fillChar); - - } - catch (AIOException& e) - { - 
throwException(env, e.getErrorCode(), e.what()); - } -} - - - -/** It does nothing... just return true to make sure it has all the binary dependencies */ -JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_getNativeVersion - (JNIEnv *, jclass) - -{ - return _VERSION_NATIVE_AIO; -} - - -JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_core_libaio_Native_size0 - (JNIEnv * env, jclass, jobject controllerAddress) -{ - try - { - AIOController * controller = getController(env, controllerAddress); - - long size = controller->fileOutput.getSize(); - if (size < 0) - { - throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed"); - return -1l; - } - return size; - } - catch (AIOException& e) - { - throwException(env, e.getErrorCode(), e.what()); - return -1l; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JavaUtilities.cpp ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JavaUtilities.cpp b/artemis-native/src/main/c/JavaUtilities.cpp deleted file mode 100644 index 10d6099..0000000 --- a/artemis-native/src/main/c/JavaUtilities.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <stdio.h> -#include <iostream> -#include <string> -#include "JavaUtilities.h" - - -void throwRuntimeException(JNIEnv * env, const char * message) -{ - jclass exceptionClass = env->FindClass("java/lang/RuntimeException"); - env->ThrowNew(exceptionClass,message); - -} - -void throwException(JNIEnv * env, const int code, const char * message) -{ - jclass exceptionClass = env->FindClass("org/apache/activemq/artemis/api/core/ActiveMQException"); - if (exceptionClass==NULL) - { - std::cerr << "Couldn't throw exception message:= " << message << "\n"; - throwRuntimeException (env, "Can't find Exception class"); - return; - } - - jmethodID constructor = env->GetMethodID(exceptionClass, "<init>", "(ILjava/lang/String;)V"); - if (constructor == NULL) - { - std::cerr << "Couldn't find the constructor ***"; - throwRuntimeException (env, "Can't find Constructor for Exception"); - return; - } - - jstring strError = env->NewStringUTF(message); - jthrowable ex = (jthrowable)env->NewObject(exceptionClass, constructor, code, strError); - env->Throw(ex); - -} - -std::string convertJavaString(JNIEnv * env, jstring& jstr) -{ - const char * valueStr = env->GetStringUTFChars(jstr, NULL); - std::string data(valueStr); - env->ReleaseStringUTFChars(jstr, valueStr); - return data; -} - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/JavaUtilities.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/JavaUtilities.h b/artemis-native/src/main/c/JavaUtilities.h deleted file mode 100644 index 53ba870..0000000 --- a/artemis-native/src/main/c/JavaUtilities.h +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef JAVAUTILITIES_H_ -#define JAVAUTILITIES_H_ -#include <string> -#include <jni.h> - -void throwException(JNIEnv * env, const int code, const char * message); -std::string convertJavaString(JNIEnv * env, jstring& jstr); - -#endif /*JAVAUTILITIES_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/LockClass.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/LockClass.h b/artemis-native/src/main/c/LockClass.h deleted file mode 100644 index 5259919..0000000 --- a/artemis-native/src/main/c/LockClass.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LOCKCLASS_H_ -#define LOCKCLASS_H_ - -#include <pthread.h> - -class LockClass -{ -protected: - pthread_mutex_t* _m; -public: - inline LockClass(pthread_mutex_t* m) : _m(m) - { - ::pthread_mutex_lock(_m); - } - inline ~LockClass() - { - ::pthread_mutex_unlock(_m); - } -}; - - -#endif /*LOCKCLASS_H_*/ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/Version.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/Version.h b/artemis-native/src/main/c/Version.h deleted file mode 100644 index 5b521b3..0000000 --- a/artemis-native/src/main/c/Version.h +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef _VERSION_NATIVE_AIO - -// This definition needs to match org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION -// Or else the native module won't be loaded because of version mismatches -#define _VERSION_NATIVE_AIO 52 -#endif - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-native/src/main/c/exception_helper.h ---------------------------------------------------------------------- diff --git a/artemis-native/src/main/c/exception_helper.h b/artemis-native/src/main/c/exception_helper.h new file mode 100644 index 0000000..d8c7707 --- /dev/null +++ b/artemis-native/src/main/c/exception_helper.h @@ -0,0 +1,23 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +void throwRuntimeException(JNIEnv* env, char* message); +void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber); +void throwIOException(JNIEnv* env, char* message); +void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber); +void throwClosedChannelException(JNIEnv* env); +void throwOutOfMemoryError(JNIEnv* env); +char* exceptionMessage(char* msg, int error);
