http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java new file mode 100644 index 0000000..4fa1020 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java @@ -0,0 +1,639 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.core.journal.RecordInfo; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding; +import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord; +import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX.TX_RECORD_TYPE; +import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord; +import org.apache.activemq6.core.journal.impl.dataformat.JournalRollbackRecordTX; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A JournalCompactor + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider +{ + // We try to separate old record from new ones when doing the compacting + // this is a split line + // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE + private static final short COMPACT_SPLIT_LINE = 2; + + // Snapshot of transactions that were pending when the compactor started + private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>(); + + private final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>(); + + private final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>(); + + /** Commands that happened during compacting + * We can't process any counts during compacting, as we won't know in what files the records are taking place, so + * we cache those updates. As soon as we are done we take the right account. */ + private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>(); + + public static SequentialFile readControlFile(final SequentialFileFactory fileFactory, + final List<String> dataFiles, + final List<String> newFiles, + final List<Pair<String, String>> renameFile) throws Exception + { + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1); + + if (controlFile.exists()) + { + JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION); + + final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>(); + + JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() + { + @Override + public void onReadAddRecord(final RecordInfo info) throws Exception + { + records.add(info); + } + }); + + if (records.size() == 0) + { + return null; + } + else + { + HornetQBuffer input = HornetQBuffers.wrappedBuffer(records.get(0).data); + + int numberDataFiles = input.readInt(); + + for (int i = 0; i < numberDataFiles; i++) + { + dataFiles.add(input.readUTF()); + } + + int numberNewFiles = input.readInt(); + + for (int i = 0; i < numberNewFiles; i++) + { + newFiles.add(input.readUTF()); + } + + int numberRenames = input.readInt(); + for (int i = 0; i < numberRenames; i++) + { + String from = input.readUTF(); + String to = input.readUTF(); + renameFile.add(new Pair<String, String>(from, to)); + } + + } + + return controlFile; + } + else + { + return null; + } + } + + public List<JournalFile> getNewDataFiles() + { + return newDataFiles; + } + + public Map<Long, JournalRecord> getNewRecords() + { + return newRecords; + } + + public Map<Long, JournalTransaction> getNewTransactions() + { + return newTransactions; + } + + public JournalCompactor(final SequentialFileFactory fileFactory, + final JournalImpl journal, + final JournalFilesRepository filesRepository, + final Set<Long> recordsSnapshot, + final long firstFileID) + { + super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID); + } + + /** This methods informs the Compactor about the existence of a pending (non committed) transaction */ + public void addPendingTransaction(final long transactionID, final long[] ids) + { + pendingTransactions.put(transactionID, new PendingTransaction(ids)); + } + + public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile) + { + pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile)); + + long[] ids = liveTransaction.getPositiveArray(); + + PendingTransaction oldTransaction = pendingTransactions.get(liveTransaction.getId()); + long[] ids2 = null; + + if (oldTransaction != null) + { + ids2 = oldTransaction.pendingIDs; + } + + /** If a delete comes for these records, while the compactor still working, we need to be able to take them into account for later deletes + * instead of throwing exceptions about non existent records */ + if (ids != null) + { + for (long id : ids) + { + addToRecordsSnaptshot(id); + } + } + + if (ids2 != null) + { + for (long id : ids2) + { + addToRecordsSnaptshot(id); + } + } + } + + public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile) + { + pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile)); + } + + /** + * @param id + * @param usedFile + */ + public void addCommandDelete(final long id, final JournalFile usedFile) + { + pendingCommands.add(new DeleteCompactCommand(id, usedFile)); + } + + /** + * @param id + * @param usedFile + */ + public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) + { + pendingCommands.add(new UpdateCompactCommand(id, usedFile, size)); + } + + private void checkSize(final int size) throws Exception + { + checkSize(size, -1); + } + + private void checkSize(final int size, final int compactCount) throws Exception + { + if (getWritingChannel() == null) + { + if (!checkCompact(compactCount)) + { + // will need to open a file either way + openFile(); + } + } + else + { + if (compactCount >= 0) + { + if (checkCompact(compactCount)) + { + // The file was already moved on this case, no need to check for the size. + // otherwise we will also need to check for the size + return; + } + } + + if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity()) + { + openFile(); + } + } + } + + int currentCount; + + // This means we will need to split when the compactCount is bellow the watermark + boolean willNeedToSplit = false; + + boolean splitted = false; + + private boolean checkCompact(final int compactCount) throws Exception + { + if (compactCount >= COMPACT_SPLIT_LINE && !splitted) + { + willNeedToSplit = true; + } + + if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE) + { + willNeedToSplit = false; + splitted = false; + openFile(); + return true; + } + else + { + return false; + } + } + + /** + * Replay pending counts that happened during compacting + */ + public void replayPendingCommands() + { + for (CompactCommand command : pendingCommands) + { + try + { + command.execute(); + } + catch (Exception e) + { + HornetQJournalLogger.LOGGER.errorReplayingCommands(e); + } + } + + pendingCommands.clear(); + } + + // JournalReaderCallback implementation ------------------------------------------- + + public void onReadAddRecord(final RecordInfo info) throws Exception + { + if (lookupRecord(info.id)) + { + JournalInternalRecord addRecord = new JournalAddRecord(true, + info.id, + info.getUserRecordType(), + new ByteArrayEncoding(info.data)); + addRecord.setCompactCount((short)(info.compactCount + 1)); + + checkSize(addRecord.getEncodeSize(), info.compactCount); + + writeEncoder(addRecord); + + newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize())); + } + } + + public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception + { + if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) + { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord record = new JournalAddRecordTX(true, + transactionID, + info.id, + info.getUserRecordType(), + new ByteArrayEncoding(info.data)); + + record.setCompactCount((short)(info.compactCount + 1)); + + checkSize(record.getEncodeSize(), info.compactCount); + + newTransaction.addPositive(currentFile, info.id, record.getEncodeSize()); + + writeEncoder(record); + } + } + + public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception + { + + if (pendingTransactions.get(transactionID) != null) + { + // Sanity check, this should never happen + HornetQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID); + } + else + { + JournalTransaction newTransaction = newTransactions.remove(transactionID); + if (newTransaction != null) + { + JournalInternalRecord commitRecord = + new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, transactionID, null); + + checkSize(commitRecord.getEncodeSize()); + + writeEncoder(commitRecord, newTransaction.getCounter(currentFile)); + + newTransaction.commit(currentFile); + } + } + } + + public void onReadDeleteRecord(final long recordID) throws Exception + { + if (newRecords.get(recordID) != null) + { + // Sanity check, it should never happen + HornetQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID); + } + + } + + public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception + { + if (pendingTransactions.get(transactionID) != null) + { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord record = new JournalDeleteRecordTX(transactionID, + info.id, + new ByteArrayEncoding(info.data)); + + checkSize(record.getEncodeSize()); + + writeEncoder(record); + + newTransaction.addNegative(currentFile, info.id); + } + // else.. nothing to be done + } + + public void markAsDataFile(final JournalFile file) + { + // nothing to be done here + } + + public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception + { + if (pendingTransactions.get(transactionID) != null) + { + + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord prepareRecord = + new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData)); + + checkSize(prepareRecord.getEncodeSize()); + + writeEncoder(prepareRecord, newTransaction.getCounter(currentFile)); + + newTransaction.prepare(currentFile); + + } + } + + public void onReadRollbackRecord(final long transactionID) throws Exception + { + if (pendingTransactions.get(transactionID) != null) + { + // Sanity check, this should never happen + throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID + + " for an already rolled back transaction during compacting"); + } + else + { + JournalTransaction newTransaction = newTransactions.remove(transactionID); + if (newTransaction != null) + { + + JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID); + + checkSize(rollbackRecord.getEncodeSize()); + + writeEncoder(rollbackRecord); + + newTransaction.rollback(currentFile); + } + + } + } + + public void onReadUpdateRecord(final RecordInfo info) throws Exception + { + if (lookupRecord(info.id)) + { + JournalInternalRecord updateRecord = new JournalAddRecord(false, + info.id, + info.userRecordType, + new ByteArrayEncoding(info.data)); + + updateRecord.setCompactCount((short)(info.compactCount + 1)); + + checkSize(updateRecord.getEncodeSize(), info.compactCount); + + JournalRecord newRecord = newRecords.get(info.id); + + if (newRecord == null) + { + HornetQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); + } + else + { + newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize()); + } + + writeEncoder(updateRecord); + } + } + + public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception + { + if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) + { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, + transactionID, + info.id, + info.userRecordType, + new ByteArrayEncoding(info.data)); + + updateRecordTX.setCompactCount((short)(info.compactCount + 1)); + + checkSize(updateRecordTX.getEncodeSize(), info.compactCount); + + writeEncoder(updateRecordTX); + + newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize()); + } + else + { + onReadUpdateRecord(info); + } + } + + /** + * @param transactionID + * @return + */ + private JournalTransaction getNewJournalTransaction(final long transactionID) + { + JournalTransaction newTransaction = newTransactions.get(transactionID); + if (newTransaction == null) + { + newTransaction = new JournalTransaction(transactionID, this); + newTransactions.put(transactionID, newTransaction); + } + return newTransaction; + } + + private abstract static class CompactCommand + { + abstract void execute() throws Exception; + } + + private class DeleteCompactCommand extends CompactCommand + { + long id; + + JournalFile usedFile; + + public DeleteCompactCommand(final long id, final JournalFile usedFile) + { + this.id = id; + this.usedFile = usedFile; + } + + @Override + void execute() throws Exception + { + JournalRecord deleteRecord = journal.getRecords().remove(id); + if (deleteRecord == null) + { + HornetQJournalLogger.LOGGER.noRecordDuringCompactReplay(id); + } + else + { + deleteRecord.delete(usedFile); + } + } + } + + private static class PendingTransaction + { + long[] pendingIDs; + + PendingTransaction(final long[] ids) + { + pendingIDs = ids; + } + + } + + private class UpdateCompactCommand extends CompactCommand + { + private final long id; + + private final JournalFile usedFile; + + private final int size; + + public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size) + { + this.id = id; + this.usedFile = usedFile; + this.size = size; + } + + @Override + void execute() throws Exception + { + JournalRecord updateRecord = journal.getRecords().get(id); + updateRecord.addUpdateFile(usedFile, size); + } + } + + private class CommitCompactCommand extends CompactCommand + { + private final JournalTransaction liveTransaction; + + /** File containing the commit record */ + private final JournalFile commitFile; + + public CommitCompactCommand(final JournalTransaction liveTransaction, final JournalFile commitFile) + { + this.liveTransaction = liveTransaction; + this.commitFile = commitFile; + } + + @Override + void execute() throws Exception + { + JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId()); + if (newTransaction != null) + { + liveTransaction.merge(newTransaction); + liveTransaction.commit(commitFile); + } + newTransactions.remove(liveTransaction.getId()); + } + } + + private class RollbackCompactCommand extends CompactCommand + { + private final JournalTransaction liveTransaction; + + /** File containing the commit record */ + private final JournalFile rollbackFile; + + public RollbackCompactCommand(final JournalTransaction liveTransaction, final JournalFile rollbackFile) + { + this.liveTransaction = liveTransaction; + this.rollbackFile = rollbackFile; + } + + @Override + void execute() throws Exception + { + JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId()); + if (newTransaction != null) + { + liveTransaction.merge(newTransaction); + liveTransaction.rollback(rollbackFile); + } + newTransactions.remove(liveTransaction.getId()); + } + } + + @Override + public JournalCompactor getCompactor() + { + return null; + } + + @Override + public Map<Long, JournalRecord> getRecords() + { + return newRecords; + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java new file mode 100644 index 0000000..1b5ab65 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +public final class JournalConstants +{ + + public static final int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = 490 * 1024; + public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = (int)(1000000000d / 2000); + public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO = (int)(1000000000d / 300); + public static final int DEFAULT_JOURNAL_BUFFER_SIZE_NIO = 490 * 1024; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java new file mode 100644 index 0000000..87f3dbe --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.core.journal.SequentialFile; + +/** + * + * A JournalFile + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public interface JournalFile +{ + int getNegCount(JournalFile file); + + void incNegCount(JournalFile file); + + int getPosCount(); + + void incPosCount(); + + void decPosCount(); + + void addSize(int bytes); + + void decSize(int bytes); + + int getLiveSize(); + + /** The total number of deletes this file has */ + int getTotalNegativeToOthers(); + + /** + * Whether this file's contents can deleted and the file reused. + * @param canDelete if {@code true} then this file's contents are unimportant and may be deleted + * at any time. + */ + void setCanReclaim(boolean canDelete); + + /** + * Whether this file's contents can deleted and the file reused. + * @return {@code true} if the file can already be deleted. + */ + boolean isCanReclaim(); + + /** This is a field to identify that records on this file actually belong to the current file. + * The possible implementation for this is fileID & Integer.MAX_VALUE */ + int getRecordID(); + + long getFileID(); + + int getJournalVersion(); + + SequentialFile getFile(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java new file mode 100644 index 0000000..8aeee12 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java @@ -0,0 +1,206 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.core.journal.SequentialFile; + +/** + * + * A JournalFileImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +public class JournalFileImpl implements JournalFile +{ + private final SequentialFile file; + + private final long fileID; + + private final int recordID; + + private long offset; + + private final AtomicInteger posCount = new AtomicInteger(0); + + private final AtomicInteger liveBytes = new AtomicInteger(0); + + private boolean canReclaim; + + private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0); + + private final int version; + + private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>(); + + public JournalFileImpl(final SequentialFile file, final long fileID, final int version) + { + this.file = file; + + this.fileID = fileID; + + this.version = version; + + recordID = (int)(fileID & Integer.MAX_VALUE); + } + + public int getPosCount() + { + return posCount.intValue(); + } + + @Override + public boolean isCanReclaim() + { + return canReclaim; + } + + @Override + public void setCanReclaim(final boolean canReclaim) + { + this.canReclaim = canReclaim; + } + + public void incNegCount(final JournalFile file) + { + if (file != this) + { + totalNegativeToOthers.incrementAndGet(); + } + getOrCreateNegCount(file).incrementAndGet(); + } + + public int getNegCount(final JournalFile file) + { + AtomicInteger count = negCounts.get(file); + + if (count == null) + { + return 0; + } + else + { + return count.intValue(); + } + } + + public int getJournalVersion() + { + return version; + } + + public void incPosCount() + { + posCount.incrementAndGet(); + } + + public void decPosCount() + { + posCount.decrementAndGet(); + } + + public long getOffset() + { + return offset; + } + + public long getFileID() + { + return fileID; + } + + public int getRecordID() + { + return recordID; + } + + public void setOffset(final long offset) + { + this.offset = offset; + } + + public SequentialFile getFile() + { + return file; + } + + @Override + public String toString() + { + try + { + return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")"; + } + catch (Exception e) + { + e.printStackTrace(); + return "Error:" + e.toString(); + } + } + + /** Receive debug information about the journal */ + public String debug() + { + StringBuilder builder = new StringBuilder(); + + for (Entry<JournalFile, AtomicInteger> entry : negCounts.entrySet()) + { + builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n"); + } + + return builder.toString(); + } + + private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file) + { + AtomicInteger count = negCounts.get(file); + + if (count == null) + { + count = new AtomicInteger(); + negCounts.put(file, count); + } + + return count; + } + + @Override + public void addSize(final int bytes) + { + liveBytes.addAndGet(bytes); + } + + @Override + public void decSize(final int bytes) + { + liveBytes.addAndGet(-bytes); + } + + @Override + public int getLiveSize() + { + return liveBytes.get(); + } + + public int getTotalNegativeToOthers() + { + return totalNegativeToOthers.get(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java new file mode 100644 index 0000000..56ab68f --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java @@ -0,0 +1,767 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles + * Guaranteeing that they will be delivered in order to the Journal + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class JournalFilesRepository +{ + private static final boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled(); + + /** + * Used to debug the consistency of the journal ordering. + * <p/> + * This is meant to be false as these extra checks would cause performance issues + */ + private static final boolean CHECK_CONSISTENCE = false; + + // This method exists just to make debug easier. + // I could replace log.trace by log.info temporarily while I was debugging + // Journal + private static void trace(final String message) + { + HornetQJournalLogger.LOGGER.trace(message); + } + + private final SequentialFileFactory fileFactory; + + private final JournalImpl journal; + + private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>(); + + private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>(); + + private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>(); + + private final AtomicLong nextFileID = new AtomicLong(0); + + private final int maxAIO; + + private final int minFiles; + + private final int fileSize; + + private final String filePrefix; + + private final String fileExtension; + + private final int userVersion; + + private final AtomicInteger freeFilesCount = new AtomicInteger(0); + + private Executor openFilesExecutor; + + private final Runnable pushOpenRunnable = new Runnable() + { + public void run() + { + try + { + pushOpenedFile(); + } + catch (Exception e) + { + HornetQJournalLogger.LOGGER.errorPushingFile(e); + } + } + }; + + public JournalFilesRepository(final SequentialFileFactory fileFactory, + final JournalImpl journal, + final String filePrefix, + final String fileExtension, + final int userVersion, + final int maxAIO, + final int fileSize, + final int minFiles) + { + if (filePrefix == null) + { + throw new IllegalArgumentException("filePrefix cannot be null"); + } + if (fileExtension == null) + { + throw new IllegalArgumentException("fileExtension cannot be null"); + } + if (maxAIO <= 0) + { + throw new IllegalArgumentException("maxAIO must be a positive number"); + } + this.fileFactory = fileFactory; + this.maxAIO = maxAIO; + this.filePrefix = filePrefix; + this.fileExtension = fileExtension; + this.minFiles = minFiles; + this.fileSize = fileSize; + this.userVersion = userVersion; + this.journal = journal; + } + + // Public -------------------------------------------------------- + + public void setExecutor(final Executor fileExecutor) + { + this.openFilesExecutor = fileExecutor; + } + + public void clear() throws Exception + { + dataFiles.clear(); + + freeFiles.clear(); + + freeFilesCount.set(0); + + for (JournalFile file : openedFiles) + { + try + { + file.getFile().close(); + } + catch (Exception e) + { + HornetQJournalLogger.LOGGER.errorClosingFile(e); + } + } + openedFiles.clear(); + } + + public int getMaxAIO() + { + return maxAIO; + } + + public String getFileExtension() + { + return fileExtension; + } + + public String getFilePrefix() + { + return filePrefix; + } + + public void calculateNextfileID(final List<JournalFile> files) + { + + for (JournalFile file : files) + { + final long fileIdFromFile = file.getFileID(); + final long fileIdFromName = getFileNameID(file.getFile().getFileName()); + + // The compactor could create a fileName but use a previously assigned ID. + // Because of that we need to take both parts into account + setNextFileID(Math.max(fileIdFromName, fileIdFromFile)); + } + } + + /** + * Set the {link #nextFileID} value to {@code targetUpdate} if the current value is less than + * {@code targetUpdate}. + * <p/> + * Notice that {@code nextFileID} is incremented before being used, see + * {@link JournalFilesRepository#generateFileID()}. + * + * @param targetUpdate + */ + public void setNextFileID(final long targetUpdate) + { + while (true) + { + final long current = nextFileID.get(); + if (current >= targetUpdate) + return; + + if (nextFileID.compareAndSet(current, targetUpdate)) + return; + } + } + + public void ensureMinFiles() throws Exception + { + int filesToCreate = minFiles - (dataFiles.size() + freeFilesCount.get()); + + if (filesToCreate > 0) + { + for (int i = 0; i < filesToCreate; i++) + { + // Keeping all files opened can be very costly (mainly on AIO) + freeFiles.add(createFile(false, false, true, false, -1)); + freeFilesCount.getAndIncrement(); + } + } + + } + + public void openFile(final JournalFile file, final boolean multiAIO) throws Exception + { + if (multiAIO) + { + file.getFile().open(); + } + else + { + file.getFile().open(1, false); + } + + file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER)); + } + + // Data File Operations ========================================== + + public JournalFile[] getDataFilesArray() + { + return dataFiles.toArray(new JournalFile[dataFiles.size()]); + } + + public JournalFile pollLastDataFile() + { + return dataFiles.pollLast(); + } + + public void removeDataFile(final JournalFile file) + { + if (!dataFiles.remove(file)) + { + HornetQJournalLogger.LOGGER.couldNotRemoveFile(file); + } + } + + public int getDataFilesCount() + { + return dataFiles.size(); + } + + public Collection<JournalFile> getDataFiles() + { + return dataFiles; + } + + public void clearDataFiles() + { + dataFiles.clear(); + } + + public void addDataFileOnTop(final JournalFile file) + { + dataFiles.addFirst(file); + + if (CHECK_CONSISTENCE) + { + checkDataFiles(); + } + } + + public String debugFiles() + { + StringBuilder buffer = new StringBuilder(); + + buffer.append("**********\nCurrent File = " + journal.getCurrentFile() + "\n"); + buffer.append("**********\nDataFiles:\n"); + for (JournalFile file : dataFiles) + { + buffer.append(file.toString() + "\n"); + } + buffer.append("*********\nFreeFiles:\n"); + for (JournalFile file : freeFiles) + { + buffer.append(file.toString() + "\n"); + } + return buffer.toString(); + } + + public synchronized void checkDataFiles() + { + long seq = -1; + for (JournalFile file : dataFiles) + { + if (file.getFileID() <= seq) + { + HornetQJournalLogger.LOGGER.checkFiles(); + HornetQJournalLogger.LOGGER.info(debugFiles()); + HornetQJournalLogger.LOGGER.seqOutOfOrder(); + System.exit(-1); + } + + if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID()) + { + HornetQJournalLogger.LOGGER.checkFiles(); + HornetQJournalLogger.LOGGER.info(debugFiles()); + HornetQJournalLogger.LOGGER.currentFile(file.getFileID(), journal.getCurrentFile().getFileID(), + file.getFileID(), (journal.getCurrentFile() == file)); + + // throw new RuntimeException ("Check failure!"); + } + + if (journal.getCurrentFile() == file) + { + throw new RuntimeException("Check failure! Current file listed as data file!"); + } + + seq = file.getFileID(); + } + + long lastFreeId = -1; + for (JournalFile file : freeFiles) + { + if (file.getFileID() <= lastFreeId) + { + HornetQJournalLogger.LOGGER.checkFiles(); + HornetQJournalLogger.LOGGER.info(debugFiles()); + HornetQJournalLogger.LOGGER.fileIdOutOfOrder(); + + throw new RuntimeException("Check failure!"); + } + + lastFreeId = file.getFileID(); + + if (file.getFileID() < seq) + { + HornetQJournalLogger.LOGGER.checkFiles(); + HornetQJournalLogger.LOGGER.info(debugFiles()); + HornetQJournalLogger.LOGGER.fileTooSmall(); + + // throw new RuntimeException ("Check failure!"); + } + } + } + + public void addDataFileOnBottom(final JournalFile file) + { + dataFiles.add(file); + + if (CHECK_CONSISTENCE) + { + checkDataFiles(); + } + } + + // Free File Operations ========================================== + + public int getFreeFilesCount() + { + return freeFilesCount.get(); + } + + /** + * @param file + * @throws Exception + */ + public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception + { + addFreeFile(file, renameTmp, true); + } + + /** + * @param file + * @param renameTmp - should rename the file as it's being added to free files + * @param checkDelete - should delete the file if max condition has been met + * @throws Exception + */ + public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp, final boolean checkDelete) throws Exception + { + long calculatedSize = 0; + try + { + calculatedSize = file.getFile().size(); + } + catch (Exception e) + { + e.printStackTrace(); + System.out.println("Can't get file size on " + file); + System.exit(-1); + } + if (calculatedSize != fileSize) + { + HornetQJournalLogger.LOGGER.deletingFile(file); + file.getFile().delete(); + } + else + if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) + { + // Re-initialise it + + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("Adding free file " + file); + } + + JournalFile jf = reinitializeFile(file); + + if (renameTmp) + { + jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp")); + } + + freeFiles.add(jf); + freeFilesCount.getAndIncrement(); + } + else + { + if (trace) + { + HornetQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size()); + HornetQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size()); + HornetQJournalLogger.LOGGER.trace("minfiles = " + minFiles); + HornetQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get()); + HornetQJournalLogger.LOGGER.trace("File " + file + + " being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" + + (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size()) + + ") < minFiles (" + minFiles + ")"); + } + file.getFile().delete(); + } + + if (CHECK_CONSISTENCE) + { + checkDataFiles(); + } + } + + public Collection<JournalFile> getFreeFiles() + { + return freeFiles; + } + + public JournalFile getFreeFile() + { + JournalFile file = freeFiles.remove(); + freeFilesCount.getAndDecrement(); + return file; + } + + // Opened files operations ======================================= + + public int getOpenedFilesCount() + { + return openedFiles.size(); + } + + /** + * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p> + * <p>In case there are no cached opened files, this method will block until the file was opened, + * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> + */ + public JournalFile openFile() throws InterruptedException + { + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size()); + } + + if (openFilesExecutor == null) + { + pushOpenRunnable.run(); + } + else + { + openFilesExecutor.execute(pushOpenRunnable); + } + + JournalFile nextFile = null; + + while (nextFile == null) + { + nextFile = openedFiles.poll(5, TimeUnit.SECONDS); + if (nextFile == null) + { + HornetQJournalLogger.LOGGER.errorOpeningFile(new Exception("trace")); + } + } + + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("Returning file " + nextFile); + } + + return nextFile; + } + + /** + * Open a file and place it into the openedFiles queue + */ + public void pushOpenedFile() throws Exception + { + JournalFile nextOpenedFile = takeFile(true, true, true, false); + + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("pushing openFile " + nextOpenedFile); + } + + if (!openedFiles.offer(nextOpenedFile)) + { + HornetQJournalLogger.LOGGER.failedToAddFile(nextOpenedFile); + } + } + + public void closeFile(final JournalFile file) throws Exception + { + fileFactory.deactivateBuffer(); + file.getFile().close(); + dataFiles.add(file); + } + + /** + * This will get a File from freeFile without initializing it + * + * @return uninitialized JournalFile + * @throws Exception + * @see {@link JournalImpl#initFileHeader(SequentialFileFactory, SequentialFile, int, long)} + */ + public JournalFile takeFile(final boolean keepOpened, + final boolean multiAIO, + final boolean initFile, + final boolean tmpCompactExtension) throws Exception + { + JournalFile nextFile = null; + + nextFile = freeFiles.poll(); + + if (nextFile != null) + { + freeFilesCount.getAndDecrement(); + } + + if (nextFile == null) + { + nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1); + } + else + { + if (tmpCompactExtension) + { + SequentialFile sequentialFile = nextFile.getFile(); + sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp"); + } + + if (keepOpened) + { + openFile(nextFile, multiAIO); + } + } + return nextFile; + } + + /** + * Creates files for journal synchronization of a replicated backup. + * <p/> + * In order to simplify synchronization, the file IDs in the backup match those in the live + * server. + * + * @param fileID the fileID to use when creating the file. + */ + public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception + { + return createFile(false, false, true, false, fileID); + } + + /** + * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER + * + * @param keepOpened + * @return an initialized journal file + * @throws Exception + */ + private JournalFile createFile(final boolean keepOpened, + final boolean multiAIO, + final boolean init, + final boolean tmpCompact, + final long fileIdPreSet) throws Exception + { + if (System.getSecurityManager() == null) + { + return createFile0(keepOpened, multiAIO, init, tmpCompact, fileIdPreSet); + } + else + { + try + { + return AccessController.doPrivileged(new PrivilegedExceptionAction<JournalFile>() + { + @Override + public JournalFile run() throws Exception + { + return createFile0(keepOpened, multiAIO, init, tmpCompact, fileIdPreSet); + } + }); + } + catch (PrivilegedActionException e) + { + throw unwrapException(e); + } + } + } + + private RuntimeException unwrapException(PrivilegedActionException e) throws Exception + { + Throwable c = e.getCause(); + if (c instanceof RuntimeException) + { + throw (RuntimeException) c; + } + else if (c instanceof Error) + { + throw (Error) c; + } + else + { + throw new RuntimeException(c); + } + } + + private JournalFile createFile0(final boolean keepOpened, + final boolean multiAIO, + final boolean init, + final boolean tmpCompact, + final long fileIdPreSet) throws Exception + { + long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID(); + + final String fileName = createFileName(tmpCompact, fileID); + + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("Creating file " + fileName); + } + + String tmpFileName = fileName + ".tmp"; + + SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO); + + sequentialFile.open(1, false); + + if (init) + { + sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER); + + JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID); + } + + long position = sequentialFile.position(); + + sequentialFile.close(); + + if (JournalFilesRepository.trace) + { + JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName); + } + + sequentialFile.renameTo(fileName); + + if (keepOpened) + { + if (multiAIO) + { + sequentialFile.open(); + } + else + { + sequentialFile.open(1, false); + } + sequentialFile.position(position); + } + + return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION); + } + + /** + * @param tmpCompact + * @param fileID + * @return + */ + private String createFileName(final boolean tmpCompact, final long fileID) + { + String fileName; + if (tmpCompact) + { + fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp"; + } + else + { + fileName = filePrefix + "-" + fileID + "." + fileExtension; + } + return fileName; + } + + private long generateFileID() + { + return nextFileID.incrementAndGet(); + } + + /** + * Get the ID part of the name + */ + private long getFileNameID(final String fileName) + { + try + { + return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.'))); + } + catch (Throwable e) + { + HornetQJournalLogger.LOGGER.errorRetrievingID(e, fileName); + return 0; + } + } + + // Discard the old JournalFile and set it with a new ID + private JournalFile reinitializeFile(final JournalFile file) throws Exception + { + long newFileID = generateFileID(); + + SequentialFile sf = file.getFile(); + + sf.open(1, false); + + int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID); + + JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION); + + sf.position(position); + + sf.close(); + + return jf; + } + + @Override + public String toString() + { + return "JournalFilesRepository(dataFiles=" + dataFiles + ", freeFiles=" + freeFiles + ", openedFiles=" + + openedFiles + ")"; + } +}
