http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java new file mode 100644 index 0000000..61cd0fd --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; + +/** + * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories + */ +public abstract class AbstractSequentialFileFactory implements SequentialFileFactory +{ + + // Timeout used to wait executors to shutdown + protected static final int EXECUTOR_TIMEOUT = 60; + + protected final File journalDir; + + protected final TimedBuffer timedBuffer; + + protected final int bufferSize; + + protected final long bufferTimeout; + + protected final int maxIO; + + private final IOCriticalErrorListener critialErrorListener; + + /** + * Asynchronous writes need to be done at another executor. + * This needs to be done at NIO, or else we would have the callers thread blocking for the return. + * At AIO this is necessary as context switches on writes would fire flushes at the kernel. + * */ + protected ExecutorService writeExecutor; + + protected AbstractSequentialFileFactory(final File journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final int maxIO, + final boolean logRates, + final IOCriticalErrorListener criticalErrorListener) + { + this.journalDir = journalDir; + + if (buffered && bufferTimeout > 0) + { + timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); + } + else + { + timedBuffer = null; + } + this.bufferSize = bufferSize; + this.bufferTimeout = bufferTimeout; + this.critialErrorListener = criticalErrorListener; + this.maxIO = maxIO; + } + + public void stop() + { + if (timedBuffer != null) + { + timedBuffer.stop(); + } + + if (isSupportsCallbacks() && writeExecutor != null) + { + writeExecutor.shutdown(); + + try + { + if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) + { + ActiveMQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace")); + } + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + } + } + + @Override + public File getDirectory() + { + return journalDir; + } + + public void start() + { + if (timedBuffer != null) + { + timedBuffer.start(); + } + + if (isSupportsCallbacks()) + { + writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this), + true, + AbstractSequentialFileFactory.getThisClassLoader())); + } + } + + public int getMaxIO() + { + return maxIO; + } + + @Override + public void onIOError(Exception exception, String message, SequentialFile file) + { + if (critialErrorListener != null) + { + critialErrorListener.onIOException(exception, message, file); + } + } + + @Override + public void activateBuffer(final SequentialFile file) + { + if (timedBuffer != null) + { + file.setTimedBuffer(timedBuffer); + } + } + + public void flush() + { + if (timedBuffer != null) + { + timedBuffer.flush(); + } + } + + public void deactivateBuffer() + { + if (timedBuffer != null) + { + // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer + timedBuffer.flush(); + timedBuffer.setObserver(null); + } + } + + public void releaseBuffer(final ByteBuffer buffer) + { + } + + /** + * Create the directory if it doesn't exist yet + */ + public void createDirs() throws Exception + { + boolean ok = journalDir.mkdirs(); + if (!ok) + { + throw new IOException("Failed to create directory " + journalDir); + } + } + + public List<String> listFiles(final String extension) throws Exception + { + FilenameFilter fnf = new FilenameFilter() + { + public boolean accept(final File file, final String name) + { + return name.endsWith("." + extension); + } + }; + + String[] fileNames = journalDir.list(fnf); + + if (fileNames == null) + { + return Collections.EMPTY_LIST; + } + + return Arrays.asList(fileNames); + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return AbstractSequentialFileFactory.class.getClassLoader(); + } + }); + + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java new file mode 100644 index 0000000..ce21f2a --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DummyCallback.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +import org.apache.activemq.artemis.core.journal.impl.SyncIOCompletion; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public class DummyCallback extends SyncIOCompletion +{ + private static final DummyCallback instance = new DummyCallback(); + + public static DummyCallback getInstance() + { + return DummyCallback.instance; + } + + public void done() + { + } + + public void onError(final int errorCode, final String errorMessage) + { + ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode); + } + + @Override + public void waitCompletion() throws Exception + { + } + + @Override + public void storeLineUp() + { + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java new file mode 100644 index 0000000..41470e4 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +/** + * The interface used for AIO Callbacks. + */ +public interface IOCallback +{ + /** + * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk. + * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i> */ + void done(); + + /** + * Method for error notifications. + * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/ + void onError(int errorCode, String errorMessage); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java new file mode 100644 index 0000000..f2da3e8 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +/** + * TODO Merge this with IOExceptionListener + */ +public interface IOCriticalErrorListener +{ + void onIOException(Exception code, String message, SequentialFile file); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java new file mode 100644 index 0000000..5c855e5 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOExceptionListener.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +public interface IOExceptionListener +{ + void onIOException(Exception exception, String message); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java new file mode 100644 index 0000000..cb9d070 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; + +public interface SequentialFile +{ + + boolean isOpen(); + + boolean exists(); + + void open() throws Exception; + + /** + * The maximum number of simultaneous writes accepted + * @param maxIO + * @throws Exception + */ + void open(int maxIO, boolean useExecutor) throws Exception; + + boolean fits(int size); + + int getAlignment() throws Exception; + + int calculateBlockStart(int position) throws Exception; + + String getFileName(); + + void fill(int size) throws Exception; + + void delete() throws IOException, InterruptedException, ActiveMQException; + + void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception; + + void write(ActiveMQBuffer bytes, boolean sync) throws Exception; + + void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception; + + void write(EncodingSupport bytes, boolean sync) throws Exception; + + /** + * Write directly to the file without using any buffer + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback); + + /** + * Write directly to the file without using intermediate any buffer + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + void writeDirect(ByteBuffer bytes, boolean sync) throws Exception; + + /** + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + int read(ByteBuffer bytes, IOCallback callback) throws Exception; + + /** + * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or + * NIO). To be safe, use a buffer from the corresponding + * {@link SequentialFileFactory#newBuffer(int)}. + */ + int read(ByteBuffer bytes) throws Exception; + + void position(long pos) throws IOException; + + long position(); + + void close() throws Exception; + + void sync() throws IOException; + + long size() throws Exception; + + void renameTo(String newFileName) throws Exception; + + SequentialFile cloneFile(); + + void copyTo(SequentialFile newFileName) throws Exception; + + void setTimedBuffer(TimedBuffer buffer); + + /** + * Returns a native File of the file underlying this sequential file. + */ + File getJavaFile(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java new file mode 100644 index 0000000..b9a72ca --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * + * A SequentialFileFactory + */ +public interface SequentialFileFactory +{ + SequentialFile createSequentialFile(String fileName); + + int getMaxIO(); + + /** + * Lists files that end with the given extension. + * <p> + * This method inserts a ".' before the extension. + * @param extension + * @return + * @throws Exception + */ + List<String> listFiles(String extension) throws Exception; + + boolean isSupportsCallbacks(); + + /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */ + void onIOError(Exception exception, String message, SequentialFile file); + + /** used for cases where you need direct buffer outside of the journal context. + * This is because the native layer has a method that can be reused in certain cases like paging */ + ByteBuffer allocateDirectBuffer(int size); + + /** used for cases where you need direct buffer outside of the journal context. + * This is because the native layer has a method that can be reused in certain cases like paging */ + void releaseDirectBuffer(ByteBuffer buffer); + + /** + * Note: You need to release the buffer if is used for reading operations. You don't need to do + * it if using writing operations (AIO Buffer Lister will take of writing operations) + * @param size + * @return the allocated ByteBuffer + */ + ByteBuffer newBuffer(int size); + + void releaseBuffer(ByteBuffer buffer); + + void activateBuffer(SequentialFile file); + + void deactivateBuffer(); + + // To be used in tests only + ByteBuffer wrapBuffer(byte[] bytes); + + int getAlignment(); + + int calculateBlockSize(int bytes); + + File getDirectory(); + + void clearBuffer(ByteBuffer buffer); + + void start(); + + void stop(); + + /** + * Creates the directory if it does not exist yet. + */ + void createDirs() throws Exception; + + void flush(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java new file mode 100644 index 0000000..7503681 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.aio; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.PriorityQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQNativeIOError; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.AbstractSequentialFile; +import org.apache.activemq.artemis.core.io.DummyCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.utils.ReusableLatch; + +public class AIOSequentialFile extends AbstractSequentialFile +{ + private boolean opened = false; + + private LibaioFile aioFile; + + private final AIOSequentialFileFactory aioFactory; + + private final ReusableLatch pendingCallbacks = new ReusableLatch(); + + /** + * Used to determine the next writing sequence + */ + private final AtomicLong nextWritingSequence = new AtomicLong(0); + + /** + * AIO can't guarantee ordering over callbacks. + * <br> + * We use this {@link PriorityQueue} to hold values until they are in order + */ + final PriorityQueue<AIOSequentialFileFactory.AIOSequentialCallback> pendingCallbackList = new PriorityQueue<>(); + + /** + * Used to determine the next writing sequence. + * This is accessed from a single thread (the Poller Thread) + */ + private long nextReadSequence = 0; + + + public AIOSequentialFile(final AIOSequentialFileFactory factory, + final int bufferSize, + final long bufferTimeoutMilliseconds, + final File directory, + final String fileName, + final Executor writerExecutor) + { + super(directory, fileName, factory, writerExecutor); + this.aioFactory = factory; + } + + public boolean isOpen() + { + return opened; + } + + public int getAlignment() + { + checkOpened(); + + return aioFile.getBlockSize(); + } + + public int calculateBlockStart(final int position) + { + int alignment = getAlignment(); + + int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; + + return pos; + } + + public SequentialFile cloneFile() + { + return new AIOSequentialFile(aioFactory, + -1, + -1, + getFile().getParentFile(), + getFile().getName(), + writerExecutor); + } + + @Override + public synchronized void close() throws IOException, InterruptedException, ActiveMQException + { + if (!opened) + { + return; + } + + super.close(); + + if (!pendingCallbacks.await(10, TimeUnit.SECONDS)) + { + factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this); + } + + opened = false; + + timedBuffer = null; + + aioFile.close(); + aioFile = null; + } + + + public synchronized void fill(final int size) throws Exception + { + checkOpened(); + aioFile.fill(size); + + fileSize = aioFile.getSize(); + } + + public void open() throws Exception + { + open(aioFactory.getMaxIO(), true); + } + + public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException + { + opened = true; + + try + { + aioFile = aioFactory.libaioContext.openFile(getFile(), true); + } + catch (IOException e) + { + factory.onIOError(e, e.getMessage(), this); + throw new ActiveMQNativeIOError(e.getMessage(), e); + } + + position.set(0); + + fileSize = aioFile.getSize(); + } + + public int read(final ByteBuffer bytes, final IOCallback callback) throws ActiveMQException + { + checkOpened(); + int bytesToRead = bytes.limit(); + + long positionToRead = position.getAndAdd(bytesToRead); + + bytes.rewind(); + + try + { + // We don't send the buffer to the callback on read, + // because we want the buffer available. + // Sending it through the callback would make it released + aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null)); + } + catch (IOException e) + { + factory.onIOError(e, e.getMessage(), this); + throw new ActiveMQNativeIOError(e.getMessage(), e); + } + + return bytesToRead; + } + + public int read(final ByteBuffer bytes) throws Exception + { + SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback(); + + int bytesRead = read(bytes, waitCompletion); + + waitCompletion.waitCompletion(); + + return bytesRead; + } + + public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + + writeDirect(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + writeDirect(bytes, false, DummyCallback.getInstance()); + } + } + + /** + * + * Note: Parameter sync is not used on AIO + * */ + public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) + { + checkOpened(); + + final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); + + final long positionToWrite = position.getAndAdd(bytesToWrite); + + AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes); + runnableCallback.initWrite(positionToWrite, bytesToWrite); + if (writerExecutor != null) + { + writerExecutor.execute(runnableCallback); + } + else + { + runnableCallback.run(); + } + } + + + + AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) + { + AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback(); + callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer); + pendingCallbacks.countUp(); + return callback; + } + + + void done(AIOSequentialFileFactory.AIOSequentialCallback callback) + { + if (callback.writeSequence == -1) + { + callback.sequentialDone(); + pendingCallbacks.countDown(); + } + + + if (callback.writeSequence == nextReadSequence) + { + nextReadSequence++; + callback.sequentialDone(); + pendingCallbacks.countDown(); + flushCallbacks(); + } + else + { + pendingCallbackList.add(callback); + } + + } + + private void flushCallbacks() + { + while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence) + { + AIOSequentialFileFactory.AIOSequentialCallback callback = pendingCallbackList.poll(); + callback.sequentialDone(); + nextReadSequence++; + pendingCallbacks.countDown(); + } + } + + public void sync() + { + throw new UnsupportedOperationException("This method is not supported on AIO"); + } + + public long size() throws Exception + { + if (aioFile == null) + { + return getFile().length(); + } + else + { + return aioFile.getSize(); + } + } + + @Override + public String toString() + { + return "AIOSequentialFile:" + getFile().getAbsolutePath(); + } + + // Protected methods + // ----------------------------------------------------------------------------------------------------- + + @Override + protected ByteBuffer newBuffer(int size, int limit) + { + size = factory.calculateBlockSize(size); + limit = factory.calculateBlockSize(limit); + + ByteBuffer buffer = factory.newBuffer(size); + buffer.limit(limit); + return buffer; + } + + // Private methods + // ----------------------------------------------------------------------------------------------------- + + private void checkOpened() + { + if (aioFile == null || !opened) + { + throw new NullPointerException("File not opened, file=null"); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java new file mode 100644 index 0000000..39dad2f --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.aio; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.impl.JournalConstants; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.jlibaio.SubmitInfo; +import org.apache.activemq.artemis.jlibaio.util.CallbackCache; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; + +public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory +{ + private static final boolean trace = ActiveMQJournalLogger.LOGGER.isTraceEnabled(); + + private final ReuseBuffersController buffersControl = new ReuseBuffersController(); + + private volatile boolean reuseBuffers = true; + + private ExecutorService pollerExecutor; + + volatile LibaioContext<AIOSequentialCallback> libaioContext; + + private final CallbackCache<AIOSequentialCallback> callbackPool; + + private final AtomicBoolean running = new AtomicBoolean(false); + + // This method exists just to make debug easier. + // I could replace log.trace by log.info temporarily while I was debugging + // Journal + private static void trace(final String message) + { + ActiveMQJournalLogger.LOGGER.trace(message); + } + + public AIOSequentialFileFactory(final File journalDir, int maxIO) + { + this(journalDir, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, + maxIO, + false, + null); + } + + public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) + { + this(journalDir, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, + maxIO, + false, + listener); + } + + public AIOSequentialFileFactory(final File journalDir, + final int bufferSize, + final int bufferTimeout, + final int maxIO, + final boolean logRates) + { + this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null); + } + + public AIOSequentialFileFactory(final File journalDir, + final int bufferSize, + final int bufferTimeout, + final int maxIO, + final boolean logRates, + final IOCriticalErrorListener listener) + { + super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener); + callbackPool = new CallbackCache<>(maxIO); + } + + public AIOSequentialCallback getCallback() + { + AIOSequentialCallback callback = callbackPool.get(); + if (callback == null) + { + callback = new AIOSequentialCallback(); + } + + return callback; + } + + public void enableBufferReuse() + { + this.reuseBuffers = true; + } + + public void disableBufferReuse() + { + this.reuseBuffers = false; + } + + + public SequentialFile createSequentialFile(final String fileName) + { + return new AIOSequentialFile(this, + bufferSize, + bufferTimeout, + journalDir, + fileName, + writeExecutor); + } + + public boolean isSupportsCallbacks() + { + return true; + } + + public static boolean isSupported() + { + return LibaioContext.isLoaded(); + } + + public ByteBuffer allocateDirectBuffer(final int size) + { + + int blocks = size / 512; + if (size % 512 != 0) + { + blocks++; + } + + // The buffer on AIO has to be a multiple of 512 + ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * 512, 512); + + buffer.limit(size); + + return buffer; + } + + public void releaseDirectBuffer(final ByteBuffer buffer) + { + LibaioContext.freeBuffer(buffer); + } + + public ByteBuffer newBuffer(int size) + { + if (size % 512 != 0) + { + size = (size / 512 + 1) * 512; + } + + return buffersControl.newBuffer(size); + } + + public void clearBuffer(final ByteBuffer directByteBuffer) + { + directByteBuffer.position(0); + libaioContext.memsetBuffer(directByteBuffer); + } + + public int getAlignment() + { + return 512; + } + + // For tests only + public ByteBuffer wrapBuffer(final byte[] bytes) + { + ByteBuffer newbuffer = newBuffer(bytes.length); + newbuffer.put(bytes); + return newbuffer; + } + + public int calculateBlockSize(final int position) + { + int alignment = getAlignment(); + + int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; + + return pos; + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.io.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer) + */ + @Override + public synchronized void releaseBuffer(final ByteBuffer buffer) + { + LibaioContext.freeBuffer(buffer); + } + + @Override + public void start() + { + if (running.compareAndSet(false, true)) + { + super.start(); + + this.libaioContext = new LibaioContext(maxIO, true); + + this.running.set(true); + + pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), + true, + AIOSequentialFileFactory.getThisClassLoader())); + + pollerExecutor.execute(new PollerRunnable()); + } + + } + + @Override + public void stop() + { + if (this.running.compareAndSet(true, false)) + { + buffersControl.stop(); + + libaioContext.close(); + libaioContext = null; + + if (pollerExecutor != null) + { + pollerExecutor.shutdown(); + + try + { + if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) + { + ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace")); + } + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + } + + super.stop(); + } + } + + @Override + protected void finalize() + { + stop(); + } + + /** + * The same callback is used for Runnable executor. + * This way we can save some memory over the pool. + */ + public class AIOSequentialCallback implements SubmitInfo, Runnable, Comparable<AIOSequentialCallback> + { + IOCallback callback; + boolean error = false; + AIOSequentialFile sequentialFile; + ByteBuffer buffer; + LibaioFile libaioFile; + String errorMessage; + int errorCode = -1; + long writeSequence; + + long position; + int bytes; + + @Override + public String toString() + { + return "AIOSequentialCallback{" + + "error=" + error + + ", errorMessage='" + errorMessage + '\'' + + ", errorCode=" + errorCode + + ", writeSequence=" + writeSequence + + ", position=" + position + + '}'; + } + + public AIOSequentialCallback initWrite(long positionToWrite, int bytesToWrite) + { + this.position = positionToWrite; + this.bytes = bytesToWrite; + return this; + } + + public void run() + { + try + { + libaioFile.write(position, bytes, buffer, this); + } + catch (IOException e) + { + callback.onError(-1, e.getMessage()); + } + } + + public int compareTo(AIOSequentialCallback other) + { + if (this == other || this.writeSequence == other.writeSequence) + { + return 0; + } + else if (other.writeSequence < this.writeSequence) + { + return 1; + } + else + { + return -1; + } + } + + public AIOSequentialCallback init(long writeSequence, IOCallback IOCallback, LibaioFile libaioFile, AIOSequentialFile sequentialFile, ByteBuffer usedBuffer) + { + this.callback = IOCallback; + this.sequentialFile = sequentialFile; + this.error = false; + this.buffer = usedBuffer; + this.libaioFile = libaioFile; + this.writeSequence = writeSequence; + this.errorMessage = null; + return this; + } + + @Override + public void onError(int errno, String message) + { + this.error = true; + this.errorCode = errno; + this.errorMessage = message; + } + + /** + * this is called by libaio. + */ + public void done() + { + this.sequentialFile.done(this); + } + + /** + * This is callbed by the AIOSequentialFile, after determined the callbacks were returned in sequence + */ + public void sequentialDone() + { + + if (error) + { + callback.onError(errorCode, errorMessage); + errorMessage = null; + } + else + { + if (callback != null) + { + callback.done(); + } + + if (buffer != null && reuseBuffers) + { + buffersControl.bufferDone(buffer); + } + + callbackPool.put(AIOSequentialCallback.this); + } + } + } + + private class PollerRunnable implements Runnable + { + public void run() + { + libaioContext.poll(); + } + } + + /** + * Class that will control buffer-reuse + */ + private class ReuseBuffersController + { + private volatile long bufferReuseLastTime = System.currentTimeMillis(); + + private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>(); + + private boolean stopped = false; + + public ByteBuffer newBuffer(final int size) + { + // if a new buffer wasn't requested in 10 seconds, we clear the queue + // This is being done this way as we don't need another Timeout Thread + // just to cleanup this + if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000) + { + if (AIOSequentialFileFactory.trace) + { + AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + + " elements"); + } + + bufferReuseLastTime = System.currentTimeMillis(); + + clearPoll(); + } + + // if a buffer is bigger than the configured-bufferSize, we just create a new + // buffer. + if (size > bufferSize) + { + return LibaioContext.newAlignedBuffer(size, 512); + } + else + { + // We need to allocate buffers following the rules of the storage + // being used (AIO/NIO) + int alignedSize = calculateBlockSize(size); + + // Try getting a buffer from the queue... + ByteBuffer buffer = reuseBuffersQueue.poll(); + + if (buffer == null) + { + // if empty create a new one. + buffer = LibaioContext.newAlignedBuffer(size, 512); + + buffer.limit(alignedSize); + } + else + { + clearBuffer(buffer); + + // set the limit of the buffer to the bufferSize being required + buffer.limit(alignedSize); + } + + buffer.rewind(); + + return buffer; + } + } + + public synchronized void stop() + { + stopped = true; + clearPoll(); + } + + public synchronized void clearPoll() + { + ByteBuffer reusedBuffer; + + while ((reusedBuffer = reuseBuffersQueue.poll()) != null) + { + releaseBuffer(reusedBuffer); + } + } + + public void bufferDone(final ByteBuffer buffer) + { + synchronized (this) + { + + if (stopped) + { + releaseBuffer(buffer); + } + else + { + bufferReuseLastTime = System.currentTimeMillis(); + + // If a buffer has any other than the configured bufferSize, the buffer + // will be just sent to GC + if (buffer.capacity() == bufferSize) + { + reuseBuffersQueue.offer(buffer); + } + else + { + releaseBuffer(buffer); + } + } + } + } + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return AIOSequentialFileFactory.class.getClassLoader(); + } + }); + + } + + @Override + public String toString() + { + return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped + + "):" + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java new file mode 100644 index 0000000..a184244 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/ActiveMQFileLock.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.aio; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +import org.apache.activemq.artemis.jlibaio.LibaioFile; + +public class ActiveMQFileLock extends FileLock +{ + + private final LibaioFile file; + + public ActiveMQFileLock(final LibaioFile handle) + { + super((FileChannel)null, 0, 0, false); + this.file = handle; + } + + @Override + public boolean isValid() + { + return true; + } + + @Override + public void release() throws IOException + { + file.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java new file mode 100644 index 0000000..a61569a --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public class TimedBuffer +{ + // Constants ----------------------------------------------------- + + // The number of tries on sleep before switching to spin + public static final int MAX_CHECKS_ON_SLEEP = 20; + + // Attributes ---------------------------------------------------- + + private TimedBufferObserver bufferObserver; + + // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread + // in spinning and checking the time - and using up CPU in the process - this semaphore is used to + // prevent that + private final Semaphore spinLimiter = new Semaphore(1); + + private CheckTimer timerRunnable = new CheckTimer(); + + private final int bufferSize; + + private final ActiveMQBuffer buffer; + + private int bufferLimit = 0; + + private List<IOCallback> callbacks; + + private volatile int timeout; + + // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen + private volatile boolean pendingSync = false; + + private Thread timerThread; + + private volatile boolean started; + + // We use this flag to prevent flush occurring between calling checkSize and addBytes + // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer + // can get in an inconsistent state + private boolean delayFlush; + + // for logging write rates + + private final boolean logRates; + + private final AtomicLong bytesFlushed = new AtomicLong(0); + + private final AtomicLong flushesDone = new AtomicLong(0); + + private Timer logRatesTimer; + + private TimerTask logRatesTimerTask; + + private boolean useSleep = true; + + // no need to be volatile as every access is synchronized + private boolean spinning = false; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public TimedBuffer(final int size, final int timeout, final boolean logRates) + { + bufferSize = size; + + this.logRates = logRates; + + if (logRates) + { + logRatesTimer = new Timer(true); + } + // Setting the interval for nano-sleeps + + buffer = ActiveMQBuffers.fixedBuffer(bufferSize); + + buffer.clear(); + + bufferLimit = 0; + + callbacks = new ArrayList<IOCallback>(); + + this.timeout = timeout; + } + + // for Debug purposes + public synchronized boolean isUseSleep() + { + return useSleep; + } + + public synchronized void setUseSleep(boolean useSleep) + { + this.useSleep = useSleep; + } + + public synchronized void start() + { + if (started) + { + return; + } + + // Need to start with the spin limiter acquired + try + { + spinLimiter.acquire(); + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + + timerRunnable = new CheckTimer(); + + timerThread = new Thread(timerRunnable, "activemq-buffer-timeout"); + + timerThread.start(); + + if (logRates) + { + logRatesTimerTask = new LogRatesTimerTask(); + + logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000); + } + + started = true; + } + + public void stop() + { + if (!started) + { + return; + } + + flush(); + + bufferObserver = null; + + timerRunnable.close(); + + spinLimiter.release(); + + if (logRates) + { + logRatesTimerTask.cancel(); + } + + while (timerThread.isAlive()) + { + try + { + timerThread.join(); + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + } + + started = false; + } + + public synchronized void setObserver(final TimedBufferObserver observer) + { + if (bufferObserver != null) + { + flush(); + } + + bufferObserver = observer; + } + + /** + * Verify if the size fits the buffer + * + * @param sizeChecked + */ + public synchronized boolean checkSize(final int sizeChecked) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + if (sizeChecked > bufferSize) + { + throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + + ") on the journal"); + } + + if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) + { + // Either there is not enough space left in the buffer for the sized record + // Or a flush has just been performed and we need to re-calcualate bufferLimit + + flush(); + + delayFlush = true; + + final int remainingInFile = bufferObserver.getRemainingBytes(); + + if (sizeChecked > remainingInFile) + { + return false; + } + else + { + // There is enough space in the file for this size + + // Need to re-calculate buffer limit + + bufferLimit = Math.min(remainingInFile, bufferSize); + + return true; + } + } + else + { + delayFlush = true; + + return true; + } + } + + public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) + { + addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback); + } + + public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + delayFlush = false; + + bytes.encode(buffer); + + callbacks.add(callback); + + if (sync) + { + pendingSync = true; + + startSpin(); + } + + } + + public void flush() + { + flush(false); + } + + /** + * force means the Journal is moving to a new file. Any pending write need to be done immediately + * or data could be lost + */ + public void flush(final boolean force) + { + synchronized (this) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + if ((force || !delayFlush) && buffer.writerIndex() > 0) + { + int pos = buffer.writerIndex(); + + if (logRates) + { + bytesFlushed.addAndGet(pos); + } + + ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); + + // Putting a byteArray on a native buffer is much faster, since it will do in a single native call. + // Using bufferToFlush.put(buffer) would make several append calls for each byte + // We also transfer the content of this buffer to the native file's buffer + + bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos); + + bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); + + stopSpin(); + + pendingSync = false; + + // swap the instance as the previous callback list is being used asynchronously + callbacks = new LinkedList<IOCallback>(); + + buffer.clear(); + + bufferLimit = 0; + + flushesDone.incrementAndGet(); + } + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + + private class LogRatesTimerTask extends TimerTask + { + private boolean closed; + + private long lastExecution; + + private long lastBytesFlushed; + + private long lastFlushesDone; + + @Override + public synchronized void run() + { + if (!closed) + { + long now = System.currentTimeMillis(); + + long bytesF = bytesFlushed.get(); + long flushesD = flushesDone.get(); + + if (lastExecution != 0) + { + double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); + ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); + double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); + ActiveMQJournalLogger.LOGGER.flushRate(flushRate); + } + + lastExecution = now; + + lastBytesFlushed = bytesF; + + lastFlushesDone = flushesD; + } + } + + @Override + public synchronized boolean cancel() + { + closed = true; + + return super.cancel(); + } + } + + private class CheckTimer implements Runnable + { + private volatile boolean closed = false; + + int checks = 0; + int failedChecks = 0; + long timeBefore = 0; + + final int sleepMillis = timeout / 1000000; // truncates + final int sleepNanos = timeout % 1000000; + + + public void run() + { + long lastFlushTime = 0; + + while (!closed) + { + // We flush on the timer if there are pending syncs there and we've waited at least one + // timeout since the time of the last flush. + // Effectively flushing "resets" the timer + // On the timeout verification, notice that we ignore the timeout check if we are using sleep + + if (pendingSync) + { + if (isUseSleep()) + { + // if using sleep, we will always flush + flush(); + lastFlushTime = System.nanoTime(); + } + else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) + { + // if not using flush we will spin and do the time checks manually + flush(); + lastFlushTime = System.nanoTime(); + } + + } + + sleepIfPossible(); + + try + { + spinLimiter.acquire(); + + Thread.yield(); + + spinLimiter.release(); + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + } + } + + /** + * We will attempt to use sleep only if the system supports nano-sleep + * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. + * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin + */ + private void sleepIfPossible() + { + if (isUseSleep()) + { + if (checks < MAX_CHECKS_ON_SLEEP) + { + timeBefore = System.nanoTime(); + } + + try + { + sleep(sleepMillis, sleepNanos); + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + catch (Exception e) + { + setUseSleep(false); + ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e); + } + + if (checks < MAX_CHECKS_ON_SLEEP) + { + long realTimeSleep = System.nanoTime() - timeBefore; + + // I'm letting the real time to be up to 50% than the requested sleep. + if (realTimeSleep > timeout * 1.5) + { + failedChecks++; + } + + if (++checks >= MAX_CHECKS_ON_SLEEP) + { + if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) + { + ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts"); + setUseSleep(false); + } + } + } + } + } + + public void close() + { + closed = true; + } + } + + /** + * Sub classes (tests basically) can use this to override how the sleep is being done + * + * @param sleepMillis + * @param sleepNanos + * @throws InterruptedException + */ + protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException + { + Thread.sleep(sleepMillis, sleepNanos); + } + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void stopSpin() + { + if (spinning) + { + try + { + // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning + // when the buffer is inactive + spinLimiter.acquire(); + } + catch (InterruptedException e) + { + throw new ActiveMQInterruptedException(e); + } + + spinning = false; + } + } + + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void startSpin() + { + if (!spinning) + { + spinLimiter.release(); + + spinning = true; + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java new file mode 100644 index 0000000..7a9659f --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.activemq.artemis.core.io.IOCallback; + + +public interface TimedBufferObserver +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCallback> callbacks); + + /** Return the number of remaining bytes that still fit on the observer (file) */ + int getRemainingBytes(); + + ByteBuffer newBuffer(int size, int limit); + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java new file mode 100644 index 0000000..d20045e --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.nio; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.AbstractSequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +public final class NIOSequentialFile extends AbstractSequentialFile +{ + private FileChannel channel; + + private RandomAccessFile rfile; + + /** + * The write semaphore here is only used when writing asynchronously + */ + private Semaphore maxIOSemaphore; + + private final int defaultMaxIO; + + private int maxIO; + + public NIOSequentialFile(final SequentialFileFactory factory, + final File directory, + final String file, + final int maxIO, + final Executor writerExecutor) + { + super(directory, file, factory, writerExecutor); + defaultMaxIO = maxIO; + } + + public int getAlignment() + { + return 1; + } + + public int calculateBlockStart(final int position) + { + return position; + } + + public synchronized boolean isOpen() + { + return channel != null; + } + + /** + * this.maxIO represents the default maxIO. + * Some operations while initializing files on the journal may require a different maxIO + */ + public synchronized void open() throws IOException + { + open(defaultMaxIO, true); + } + + public void open(final int maxIO, final boolean useExecutor) throws IOException + { + try + { + rfile = new RandomAccessFile(getFile(), "rw"); + + channel = rfile.getChannel(); + + fileSize = channel.size(); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + if (writerExecutor != null && useExecutor) + { + maxIOSemaphore = new Semaphore(maxIO); + this.maxIO = maxIO; + } + } + + public void fill(final int size) throws IOException + { + ByteBuffer bb = ByteBuffer.allocate(size); + + bb.limit(size); + bb.position(0); + + try + { + channel.position(0); + channel.write(bb); + channel.force(false); + channel.position(0); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + fileSize = channel.size(); + } + + @Override + public synchronized void close() throws IOException, InterruptedException, ActiveMQException + { + super.close(); + + if (maxIOSemaphore != null) + { + while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) + { + ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName()); + } + } + + maxIOSemaphore = null; + try + { + if (channel != null) + { + channel.close(); + } + + if (rfile != null) + { + rfile.close(); + } + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + channel = null; + + rfile = null; + + notifyAll(); + } + + public int read(final ByteBuffer bytes) throws Exception + { + return read(bytes, null); + } + + public synchronized int read(final ByteBuffer bytes, final IOCallback callback) throws IOException, + ActiveMQIllegalStateException + { + try + { + if (channel == null) + { + throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel"); + } + int bytesRead = channel.read(bytes); + + if (callback != null) + { + callback.done(); + } + + bytes.flip(); + + return bytesRead; + } + catch (IOException e) + { + if (callback != null) + { + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage()); + } + + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + + throw e; + } + } + + public void sync() throws IOException + { + if (channel != null) + { + try + { + channel.force(false); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + } + + public long size() throws IOException + { + if (channel == null) + { + return getFile().length(); + } + + try + { + return channel.size(); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + @Override + public void position(final long pos) throws IOException + { + try + { + super.position(pos); + channel.position(pos); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + @Override + public String toString() + { + return "NIOSequentialFile " + getFile(); + } + + public SequentialFile cloneFile() + { + return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); + } + + public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) + { + if (callback == null) + { + throw new NullPointerException("callback parameter need to be set"); + } + + try + { + internalWrite(bytes, sync, callback); + } + catch (Exception e) + { + callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage()); + } + } + + public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception + { + internalWrite(bytes, sync, null); + } + + public void writeInternal(final ByteBuffer bytes) throws Exception + { + internalWrite(bytes, true, null); + } + + @Override + protected ByteBuffer newBuffer(int size, final int limit) + { + // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO + + size = limit; + + return super.newBuffer(size, limit); + } + + private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException + { + if (!isOpen()) + { + if (callback != null) + { + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened"); + } + else + { + throw ActiveMQJournalBundle.BUNDLE.fileNotOpened(); + } + return; + } + + position.addAndGet(bytes.limit()); + + if (maxIOSemaphore == null || callback == null) + { + // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous + try + { + doInternalWrite(bytes, sync, callback); + } + catch (IOException e) + { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + } + } + else + { + // This is a flow control on writing, just like maxAIO on libaio + maxIOSemaphore.acquire(); + + writerExecutor.execute(new Runnable() + { + public void run() + { + try + { + try + { + doInternalWrite(bytes, sync, callback); + } + catch (IOException e) + { + ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + } + catch (Throwable e) + { + ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + } + } + finally + { + maxIOSemaphore.release(); + } + } + }); + } + } + + /** + * @param bytes + * @param sync + * @param callback + * @throws IOException + * @throws Exception + */ + private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws IOException + { + channel.write(bytes); + + if (sync) + { + sync(); + } + + if (callback != null) + { + callback.done(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java new file mode 100644 index 0000000..e64e405 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.nio; + +import java.io.File; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.journal.impl.JournalConstants; + +public class NIOSequentialFileFactory extends AbstractSequentialFileFactory +{ + public NIOSequentialFileFactory(final File journalDir, final int maxIO) + { + this(journalDir, null, maxIO); + } + + public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) + { + this(journalDir, + false, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, + maxIO, + false, + listener); + } + + public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) + { + this(journalDir, buffered, null, maxIO); + } + + public NIOSequentialFileFactory(final File journalDir, + final boolean buffered, + final IOCriticalErrorListener listener, final int maxIO) + { + this(journalDir, + buffered, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, + maxIO, + false, + listener); + } + + public NIOSequentialFileFactory(final File journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final int maxIO, + final boolean logRates) + { + this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null); + } + + public NIOSequentialFileFactory(final File journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final int maxIO, + final boolean logRates, + final IOCriticalErrorListener listener) + { + super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); + } + + public SequentialFile createSequentialFile(final String fileName) + { + return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); + } + + public boolean isSupportsCallbacks() + { + return timedBuffer != null; + } + + + public ByteBuffer allocateDirectBuffer(final int size) + { + // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 + ByteBuffer buffer2 = null; + try + { + buffer2 = ByteBuffer.allocateDirect(size); + } + catch (OutOfMemoryError error) + { + // This is a workaround for the way the JDK will deal with native buffers. + // the main portion is outside of the VM heap + // and the JDK will not have any reference about it to take GC into account + // so we force a GC and try again. + WeakReference<Object> obj = new WeakReference<Object>(new Object()); + try + { + long timeout = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() > timeout && obj.get() != null) + { + System.gc(); + Thread.sleep(100); + } + } + catch (InterruptedException e) + { + } + + buffer2 = ByteBuffer.allocateDirect(size); + + } + return buffer2; + } + + public void releaseDirectBuffer(ByteBuffer buffer) + { + // nothing we can do on this case. we can just have good faith on GC + } + + public ByteBuffer newBuffer(final int size) + { + return ByteBuffer.allocate(size); + } + + public void clearBuffer(final ByteBuffer buffer) + { + final int limit = buffer.limit(); + buffer.rewind(); + + for (int i = 0; i < limit; i++) + { + buffer.put((byte)0); + } + + buffer.rewind(); + } + + public ByteBuffer wrapBuffer(final byte[] bytes) + { + return ByteBuffer.wrap(bytes); + } + + public int getAlignment() + { + return 1; + } + + public int calculateBlockSize(final int bytes) + { + return bytes; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java deleted file mode 100644 index 09c80ca..0000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOAsyncTask.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.journal; - -import org.apache.activemq.artemis.core.asyncio.AIOCallback; - -/** - * This class is just a direct extension of AIOCallback. - * Just to avoid the direct dependency of org.apache.activemq.artemis.core.asynciio.AIOCallback from the journal. - */ -public interface IOAsyncTask extends AIOCallback -{ -}
