http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java new file mode 100644 index 0000000..2cd5efe --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java @@ -0,0 +1,79 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.core.journal.RecordInfo; + +/** + * A JournalReader + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public interface JournalReaderCallback +{ + void onReadAddRecord(RecordInfo info) throws Exception; + + /** + * @param recordInfo + * @throws Exception + */ + void onReadUpdateRecord(RecordInfo recordInfo) throws Exception; + + /** + * @param recordID + */ + void onReadDeleteRecord(long recordID) throws Exception; + + /** + * @param transactionID + * @param recordInfo + * @throws Exception + */ + void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + + /** + * @param transactionID + * @param recordInfo + * @throws Exception + */ + void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + + /** + * @param transactionID + * @param recordInfo + */ + void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception; + + /** + * @param transactionID + * @param extraData + * @param numberOfRecords + */ + void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception; + + /** + * @param transactionID + * @param numberOfRecords + */ + void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception; + + /** + * @param transactionID + */ + void onReadRollbackRecord(long transactionID) throws Exception; + + void markAsDataFile(JournalFile file); + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java new file mode 100644 index 0000000..fdd50bd --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.core.journal.RecordInfo; + +/** + * A JournalReaderCallbackAbstract + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class JournalReaderCallbackAbstract implements JournalReaderCallback +{ + + public void markAsDataFile(final JournalFile file) + { + } + + public void onReadAddRecord(final RecordInfo info) throws Exception + { + } + + public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + } + + public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception + { + } + + public void onReadDeleteRecord(final long recordID) throws Exception + { + } + + public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + } + + public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception + { + } + + public void onReadRollbackRecord(final long transactionID) throws Exception + { + } + + public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception + { + } + + public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java new file mode 100644 index 0000000..414f01d --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java @@ -0,0 +1,96 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq6.api.core.Pair; + +/** + * This holds the relationship a record has with other files in regard to reference counting. + * Note: This class used to be called PosFiles + * + * Used on the ref-count for reclaiming + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * */ +public class JournalRecord +{ + private final JournalFile addFile; + + private final int size; + + private List<Pair<JournalFile, Integer>> updateFiles; + + public JournalRecord(final JournalFile addFile, final int size) + { + this.addFile = addFile; + + this.size = size; + + addFile.incPosCount(); + + addFile.addSize(size); + } + + void addUpdateFile(final JournalFile updateFile, final int size) + { + if (updateFiles == null) + { + updateFiles = new ArrayList<Pair<JournalFile, Integer>>(); + } + + updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size)); + + updateFile.incPosCount(); + + updateFile.addSize(size); + } + + void delete(final JournalFile file) + { + file.incNegCount(addFile); + addFile.decSize(size); + + if (updateFiles != null) + { + for (Pair<JournalFile, Integer> updFile : updateFiles) + { + file.incNegCount(updFile.getA()); + updFile.getA().decSize(updFile.getB()); + } + } + } + + @Override + public String toString() + { + StringBuilder buffer = new StringBuilder(); + buffer.append("JournalRecord(add=" + addFile.getFile().getFileName()); + + if (updateFiles != null) + { + + for (Pair<JournalFile, Integer> update : updateFiles) + { + buffer.append(", update=" + update.getA().getFile().getFileName()); + } + + } + + buffer.append(")"); + + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java new file mode 100644 index 0000000..e72fe49 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.Map; + +/** + * This is an interface used only internally. + * + * During a TX.commit, the JournalTransaction needs to get a valid list of records from either the JournalImpl or JournalCompactor. + * + * when a commit is read, the JournalTransaction will inquire the JournalCompactor about the existent records + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public interface JournalRecordProvider +{ + JournalCompactor getCompactor(); + + Map<Long, JournalRecord> getRecords(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java new file mode 100644 index 0000000..a5dadbc --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java @@ -0,0 +1,456 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord; + +/** + * A JournalTransaction + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class JournalTransaction +{ + + private JournalRecordProvider journal; + + private List<JournalUpdate> pos; + + private List<JournalUpdate> neg; + + private final long id; + + // All the files this transaction is touching on. + // We can't have those files being reclaimed if there is a pending transaction + private Set<JournalFile> pendingFiles; + + private TransactionCallback currentCallback; + + private boolean compacting = false; + + private Map<JournalFile, TransactionCallback> callbackList; + + private JournalFile lastFile = null; + + private final AtomicInteger counter = new AtomicInteger(); + + public JournalTransaction(final long id, final JournalRecordProvider journal) + { + this.id = id; + this.journal = journal; + } + + public void replaceRecordProvider(final JournalRecordProvider provider) + { + journal = provider; + } + + /** + * @return the id + */ + public long getId() + { + return id; + } + + public int getCounter(final JournalFile file) + { + return internalgetCounter(file).intValue(); + } + + public void incCounter(final JournalFile file) + { + internalgetCounter(file).incrementAndGet(); + } + + public long[] getPositiveArray() + { + if (pos == null) + { + return new long[0]; + } + else + { + int i = 0; + long[] ids = new long[pos.size()]; + for (JournalUpdate el : pos) + { + ids[i++] = el.getId(); + } + return ids; + } + } + + public void setCompacting() + { + compacting = true; + + // Everything is cleared on the transaction... + // since we are compacting, everything is at the compactor's level + clear(); + } + + /** This is used to merge transactions from compacting */ + public void merge(final JournalTransaction other) + { + if (other.pos != null) + { + if (pos == null) + { + pos = new ArrayList<JournalUpdate>(); + } + + pos.addAll(other.pos); + } + + if (other.neg != null) + { + if (neg == null) + { + neg = new ArrayList<JournalUpdate>(); + } + + neg.addAll(other.neg); + } + + if (other.pendingFiles != null) + { + if (pendingFiles == null) + { + pendingFiles = new HashSet<JournalFile>(); + } + + pendingFiles.addAll(other.pendingFiles); + } + + compacting = false; + } + + /** + * + */ + public void clear() + { + // / Compacting is recreating all the previous files and everything + // / so we just clear the list of previous files, previous pos and previous adds + // / The transaction may be working at the top from now + + if (pendingFiles != null) + { + pendingFiles.clear(); + } + + if (callbackList != null) + { + callbackList.clear(); + } + + if (pos != null) + { + pos.clear(); + } + + if (neg != null) + { + neg.clear(); + } + + counter.set(0); + + lastFile = null; + + currentCallback = null; + } + + /** + * @param currentFile + * @param data + */ + public void fillNumberOfRecords(final JournalFile currentFile, final JournalInternalRecord data) + { + data.setNumberOfRecords(getCounter(currentFile)); + } + + public TransactionCallback getCallback(final JournalFile file) throws Exception + { + if (callbackList == null) + { + callbackList = new HashMap<JournalFile, TransactionCallback>(); + } + + currentCallback = callbackList.get(file); + + if (currentCallback == null) + { + currentCallback = new TransactionCallback(); + callbackList.put(file, currentCallback); + } + + if (currentCallback.getErrorMessage() != null) + { + throw HornetQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage()); + } + + currentCallback.countUp(); + + return currentCallback; + } + + public void addPositive(final JournalFile file, final long id, final int size) + { + incCounter(file); + + addFile(file); + + if (pos == null) + { + pos = new ArrayList<JournalUpdate>(); + } + + pos.add(new JournalUpdate(file, id, size)); + } + + public void addNegative(final JournalFile file, final long id) + { + incCounter(file); + + addFile(file); + + if (neg == null) + { + neg = new ArrayList<JournalUpdate>(); + } + + neg.add(new JournalUpdate(file, id, 0)); + } + + /** + * The caller of this method needs to guarantee appendLock.lock at the journal. (unless this is being called from load what is a single thread process). + * */ + public void commit(final JournalFile file) + { + JournalCompactor compactor = journal.getCompactor(); + + if (compacting) + { + compactor.addCommandCommit(this, file); + } + else + { + + if (pos != null) + { + for (JournalUpdate trUpdate : pos) + { + JournalRecord posFiles = journal.getRecords().get(trUpdate.id); + + if (compactor != null && compactor.lookupRecord(trUpdate.id)) + { + // This is a case where the transaction was opened after compacting was started, + // but the commit arrived while compacting was working + // We need to cache the counter update, so compacting will take the correct files when it is done + compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size); + } + else if (posFiles == null) + { + posFiles = new JournalRecord(trUpdate.file, trUpdate.size); + + journal.getRecords().put(trUpdate.id, posFiles); + } + else + { + posFiles.addUpdateFile(trUpdate.file, trUpdate.size); + } + } + } + + if (neg != null) + { + for (JournalUpdate trDelete : neg) + { + if (compactor != null) + { + compactor.addCommandDelete(trDelete.id, trDelete.file); + } + else + { + JournalRecord posFiles = journal.getRecords().remove(trDelete.id); + + if (posFiles != null) + { + posFiles.delete(trDelete.file); + } + } + } + } + + // Now add negs for the pos we added in each file in which there were + // transactional operations + + for (JournalFile jf : pendingFiles) + { + file.incNegCount(jf); + } + } + } + + public void waitCallbacks() throws InterruptedException + { + if (callbackList != null) + { + for (TransactionCallback callback : callbackList.values()) + { + callback.waitCompletion(); + } + } + } + + /** Wait completion at the latest file only */ + public void waitCompletion() throws Exception + { + if (currentCallback != null) + { + currentCallback.waitCompletion(); + } + } + + /** + * The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context. + * or else potFilesMap could be affected + * */ + public void rollback(final JournalFile file) + { + JournalCompactor compactor = journal.getCompactor(); + + if (compacting && compactor != null) + { + compactor.addCommandRollback(this, file); + } + else + { + // Now add negs for the pos we added in each file in which there were + // transactional operations + // Note that we do this on rollback as we do on commit, since we need + // to ensure the file containing + // the rollback record doesn't get deleted before the files with the + // transactional operations are deleted + // Otherwise we may run into problems especially with XA where we are + // just left with a prepare when the tx + // has actually been rolled back + + for (JournalFile jf : pendingFiles) + { + file.incNegCount(jf); + } + } + } + + /** + * The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context. + * or else potFilesMap could be affected + * */ + public void prepare(final JournalFile file) + { + // We don't want the prepare record getting deleted before time + + addFile(file); + } + + /** Used by load, when the transaction was not loaded correctly */ + public void forget() + { + // The transaction was not committed or rolled back in the file, so we + // reverse any pos counts we added + for (JournalFile jf : pendingFiles) + { + jf.decPosCount(); + } + + } + + @Override + public String toString() + { + return "JournalTransaction(" + id + ")"; + } + + private AtomicInteger internalgetCounter(final JournalFile file) + { + if (lastFile != file) + + { + lastFile = file; + counter.set(0); + } + return counter; + } + + private void addFile(final JournalFile file) + { + if (pendingFiles == null) + { + pendingFiles = new HashSet<JournalFile>(); + } + + if (!pendingFiles.contains(file)) + { + pendingFiles.add(file); + + // We add a pos for the transaction itself in the file - this + // prevents any transactional operations + // being deleted before a commit or rollback is written + file.incPosCount(); + } + } + + private static class JournalUpdate + { + private final JournalFile file; + + long id; + + int size; + + /** + * @param file + * @param id + * @param size + */ + private JournalUpdate(final JournalFile file, final long id, final int size) + { + super(); + this.file = file; + this.id = id; + this.size = size; + } + + /** + * @return the id + */ + public long getId() + { + return id; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java new file mode 100644 index 0000000..97a70dc --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java @@ -0,0 +1,415 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.HornetQIOErrorException; +import org.apache.activemq6.api.core.HornetQIllegalStateException; +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.journal.HornetQJournalBundle; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A NIOSequentialFile + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public final class NIOSequentialFile extends AbstractSequentialFile +{ + private FileChannel channel; + + private RandomAccessFile rfile; + + /** + * The write semaphore here is only used when writing asynchronously + */ + private Semaphore maxIOSemaphore; + + private final int defaultMaxIO; + + private int maxIO; + + public NIOSequentialFile(final SequentialFileFactory factory, + final String directory, + final String fileName, + final int maxIO, + final Executor writerExecutor) + { + super(directory, new File(directory + "/" + fileName), factory, writerExecutor); + defaultMaxIO = maxIO; + } + + public NIOSequentialFile(final SequentialFileFactory factory, + final File file, + final int maxIO, + final Executor writerExecutor) + { + super(file.getParent(), new File(file.getPath()), factory, writerExecutor); + defaultMaxIO = maxIO; + } + + public int getAlignment() + { + return 1; + } + + public int calculateBlockStart(final int position) + { + return position; + } + + public synchronized boolean isOpen() + { + return channel != null; + } + + /** + * this.maxIO represents the default maxIO. + * Some operations while initializing files on the journal may require a different maxIO + */ + public synchronized void open() throws IOException + { + open(defaultMaxIO, true); + } + + public void open(final int maxIO, final boolean useExecutor) throws IOException + { + try + { + rfile = new RandomAccessFile(getFile(), "rw"); + + channel = rfile.getChannel(); + + fileSize = channel.size(); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + if (writerExecutor != null && useExecutor) + { + maxIOSemaphore = new Semaphore(maxIO); + this.maxIO = maxIO; + } + } + + public void fill(final int position, final int size, final byte fillCharacter) throws IOException + { + ByteBuffer bb = ByteBuffer.allocate(size); + + for (int i = 0; i < size; i++) + { + bb.put(fillCharacter); + } + + bb.flip(); + + try + { + channel.position(position); + channel.write(bb); + channel.force(false); + channel.position(0); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + fileSize = channel.size(); + } + + public synchronized void waitForClose() throws InterruptedException + { + while (isOpen()) + { + wait(); + } + } + + @Override + public synchronized void close() throws IOException, InterruptedException, HornetQException + { + super.close(); + + if (maxIOSemaphore != null) + { + while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) + { + HornetQJournalLogger.LOGGER.errorClosingFile(getFileName()); + } + } + + maxIOSemaphore = null; + try + { + if (channel != null) + { + channel.close(); + } + + if (rfile != null) + { + rfile.close(); + } + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + channel = null; + + rfile = null; + + notifyAll(); + } + + public int read(final ByteBuffer bytes) throws Exception + { + return read(bytes, null); + } + + public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws IOException, + HornetQIllegalStateException + { + try + { + if (channel == null) + { + throw new HornetQIllegalStateException("File " + this.getFileName() + " has a null channel"); + } + int bytesRead = channel.read(bytes); + + if (callback != null) + { + callback.done(); + } + + bytes.flip(); + + return bytesRead; + } + catch (IOException e) + { + if (callback != null) + { + callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage()); + } + + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + + throw e; + } + } + + public void sync() throws IOException + { + if (channel != null) + { + try + { + channel.force(false); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + } + + public long size() throws IOException + { + if (channel == null) + { + return getFile().length(); + } + + try + { + return channel.size(); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + @Override + public void position(final long pos) throws IOException + { + try + { + super.position(pos); + channel.position(pos); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + @Override + public String toString() + { + return "NIOSequentialFile " + getFile(); + } + + public SequentialFile cloneFile() + { + return new NIOSequentialFile(factory, getFile(), maxIO, writerExecutor); + } + + public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) + { + if (callback == null) + { + throw new NullPointerException("callback parameter need to be set"); + } + + try + { + internalWrite(bytes, sync, callback); + } + catch (Exception e) + { + callback.onError(HornetQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage()); + } + } + + public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception + { + internalWrite(bytes, sync, null); + } + + public void writeInternal(final ByteBuffer bytes) throws Exception + { + internalWrite(bytes, true, null); + } + + @Override + protected ByteBuffer newBuffer(int size, final int limit) + { + // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO + + size = limit; + + return super.newBuffer(size, limit); + } + + private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException, HornetQIOErrorException, InterruptedException + { + if (!isOpen()) + { + if (callback != null) + { + callback.onError(HornetQExceptionType.IO_ERROR.getCode(), "File not opened"); + } + else + { + throw HornetQJournalBundle.BUNDLE.fileNotOpened(); + } + return; + } + + position.addAndGet(bytes.limit()); + + if (maxIOSemaphore == null || callback == null) + { + // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous + try + { + doInternalWrite(bytes, sync, callback); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + } + } + else + { + // This is a flow control on writing, just like maxAIO on libaio + maxIOSemaphore.acquire(); + + writerExecutor.execute(new Runnable() + { + public void run() + { + try + { + try + { + doInternalWrite(bytes, sync, callback); + } + catch (IOException e) + { + HornetQJournalLogger.LOGGER.errorSubmittingWrite(e); + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); + callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getMessage()); + } + catch (Throwable e) + { + HornetQJournalLogger.LOGGER.errorSubmittingWrite(e); + callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getMessage()); + } + } + finally + { + maxIOSemaphore.release(); + } + } + }); + } + } + + /** + * @param bytes + * @param sync + * @param callback + * @throws IOException + * @throws Exception + */ + private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException + { + channel.write(bytes); + + if (sync) + { + sync(); + } + + if (callback != null) + { + callback.done(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java new file mode 100644 index 0000000..2137477 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java @@ -0,0 +1,171 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; + +import org.apache.activemq6.core.journal.IOCriticalErrorListener; +import org.apache.activemq6.core.journal.SequentialFile; + +/** + * + * A NIOSequentialFileFactory + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public class NIOSequentialFileFactory extends AbstractSequentialFileFactory +{ + public NIOSequentialFileFactory(final String journalDir) + { + this(journalDir, null); + } + + public NIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener) + { + this(journalDir, + false, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, + false, + listener); + } + + public NIOSequentialFileFactory(final String journalDir, final boolean buffered) + { + this(journalDir, buffered, null); + } + + public NIOSequentialFileFactory(final String journalDir, + final boolean buffered, + final IOCriticalErrorListener listener) + { + this(journalDir, + buffered, + JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, + JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, + false, + listener); + } + + public NIOSequentialFileFactory(final String journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final boolean logRates) + { + this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null); + } + + public NIOSequentialFileFactory(final String journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final boolean logRates, + final IOCriticalErrorListener listener) + { + super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener); + } + + public SequentialFile createSequentialFile(final String fileName, int maxIO) + { + if (maxIO < 1) + { + // A single threaded IO + maxIO = 1; + } + + return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); + } + + public boolean isSupportsCallbacks() + { + return timedBuffer != null; + } + + + public ByteBuffer allocateDirectBuffer(final int size) + { + // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 + ByteBuffer buffer2 = null; + try + { + buffer2 = ByteBuffer.allocateDirect(size); + } + catch (OutOfMemoryError error) + { + // This is a workaround for the way the JDK will deal with native buffers. + // the main portion is outside of the VM heap + // and the JDK will not have any reference about it to take GC into account + // so we force a GC and try again. + WeakReference<Object> obj = new WeakReference<Object>(new Object()); + try + { + long timeout = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() > timeout && obj.get() != null) + { + System.gc(); + Thread.sleep(100); + } + } + catch (InterruptedException e) + { + } + + buffer2 = ByteBuffer.allocateDirect(size); + + } + return buffer2; + } + + public void releaseDirectBuffer(ByteBuffer buffer) + { + // nothing we can do on this case. we can just have good faith on GC + } + + public ByteBuffer newBuffer(final int size) + { + return ByteBuffer.allocate(size); + } + + public void clearBuffer(final ByteBuffer buffer) + { + final int limit = buffer.limit(); + buffer.rewind(); + + for (int i = 0; i < limit; i++) + { + buffer.put((byte)0); + } + + buffer.rewind(); + } + + public ByteBuffer wrapBuffer(final byte[] bytes) + { + return ByteBuffer.wrap(bytes); + } + + public int getAlignment() + { + return 1; + } + + public int calculateBlockSize(final int bytes) + { + return bytes; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java new file mode 100644 index 0000000..c5d1800 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java @@ -0,0 +1,118 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + + +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * + * <p>The journal consists of an ordered list of journal files Fn where 0 <= n <= N</p> + * + * <p>A journal file can contain either positives (pos) or negatives (neg)</p> + * + * <p>(Positives correspond either to adds or updates, and negatives correspond to deletes).</p> + * + * <p>A file Fn can be deleted if, and only if the following criteria are satisified</p> + * + * <p>1) All pos in a file Fn, must have corresponding neg in any file Fm where m >= n.</p> + * + * <p>2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 <= m <= n + * which are also marked for deletion in the same pass of the algorithm.</p> + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public class Reclaimer +{ + private static boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled(); + + private static void trace(final String message) + { + HornetQJournalLogger.LOGGER.trace(message); + } + + public void scan(final JournalFile[] files) + { + for (int i = 0; i < files.length; i++) + { + // First we evaluate criterion 1) + + JournalFile currentFile = files[i]; + + int posCount = currentFile.getPosCount(); + + int totNeg = 0; + + if (Reclaimer.trace) + { + Reclaimer.trace("posCount on " + currentFile + " = " + posCount); + } + + for (int j = i; j < files.length; j++) + { + if (Reclaimer.trace) + { + if (files[j].getNegCount(currentFile) != 0) + { + Reclaimer.trace("Negative from " + files[j] + + " into " + + currentFile + + " = " + + files[j].getNegCount(currentFile)); + } + } + + totNeg += files[j].getNegCount(currentFile); + } + + currentFile.setCanReclaim(true); + + if (posCount <= totNeg) + { + // Now we evaluate criterion 2) + + for (int j = 0; j <= i; j++) + { + JournalFile file = files[j]; + + int negCount = currentFile.getNegCount(file); + + if (negCount != 0) + { + if (file.isCanReclaim()) + { + // Ok + } + else + { + if (Reclaimer.trace) + { + Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values"); + } + + currentFile.setCanReclaim(false); + + break; + } + } + } + } + else + { + currentFile.setCanReclaim(false); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java new file mode 100644 index 0000000..26bfbe0 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java @@ -0,0 +1,92 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A SimpleWaitIOCallback + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public final class SimpleWaitIOCallback extends SyncIOCompletion +{ + private final CountDownLatch latch = new CountDownLatch(1); + + private volatile String errorMessage; + + private volatile int errorCode = 0; + + @Override + public String toString() + { + return SimpleWaitIOCallback.class.getName(); + } + + public void done() + { + latch.countDown(); + } + + public void onError(final int errorCode1, final String errorMessage1) + { + this.errorCode = errorCode1; + + this.errorMessage = errorMessage1; + + HornetQJournalLogger.LOGGER.errorOnIOCallback(errorMessage1); + + latch.countDown(); + } + + @Override + public void waitCompletion() throws InterruptedException, HornetQException + { + while (true) + { + if (latch.await(2, TimeUnit.SECONDS)) + break; + } + + if (errorMessage != null) + { + throw HornetQExceptionType.createException(errorCode, errorMessage); + } + + return; + } + + public boolean waitCompletion(final long timeout) throws InterruptedException, HornetQException + { + boolean retValue = latch.await(timeout, TimeUnit.MILLISECONDS); + + if (errorMessage != null) + { + throw HornetQExceptionType.createException(errorCode, errorMessage); + } + + return retValue; + } + + @Override + public void storeLineUp() + { + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java new file mode 100644 index 0000000..2c344a1 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.core.journal.IOCompletion; + +/** + * Internal class used to manage explicit syncs on the Journal through callbacks. + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public abstract class SyncIOCompletion implements IOCompletion +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public abstract void waitCompletion() throws Exception; + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java new file mode 100644 index 0000000..b0628f2 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java @@ -0,0 +1,354 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A SyncSpeedTest + * + * This class just provides some diagnostics on how fast your disk can sync + * Useful when determining performance issues + * + * @author <a href="mailto:[email protected]">Tim Fox</a> fox + * + * + */ +public class SyncSpeedTest +{ + public static void main(final String[] args) + { + try + { + new SyncSpeedTest().testScaleAIO(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + protected SequentialFileFactory fileFactory; + + public boolean AIO = true; + + protected void setupFactory() + { + if (AIO) + { + fileFactory = new AIOSequentialFileFactory(".", 0, 0, false, null); + } + else + { + fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false, null); + } + } + + protected SequentialFile createSequentialFile(final String fileName) + { + if (AIO) + { + return new AIOSequentialFile(fileFactory, + 0, + 0, + ".", + fileName, + 100000, + null, + null, + Executors.newSingleThreadExecutor()); + } + else + { + return new NIOSequentialFile(fileFactory, new File(fileName), 1000, null); + } + } + + public void run2() throws Exception + { + setupFactory(); + + int recordSize = 128 * 1024; + + while (true) + { + System.out.println("** record size is " + recordSize); + + int warmup = 500; + + int its = 500; + + int fileSize = (its + warmup) * recordSize; + + SequentialFile file = createSequentialFile("sync-speed-test.dat"); + + if (file.exists()) + { + file.delete(); + } + + file.open(); + + file.fill(0, fileSize, (byte)'X'); + + if (!AIO) + { + file.sync(); + } + + ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h'); + + long start = 0; + + for (int i = 0; i < its + warmup; i++) + { + if (i == warmup) + { + start = System.currentTimeMillis(); + } + + bb1.rewind(); + + file.writeDirect(bb1, true); + } + + long end = System.currentTimeMillis(); + + double rate = 1000 * (double)its / (end - start); + + double throughput = recordSize * rate; + + System.out.println("Rate of " + rate + " syncs per sec"); + System.out.println("Throughput " + throughput + " bytes per sec"); + System.out.println("*************"); + + recordSize *= 2; + } + } + + public void run() throws Exception + { + int recordSize = 256; + + while (true) + { + System.out.println("** record size is " + recordSize); + + int warmup = 500; + + int its = 500; + + int fileSize = (its + warmup) * recordSize; + + File file = new File("sync-speed-test.dat"); + + if (file.exists()) + { + if (!file.delete()) + { + HornetQJournalLogger.LOGGER.errorDeletingFile(file); + } + } + + boolean created = file.createNewFile(); + if (!created) + throw new IOException("could not create file " + file); + + RandomAccessFile rfile = new RandomAccessFile(file, "rw"); + + FileChannel channel = rfile.getChannel(); + + ByteBuffer bb = generateBuffer(fileSize, (byte)'x'); + + write(bb, channel, fileSize); + + channel.force(true); + + channel.position(0); + + ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h'); + + long start = 0; + + for (int i = 0; i < its + warmup; i++) + { + if (i == warmup) + { + start = System.currentTimeMillis(); + } + + bb1.flip(); + channel.write(bb1); + channel.force(false); + } + + long end = System.currentTimeMillis(); + + double rate = 1000 * (double)its / (end - start); + + double throughput = recordSize * rate; + + System.out.println("Rate of " + rate + " syncs per sec"); + System.out.println("Throughput " + throughput + " bytes per sec"); + + recordSize *= 2; + } + } + + public void testScaleAIO() throws Exception + { + setupFactory(); + + final int recordSize = 1024; + + System.out.println("** record size is " + recordSize); + + final int its = 10; + + for (int numThreads = 1; numThreads <= 10; numThreads++) + { + + int fileSize = its * recordSize * numThreads; + + final SequentialFile file = createSequentialFile("sync-speed-test.dat"); + + if (file.exists()) + { + file.delete(); + } + + file.open(); + + file.fill(0, fileSize, (byte)'X'); + + if (!AIO) + { + file.sync(); + } + + final CountDownLatch latch = new CountDownLatch(its * numThreads); + + class MyIOAsyncTask implements IOAsyncTask + { + public void done() + { + latch.countDown(); + } + + public void onError(final int errorCode, final String errorMessage) + { + + } + } + + final MyIOAsyncTask task = new MyIOAsyncTask(); + + class MyRunner implements Runnable + { + private final ByteBuffer bb1; + + MyRunner() + { + bb1 = generateBuffer(recordSize, (byte)'h'); + } + + public void run() + { + for (int i = 0; i < its; i++) + { + bb1.rewind(); + + file.writeDirect(bb1, true, task); + // try + // { + // file.writeDirect(bb1, true); + // } + // catch (Exception e) + // { + // e.printStackTrace(); + // } + } + } + } + + Set<Thread> threads = new HashSet<Thread>(); + + for (int i = 0; i < numThreads; i++) + { + MyRunner runner = new MyRunner(); + + Thread t = new Thread(runner); + + threads.add(t); + } + + long start = System.currentTimeMillis(); + + for (Thread t : threads) + { + HornetQJournalLogger.LOGGER.startingThread(); + t.start(); + } + + for (Thread t : threads) + { + t.join(); + } + + latch.await(); + + long end = System.currentTimeMillis(); + + double rate = 1000 * (double)its * numThreads / (end - start); + + double throughput = recordSize * rate; + + System.out.println("For " + numThreads + " threads:"); + System.out.println("Rate of " + rate + " records per sec"); + System.out.println("Throughput " + throughput + " bytes per sec"); + System.out.println("*************"); + } + } + + private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception + { + buffer.flip(); + + channel.write(buffer); + } + + private ByteBuffer generateBuffer(final int size, final byte ch) + { + ByteBuffer bb = ByteBuffer.allocateDirect(size); + + for (int i = 0; i < size; i++) + { + bb.put(ch); + } + + return bb; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java new file mode 100644 index 0000000..1e9b3d6 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java @@ -0,0 +1,560 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A TimedBuffer + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class TimedBuffer +{ + // Constants ----------------------------------------------------- + + // The number of tries on sleep before switching to spin + public static final int MAX_CHECKS_ON_SLEEP = 20; + + // Attributes ---------------------------------------------------- + + private TimedBufferObserver bufferObserver; + + // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread + // in spinning and checking the time - and using up CPU in the process - this semaphore is used to + // prevent that + private final Semaphore spinLimiter = new Semaphore(1); + + private CheckTimer timerRunnable = new CheckTimer(); + + private final int bufferSize; + + private final HornetQBuffer buffer; + + private int bufferLimit = 0; + + private List<IOAsyncTask> callbacks; + + private volatile int timeout; + + // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen + private volatile boolean pendingSync = false; + + private Thread timerThread; + + private volatile boolean started; + + // We use this flag to prevent flush occurring between calling checkSize and addBytes + // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer + // can get in an inconsistent state + private boolean delayFlush; + + // for logging write rates + + private final boolean logRates; + + private final AtomicLong bytesFlushed = new AtomicLong(0); + + private final AtomicLong flushesDone = new AtomicLong(0); + + private Timer logRatesTimer; + + private TimerTask logRatesTimerTask; + + private boolean useSleep = true; + + // no need to be volatile as every access is synchronized + private boolean spinning = false; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public TimedBuffer(final int size, final int timeout, final boolean logRates) + { + bufferSize = size; + + this.logRates = logRates; + + if (logRates) + { + logRatesTimer = new Timer(true); + } + // Setting the interval for nano-sleeps + + buffer = HornetQBuffers.fixedBuffer(bufferSize); + + buffer.clear(); + + bufferLimit = 0; + + callbacks = new ArrayList<IOAsyncTask>(); + + this.timeout = timeout; + } + + // for Debug purposes + public synchronized boolean isUseSleep() + { + return useSleep; + } + + public synchronized void setUseSleep(boolean useSleep) + { + this.useSleep = useSleep; + } + + public synchronized void start() + { + if (started) + { + return; + } + + // Need to start with the spin limiter acquired + try + { + spinLimiter.acquire(); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + timerRunnable = new CheckTimer(); + + timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout"); + + timerThread.start(); + + if (logRates) + { + logRatesTimerTask = new LogRatesTimerTask(); + + logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000); + } + + started = true; + } + + public void stop() + { + if (!started) + { + return; + } + + flush(); + + bufferObserver = null; + + timerRunnable.close(); + + spinLimiter.release(); + + if (logRates) + { + logRatesTimerTask.cancel(); + } + + while (timerThread.isAlive()) + { + try + { + timerThread.join(); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + started = false; + } + + public synchronized void setObserver(final TimedBufferObserver observer) + { + if (bufferObserver != null) + { + flush(); + } + + bufferObserver = observer; + } + + /** + * Verify if the size fits the buffer + * + * @param sizeChecked + */ + public synchronized boolean checkSize(final int sizeChecked) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + if (sizeChecked > bufferSize) + { + throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + + ") on the journal"); + } + + if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) + { + // Either there is not enough space left in the buffer for the sized record + // Or a flush has just been performed and we need to re-calcualate bufferLimit + + flush(); + + delayFlush = true; + + final int remainingInFile = bufferObserver.getRemainingBytes(); + + if (sizeChecked > remainingInFile) + { + return false; + } + else + { + // There is enough space in the file for this size + + // Need to re-calculate buffer limit + + bufferLimit = Math.min(remainingInFile, bufferSize); + + return true; + } + } + else + { + delayFlush = true; + + return true; + } + } + + public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback) + { + addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback); + } + + public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + delayFlush = false; + + bytes.encode(buffer); + + callbacks.add(callback); + + if (sync) + { + pendingSync = true; + + startSpin(); + } + + } + + public void flush() + { + flush(false); + } + + /** + * force means the Journal is moving to a new file. Any pending write need to be done immediately + * or data could be lost + */ + public void flush(final boolean force) + { + synchronized (this) + { + if (!started) + { + throw new IllegalStateException("TimedBuffer is not started"); + } + + if ((force || !delayFlush) && buffer.writerIndex() > 0) + { + int pos = buffer.writerIndex(); + + if (logRates) + { + bytesFlushed.addAndGet(pos); + } + + ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); + + // Putting a byteArray on a native buffer is much faster, since it will do in a single native call. + // Using bufferToFlush.put(buffer) would make several append calls for each byte + // We also transfer the content of this buffer to the native file's buffer + + bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos); + + bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); + + stopSpin(); + + pendingSync = false; + + // swap the instance as the previous callback list is being used asynchronously + callbacks = new LinkedList<IOAsyncTask>(); + + buffer.clear(); + + bufferLimit = 0; + + flushesDone.incrementAndGet(); + } + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + + private class LogRatesTimerTask extends TimerTask + { + private boolean closed; + + private long lastExecution; + + private long lastBytesFlushed; + + private long lastFlushesDone; + + @Override + public synchronized void run() + { + if (!closed) + { + long now = System.currentTimeMillis(); + + long bytesF = bytesFlushed.get(); + long flushesD = flushesDone.get(); + + if (lastExecution != 0) + { + double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); + HornetQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); + double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); + HornetQJournalLogger.LOGGER.flushRate(flushRate); + } + + lastExecution = now; + + lastBytesFlushed = bytesF; + + lastFlushesDone = flushesD; + } + } + + @Override + public synchronized boolean cancel() + { + closed = true; + + return super.cancel(); + } + } + + private class CheckTimer implements Runnable + { + private volatile boolean closed = false; + + int checks = 0; + int failedChecks = 0; + long timeBefore = 0; + + final int sleepMillis = timeout / 1000000; // truncates + final int sleepNanos = timeout % 1000000; + + + public void run() + { + long lastFlushTime = 0; + + while (!closed) + { + // We flush on the timer if there are pending syncs there and we've waited at least one + // timeout since the time of the last flush. + // Effectively flushing "resets" the timer + // On the timeout verification, notice that we ignore the timeout check if we are using sleep + + if (pendingSync) + { + if (isUseSleep()) + { + // if using sleep, we will always flush + flush(); + lastFlushTime = System.nanoTime(); + } + else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) + { + // if not using flush we will spin and do the time checks manually + flush(); + lastFlushTime = System.nanoTime(); + } + + } + + sleepIfPossible(); + + try + { + spinLimiter.acquire(); + + Thread.yield(); + + spinLimiter.release(); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + + /** + * We will attempt to use sleep only if the system supports nano-sleep + * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. + * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin + */ + private void sleepIfPossible() + { + if (isUseSleep()) + { + if (checks < MAX_CHECKS_ON_SLEEP) + { + timeBefore = System.nanoTime(); + } + + try + { + sleep(sleepMillis, sleepNanos); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + catch (Exception e) + { + setUseSleep(false); + HornetQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e); + } + + if (checks < MAX_CHECKS_ON_SLEEP) + { + long realTimeSleep = System.nanoTime() - timeBefore; + + // I'm letting the real time to be up to 50% than the requested sleep. + if (realTimeSleep > timeout * 1.5) + { + failedChecks++; + } + + if (++checks >= MAX_CHECKS_ON_SLEEP) + { + if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) + { + HornetQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts"); + setUseSleep(false); + } + } + } + } + } + + public void close() + { + closed = true; + } + } + + /** + * Sub classes (tests basically) can use this to override how the sleep is being done + * + * @param sleepMillis + * @param sleepNanos + * @throws InterruptedException + */ + protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException + { + Thread.sleep(sleepMillis, sleepNanos); + } + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void stopSpin() + { + if (spinning) + { + try + { + // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning + // when the buffer is inactive + spinLimiter.acquire(); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + spinning = false; + } + } + + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void startSpin() + { + if (!spinning) + { + spinLimiter.release(); + + spinning = true; + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java new file mode 100644 index 0000000..6560ac7 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java @@ -0,0 +1,55 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.activemq6.core.journal.IOAsyncTask; + +/** + * A TimedBufferObserver + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public interface TimedBufferObserver +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOAsyncTask> callbacks); + + /** Return the number of remaining bytes that still fit on the observer (file) */ + int getRemainingBytes(); + + ByteBuffer newBuffer(int size, int limit); + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java new file mode 100644 index 0000000..ce3c157 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java @@ -0,0 +1,116 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.utils.ReusableLatch; + +/** + * A TransactionCallback + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class TransactionCallback implements IOAsyncTask +{ + private final ReusableLatch countLatch = new ReusableLatch(); + + private volatile String errorMessage = null; + + private volatile int errorCode = 0; + + private final AtomicInteger up = new AtomicInteger(); + + private volatile int done = 0; + + private volatile IOAsyncTask delegateCompletion; + + public void countUp() + { + up.incrementAndGet(); + countLatch.countUp(); + } + + public void done() + { + countLatch.countDown(); + if (++done == up.get() && delegateCompletion != null) + { + final IOAsyncTask delegateToCall = delegateCompletion; + // We need to set the delegateCompletion to null first or blocking commits could miss a callback + // What would affect mainly tests + delegateCompletion = null; + delegateToCall.done(); + } + } + + public void waitCompletion() throws InterruptedException + { + countLatch.await(); + + if (errorMessage != null) + { + throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage); + } + } + + public void onError(final int errorCode, final String errorMessage) + { + this.errorMessage = errorMessage; + + this.errorCode = errorCode; + + countLatch.countDown(); + + if (delegateCompletion != null) + { + delegateCompletion.onError(errorCode, errorMessage); + } + } + + /** + * @return the delegateCompletion + */ + public IOAsyncTask getDelegateCompletion() + { + return delegateCompletion; + } + + /** + * @param delegateCompletion the delegateCompletion to set + */ + public void setDelegateCompletion(final IOAsyncTask delegateCompletion) + { + this.delegateCompletion = delegateCompletion; + } + + /** + * @return the errorMessage + */ + public String getErrorMessage() + { + return errorMessage; + } + + /** + * @return the errorCode + */ + public int getErrorCode() + { + return errorCode; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java new file mode 100644 index 0000000..acb232b --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java @@ -0,0 +1,51 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; + +/** + * A ByteArrayEncoding + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class ByteArrayEncoding implements EncodingSupport +{ + + final byte[] data; + + public ByteArrayEncoding(final byte[] data) + { + this.data = data; + } + + // Public -------------------------------------------------------- + + public void decode(final HornetQBuffer buffer) + { + throw new IllegalStateException("operation not supported"); + } + + public void encode(final HornetQBuffer buffer) + { + buffer.writeBytes(data); + } + + public int getEncodeSize() + { + return data.length; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java new file mode 100644 index 0000000..d62eca7 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl.dataformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.impl.JournalImpl; + +/** + * A JournalAddRecord + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class JournalAddRecord extends JournalInternalRecord +{ + + private final long id; + + private final EncodingSupport record; + + private final byte recordType; + + private final boolean add; + + /** + * @param id + * @param recordType + * @param record + */ + public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) + { + this.id = id; + + this.record = record; + + this.recordType = recordType; + + this.add = add; + } + + @Override + public void encode(final HornetQBuffer buffer) + { + if (add) + { + buffer.writeByte(JournalImpl.ADD_RECORD); + } + else + { + buffer.writeByte(JournalImpl.UPDATE_RECORD); + } + + buffer.writeInt(fileID); + + buffer.writeByte(compactCount); + + buffer.writeLong(id); + + buffer.writeInt(record.getEncodeSize()); + + buffer.writeByte(recordType); + + record.encode(buffer); + + buffer.writeInt(getEncodeSize()); + } + + @Override + public int getEncodeSize() + { + return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1; + } +} \ No newline at end of file
