http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java new file mode 100644 index 0000000..54ab464 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java @@ -0,0 +1,253 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding; +import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord; +import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord; +import org.apache.activemq6.utils.ConcurrentHashSet; + +/** + * + * Super class for Journal maintenances such as clean up and Compactor + * + * @author <mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public abstract class AbstractJournalUpdateTask implements JournalReaderCallback +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr"; + + protected final JournalImpl journal; + + protected final SequentialFileFactory fileFactory; + + protected JournalFile currentFile; + + protected SequentialFile sequentialFile; + + protected final JournalFilesRepository filesRepository; + + protected long nextOrderingID; + + private HornetQBuffer writingChannel; + + private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>(); + + protected final List<JournalFile> newDataFiles = new ArrayList<JournalFile>(); + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory, + final JournalImpl journal, + final JournalFilesRepository filesRepository, + final Set<Long> recordsSnapshot, + final long nextOrderingID) + { + super(); + this.journal = journal; + this.filesRepository = filesRepository; + this.fileFactory = fileFactory; + this.nextOrderingID = nextOrderingID; + this.recordsSnapshot.addAll(recordsSnapshot); + } + + // Public -------------------------------------------------------- + + public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory, + final List<JournalFile> files, + final List<JournalFile> newFiles, + final List<Pair<String, String>> renames) throws Exception + { + + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1); + + try + { + controlFile.open(1, false); + + JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0); + + HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1); + + // DataFiles first + + if (files == null) + { + filesToRename.writeInt(0); + } + else + { + filesToRename.writeInt(files.size()); + + for (JournalFile file : files) + { + filesToRename.writeUTF(file.getFile().getFileName()); + } + } + + // New Files second + + if (newFiles == null) + { + filesToRename.writeInt(0); + } + else + { + filesToRename.writeInt(newFiles.size()); + + for (JournalFile file : newFiles) + { + filesToRename.writeUTF(file.getFile().getFileName()); + } + } + + // Renames from clean up third + if (renames == null) + { + filesToRename.writeInt(0); + } + else + { + filesToRename.writeInt(renames.size()); + for (Pair<String, String> rename : renames) + { + filesToRename.writeUTF(rename.getA()); + filesToRename.writeUTF(rename.getB()); + } + } + + JournalInternalRecord controlRecord = new JournalAddRecord(true, + 1, + (byte)0, + new ByteArrayEncoding(filesToRename.toByteBuffer() + .array())); + + HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex()); + + controlRecord.setFileID(0); + + controlRecord.encode(renameBuffer); + + ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex()); + + writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex()); + + writeBuffer.rewind(); + + controlFile.writeDirect(writeBuffer, true); + + return controlFile; + } + finally + { + controlFile.close(); + } + } + + /** Write pending output into file */ + public void flush() throws Exception + { + if (writingChannel != null) + { + sequentialFile.position(0); + + // To Fix the size of the file + writingChannel.writerIndex(writingChannel.capacity()); + + sequentialFile.writeInternal(writingChannel.toByteBuffer()); + sequentialFile.close(); + newDataFiles.add(currentFile); + } + + writingChannel = null; + } + + public boolean lookupRecord(final long id) + { + return recordsSnapshot.contains(id); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + /** + * @throws Exception + */ + + protected void openFile() throws Exception + { + flush(); + + ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize()); + + writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite); + + currentFile = filesRepository.takeFile(false, false, false, true); + + sequentialFile = currentFile.getFile(); + + sequentialFile.open(1, false); + + currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION); + + JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID()); + } + + protected void addToRecordsSnaptshot(final long id) + { + recordsSnapshot.add(id); + } + + /** + * @return the writingChannel + */ + protected HornetQBuffer getWritingChannel() + { + return writingChannel; + } + + protected void writeEncoder(final JournalInternalRecord record) throws Exception + { + record.setFileID(currentFile.getRecordID()); + record.encode(getWritingChannel()); + } + + protected void writeEncoder(final JournalInternalRecord record, final int txcounter) throws Exception + { + record.setNumberOfRecords(txcounter); + writeEncoder(record); + } + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java new file mode 100644 index 0000000..1f4362a --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java @@ -0,0 +1,408 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQIOErrorException; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.journal.HornetQJournalBundle; +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A AbstractSequentialFile + * + * @author <mailto:[email protected]">Clebert Suconic</a> + */ +public abstract class AbstractSequentialFile implements SequentialFile +{ + + private File file; + + private final String directory; + + protected final SequentialFileFactory factory; + + protected long fileSize = 0; + + protected final AtomicLong position = new AtomicLong(0); + + protected TimedBuffer timedBuffer; + + /** + * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class. + * This is the class returned to the factory when the file is being activated. + */ + protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); + + /** + * Used for asynchronous writes + */ + protected final Executor writerExecutor; + + /** + * @param file + * @param directory + */ + public AbstractSequentialFile(final String directory, + final File file, + final SequentialFileFactory factory, + final Executor writerExecutor) + { + super(); + this.file = file; + this.directory = directory; + this.factory = factory; + this.writerExecutor = writerExecutor; + } + + // Public -------------------------------------------------------- + + public final boolean exists() + { + return file.exists(); + } + + public final String getFileName() + { + return file.getName(); + } + + public final void delete() throws IOException, InterruptedException, HornetQException + { + if (isOpen()) + { + close(); + } + + if (file.exists() && !file.delete()) + { + HornetQJournalLogger.LOGGER.errorDeletingFile(this); + } + } + + public void copyTo(SequentialFile newFileName) throws Exception + { + try + { + HornetQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName); + if (!newFileName.isOpen()) + { + newFileName.open(); + } + + if (!isOpen()) + { + this.open(); + } + + + ByteBuffer buffer = ByteBuffer.allocate(10 * 1024); + + for (;;) + { + buffer.rewind(); + int size = this.read(buffer); + newFileName.writeDirect(buffer, false); + if (size < 10 * 1024) + { + break; + } + } + newFileName.close(); + this.close(); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + } + + /** + * @throws IOException only declare exception due to signature. Sub-class needs it. + */ + @Override + public void position(final long pos) throws IOException + { + position.set(pos); + } + + public long position() + { + return position.get(); + } + + public final void renameTo(final String newFileName) throws IOException, InterruptedException, + HornetQException + { + try + { + close(); + } + catch (IOException e) + { + factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } + + File newFile = new File(directory + "/" + newFileName); + + if (!file.equals(newFile)) + { + if (!file.renameTo(newFile)) + { + throw HornetQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName); + } + file = newFile; + } + } + + /** + * @throws IOException we declare throwing IOException because sub-classes need to do it + * @throws HornetQException + */ + public synchronized void close() throws IOException, InterruptedException, HornetQException + { + final CountDownLatch donelatch = new CountDownLatch(1); + + if (writerExecutor != null) + { + writerExecutor.execute(new Runnable() + { + public void run() + { + donelatch.countDown(); + } + }); + + while (!donelatch.await(60, TimeUnit.SECONDS)) + { + HornetQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName()); + } + } + } + + public final boolean fits(final int size) + { + if (timedBuffer == null) + { + return position.get() + size <= fileSize; + } + else + { + return timedBuffer.checkSize(size); + } + } + + public void setTimedBuffer(final TimedBuffer buffer) + { + if (timedBuffer != null) + { + timedBuffer.setObserver(null); + } + + timedBuffer = buffer; + + if (buffer != null) + { + buffer.setObserver(timedBufferObserver); + } + + } + + public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException + { + if (timedBuffer != null) + { + bytes.setIndex(0, bytes.capacity()); + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + ByteBuffer buffer = factory.newBuffer(bytes.capacity()); + buffer.put(bytes.toByteBuffer().array()); + buffer.rewind(); + writeDirect(buffer, sync, callback); + } + } + + public void write(final HornetQBuffer bytes, final boolean sync) throws IOException, InterruptedException, + HornetQException + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + + write(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) + { + if (timedBuffer != null) + { + timedBuffer.addBytes(bytes, sync, callback); + } + else + { + ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize()); + + // If not using the TimedBuffer, a final copy is necessary + // Because AIO will need a specific Buffer + // And NIO will also need a whole buffer to perform the write + + HornetQBuffer outBuffer = HornetQBuffers.wrappedBuffer(buffer); + bytes.encode(outBuffer); + buffer.rewind(); + writeDirect(buffer, sync, callback); + } + } + + public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, HornetQException + { + if (sync) + { + SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); + + write(bytes, true, completion); + + completion.waitCompletion(); + } + else + { + write(bytes, false, DummyCallback.getInstance()); + } + } + + protected File getFile() + { + return file; + } + + private static final class DelegateCallback implements IOAsyncTask + { + final List<IOAsyncTask> delegates; + + private DelegateCallback(final List<IOAsyncTask> delegates) + { + this.delegates = delegates; + } + + public void done() + { + for (IOAsyncTask callback : delegates) + { + try + { + callback.done(); + } + catch (Throwable e) + { + HornetQJournalLogger.LOGGER.errorCompletingCallback(e); + } + } + } + + public void onError(final int errorCode, final String errorMessage) + { + for (IOAsyncTask callback : delegates) + { + try + { + callback.onError(errorCode, errorMessage); + } + catch (Throwable e) + { + HornetQJournalLogger.LOGGER.errorCallingErrorCallback(e); + } + } + } + } + + protected ByteBuffer newBuffer(int size, int limit) + { + size = factory.calculateBlockSize(size); + limit = factory.calculateBlockSize(limit); + + ByteBuffer buffer = factory.newBuffer(size); + buffer.limit(limit); + return buffer; + } + + protected class LocalBufferObserver implements TimedBufferObserver + { + public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks) + { + buffer.flip(); + + if (buffer.limit() == 0) + { + factory.releaseBuffer(buffer); + } + else + { + writeDirect(buffer, requestedSync, new DelegateCallback(callbacks)); + } + } + + public ByteBuffer newBuffer(final int size, final int limit) + { + return AbstractSequentialFile.this.newBuffer(size, limit); + } + + public int getRemainingBytes() + { + if (fileSize - position.get() > Integer.MAX_VALUE) + { + return Integer.MAX_VALUE; + } + else + { + return (int)(fileSize - position.get()); + } + } + + @Override + public String toString() + { + return "TimedBufferObserver on file (" + getFile().getName() + ")"; + } + + } + + @Override + public File getJavaFile() + { + return getFile().getAbsoluteFile(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java new file mode 100644 index 0000000..4a4c8f2 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java @@ -0,0 +1,220 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.core.journal.IOCriticalErrorListener; +import org.apache.activemq6.core.journal.SequentialFile; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.journal.HornetQJournalLogger; +import org.apache.activemq6.utils.HornetQThreadFactory; + +/** + * + * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + */ +abstract class AbstractSequentialFileFactory implements SequentialFileFactory +{ + + // Timeout used to wait executors to shutdown + protected static final int EXECUTOR_TIMEOUT = 60; + + protected final String journalDir; + + protected final TimedBuffer timedBuffer; + + protected final int bufferSize; + + protected final long bufferTimeout; + + private final IOCriticalErrorListener critialErrorListener; + + /** + * Asynchronous writes need to be done at another executor. + * This needs to be done at NIO, or else we would have the callers thread blocking for the return. + * At AIO this is necessary as context switches on writes would fire flushes at the kernel. + * */ + protected ExecutorService writeExecutor; + + AbstractSequentialFileFactory(final String journalDir, + final boolean buffered, + final int bufferSize, + final int bufferTimeout, + final boolean logRates, + final IOCriticalErrorListener criticalErrorListener) + { + this.journalDir = journalDir; + + if (buffered) + { + timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); + } + else + { + timedBuffer = null; + } + this.bufferSize = bufferSize; + this.bufferTimeout = bufferTimeout; + this.critialErrorListener = criticalErrorListener; + } + + public void stop() + { + if (timedBuffer != null) + { + timedBuffer.stop(); + } + + if (isSupportsCallbacks() && writeExecutor != null) + { + writeExecutor.shutdown(); + + try + { + if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS)) + { + HornetQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace")); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + + public String getDirectory() + { + return journalDir; + } + + public void start() + { + if (timedBuffer != null) + { + timedBuffer.start(); + } + + if (isSupportsCallbacks()) + { + writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this), + true, + AbstractSequentialFileFactory.getThisClassLoader())); + } + + } + + @Override + public void onIOError(Exception exception, String message, SequentialFile file) + { + if (critialErrorListener != null) + { + critialErrorListener.onIOException(exception, message, file); + } + } + + @Override + public void activateBuffer(final SequentialFile file) + { + if (timedBuffer != null) + { + file.setTimedBuffer(timedBuffer); + } + } + + public void flush() + { + if (timedBuffer != null) + { + timedBuffer.flush(); + } + } + + public void deactivateBuffer() + { + if (timedBuffer != null) + { + // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer + timedBuffer.flush(); + timedBuffer.setObserver(null); + } + } + + public void releaseBuffer(final ByteBuffer buffer) + { + } + + /** + * Create the directory if it doesn't exist yet + */ + public void createDirs() throws Exception + { + File file = new File(journalDir); + boolean ok = file.mkdirs(); + if (!ok) + { + throw new IOException("Failed to create directory " + journalDir); + } + } + + public List<String> listFiles(final String extension) throws Exception + { + File dir = new File(journalDir); + + FilenameFilter fnf = new FilenameFilter() + { + public boolean accept(final File file, final String name) + { + return name.endsWith("." + extension); + } + }; + + String[] fileNames = dir.list(fnf); + + if (fileNames == null) + { + throw new IOException("Failed to list: " + journalDir); + } + + return Arrays.asList(fileNames); + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return AbstractSequentialFileFactory.class.getClassLoader(); + } + }); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java new file mode 100644 index 0000000..849219a --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java @@ -0,0 +1,66 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.core.journal.IOCriticalErrorListener; + +/** + * This is an undocumented class, that will open a journal and force compacting on it. + * <p> + * It may be used under special cases, but it shouldn't be needed under regular circumstances as the + * system should detect the need for compacting. The regular use is to configure min-compact + * parameters. + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public final class CompactJournal // NO_UCD +{ + + public static void main(final String[] arg) + { + if (arg.length != 4) + { + System.err.println("Use: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>"); + return; + } + + try + { + CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), null); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + static void compactJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final IOCriticalErrorListener listener) throws Exception + { + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener); + + JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + + journal.start(); + + journal.loadInternalOnly(); + + journal.compact(); + + journal.stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java new file mode 100644 index 0000000..c2eaabed --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java @@ -0,0 +1,51 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.journal.HornetQJournalLogger; + +/** + * A DummyCallback + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +class DummyCallback extends SyncIOCompletion +{ + private static final DummyCallback instance = new DummyCallback(); + + public static DummyCallback getInstance() + { + return DummyCallback.instance; + } + + public void done() + { + } + + public void onError(final int errorCode, final String errorMessage) + { + HornetQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode); + } + + @Override + public void waitCompletion() throws Exception + { + } + + @Override + public void storeLineUp() + { + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java new file mode 100644 index 0000000..aaad661 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java @@ -0,0 +1,206 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.List; + +import org.apache.activemq6.core.journal.RecordInfo; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.utils.Base64; + +/** + * Use this class to export the journal data. You can use it as a main class or through its native method {@link ExportJournal#exportJournal(String, String, String, int, int, String)} + * + * If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput> + * + * Example: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ExportJournal /journalDir hornetq-data hq 2 10485760 /tmp/export.dat + * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * + * + */ +public class ExportJournal +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public static void main(final String[] arg) + { + if (arg.length != 5) + { + System.err.println("Use: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>"); + return; + } + + try + { + ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + public static void exportJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final String fileOutput) throws Exception + { + + FileOutputStream fileOut = new FileOutputStream(new File(fileOutput)); + + BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); + + PrintStream out = new PrintStream(buffOut); + + ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out); + + out.close(); + } + + public static void exportJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final PrintStream out) throws Exception + { + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null); + + JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + + List<JournalFile> files = journal.orderFiles(); + + for (JournalFile file : files) + { + out.println("#File," + file); + + ExportJournal.exportJournalFile(out, nio, file); + } + } + + /** + * @param out + * @param fileFactory + * @param file + * @throws Exception + */ + public static void exportJournalFile(final PrintStream out, + final SequentialFileFactory fileFactory, + final JournalFile file) throws Exception + { + JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() + { + + public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo)); + } + + public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception + { + out.println("operation@Update," + ExportJournal.describeRecord(recordInfo)); + } + + public void onReadRollbackRecord(final long transactionID) throws Exception + { + out.println("operation@Rollback,txID@" + transactionID); + } + + public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception + { + out.println("operation@Prepare,txID@" + transactionID + + ",numberOfRecords@" + + numberOfRecords + + ",extraData@" + + ExportJournal.encode(extraData)); + } + + public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@DeleteRecordTX,txID@" + transactionID + + "," + + ExportJournal.describeRecord(recordInfo)); + } + + public void onReadDeleteRecord(final long recordID) throws Exception + { + out.println("operation@DeleteRecord,id@" + recordID); + } + + public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception + { + out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords); + } + + public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo)); + } + + public void onReadAddRecord(final RecordInfo recordInfo) throws Exception + { + out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo)); + } + + public void markAsDataFile(final JournalFile file) + { + } + }); + } + + private static String describeRecord(final RecordInfo recordInfo) + { + return "id@" + recordInfo.id + + ",userRecordType@" + + recordInfo.userRecordType + + ",length@" + + recordInfo.data.length + + ",isUpdate@" + + recordInfo.isUpdate + + ",compactCount@" + + recordInfo.compactCount + + ",data@" + + ExportJournal.encode(recordInfo.data); + } + + private static String encode(final byte[] data) + { + return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java new file mode 100644 index 0000000..c7eb8e0 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java @@ -0,0 +1,337 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQUnsupportedPacketException; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.IOCompletion; +import org.apache.activemq6.core.journal.Journal; +import org.apache.activemq6.core.journal.JournalLoadInformation; +import org.apache.activemq6.core.journal.LoaderCallback; +import org.apache.activemq6.core.journal.PreparedTransactionInfo; +import org.apache.activemq6.core.journal.RecordInfo; +import org.apache.activemq6.core.journal.SequentialFileFactory; +import org.apache.activemq6.core.journal.TransactionFailureCallback; +import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord; +import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX.TX_RECORD_TYPE; +import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecord; +import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecordTX; +import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord; + +/** + * Journal used at a replicating backup server during the synchronization of data with the 'live' + * server. It just wraps a single {@link JournalFile}. + * <p/> + * Its main purpose is to store the data as a Journal would, but without verifying records. + */ +public final class FileWrapperJournal extends JournalBase +{ + private final ReentrantLock lockAppend = new ReentrantLock(); + + private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>(); + private final JournalImpl journal; + protected volatile JournalFile currentFile; + + /** + * @param journal + * @throws Exception + */ + public FileWrapperJournal(Journal journal) throws Exception + { + super(journal.getFileFactory().isSupportsCallbacks(), journal.getFileSize()); + this.journal = (JournalImpl)journal; + currentFile = this.journal.setUpCurrentFile(JournalImpl.SIZE_HEADER); + } + + @Override + public void start() throws Exception + { + throw new UnsupportedOperationException(); + } + + @Override + public void stop() throws Exception + { + if (currentFile.getFile().isOpen()) + currentFile.getFile().close(); + } + + @Override + public boolean isStarted() + { + throw new UnsupportedOperationException(); + } + + // ------------------------ + + // ------------------------ + + @Override + public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception + { + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + + writeRecord(addRecord, sync, callback); + } + + /** + * Write the record to the current file. + */ + private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback) throws Exception + { + + lockAppend.lock(); + try + { + if (callback != null) + { + callback.storeLineUp(); + } + currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize()); + encoder.setFileID(currentFile.getRecordID()); + + if (callback != null) + { + currentFile.getFile().write(encoder, sync, callback); + } + else + { + currentFile.getFile().write(encoder, sync); + } + } + finally + { + lockAppend.unlock(); + } + } + + @Override + public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception + { + JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); + writeRecord(deleteRecord, sync, callback); + } + + @Override + public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception + { + count(txID); + JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); + writeRecord(deleteRecordTX, false, null); + } + + @Override + public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception + { + count(txID); + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + writeRecord(addRecord, false, null); + } + + @Override + public void + appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception + { + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + writeRecord(updateRecord, sync, callback); + } + + @Override + public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception + { + count(txID); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + writeRecord(updateRecordTX, false, null); + } + + @Override + public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception + { + JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); + AtomicInteger value = transactions.remove(Long.valueOf(txID)); + if (value != null) + { + commitRecord.setNumberOfRecords(value.get()); + } + + writeRecord(commitRecord, true, callback); + } + + @Override + public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception + { + JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); + AtomicInteger value = transactions.get(Long.valueOf(txID)); + if (value != null) + { + prepareRecord.setNumberOfRecords(value.get()); + } + writeRecord(prepareRecord, sync, callback); + } + + private int count(long txID) throws HornetQException + { + AtomicInteger defaultValue = new AtomicInteger(1); + AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue); + if (count != null) + { + return count.incrementAndGet(); + } + return defaultValue.get(); + } + + @Override + public String toString() + { + return FileWrapperJournal.class.getName() + "(currentFile=[" + currentFile + "], hash=" + super.toString() + ")"; + } + + // UNSUPPORTED STUFF + + @Override + public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception + { + throw new HornetQUnsupportedPacketException(); + } + + @Override + public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception + { + throw new HornetQUnsupportedPacketException(); + } + + @Override + public JournalLoadInformation loadInternalOnly() throws Exception + { + throw new HornetQUnsupportedPacketException(); + } + + @Override + public void lineUpContext(IOCompletion callback) + { + throw new UnsupportedOperationException(); + } + + @Override + public JournalLoadInformation load(List<RecordInfo> committedRecords, + List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception + { + throw new HornetQUnsupportedPacketException(); + } + + @Override + public int getAlignment() throws Exception + { + throw new HornetQUnsupportedPacketException(); + } + + @Override + public int getNumberOfRecords() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getUserVersion() + { + throw new UnsupportedOperationException(); + } + + @Override + public void perfBlast(int pages) + { + throw new UnsupportedOperationException(); + } + + @Override + public void runDirectJournalBlast() throws Exception + { + throw new UnsupportedOperationException(); + } + + @Override + public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception + { + throw new UnsupportedOperationException(); + } + + @Override + public void synchronizationLock() + { + throw new UnsupportedOperationException(); + } + + @Override + public void synchronizationUnlock() + { + throw new UnsupportedOperationException(); + } + + @Override + public void forceMoveNextFile() + { + throw new UnsupportedOperationException(); + } + + @Override + public JournalFile[] getDataFiles() + { + throw new UnsupportedOperationException(); + } + + @Override + void scheduleReclaim() + { + // no-op + } + + @Override + public SequentialFileFactory getFileFactory() + { + throw new UnsupportedOperationException(); + } + + @Override + public void scheduleCompactAndBlock(int timeout) throws Exception + { + throw new UnsupportedOperationException(); + } + + @Override + public void replicationSyncPreserveOldFiles() + { + throw new UnsupportedOperationException(); + } + + @Override + public void replicationSyncFinished() + { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java new file mode 100644 index 0000000..314fb94 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java @@ -0,0 +1,388 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq6.core.journal.RecordInfo; +import org.apache.activemq6.utils.Base64; + +/** + * Use this class to import the journal data from a listed file. You can use it as a main class or + * through its native method + * {@link ImportJournal#importJournal(String, String, String, int, int, String)} + * <p> + * If you use the main method, use its arguments as: + * + * <pre> + * JournalDirectory JournalPrefix FileExtension MinFiles FileSize FileOutput + * </pre> + * <p> + * Example: + * + * <pre> + * java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ExportJournal /journalDir hornetq-data hq 2 10485760 /tmp/export.dat + * </pre> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class ImportJournal +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public static void main(final String[] arg) + { + if (arg.length != 5) + { + System.err.println("Use: java -cp hornetq-core.jar:netty.jar org.apache.activemq6.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>"); + return; + } + + try + { + ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final String fileInput) throws Exception + { + FileInputStream fileInputStream = new FileInputStream(new File(fileInput)); + ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream); + + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final InputStream stream) throws Exception + { + Reader reader = new InputStreamReader(stream); + ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader); + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final Reader reader) throws Exception + { + + File journalDir = new File(directory); + + if (!journalDir.exists()) + { + if (!journalDir.mkdirs()) + System.err.println("Could not create directory " + directory); + } + + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null); + + JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + + if (journal.orderFiles().size() != 0) + { + throw new IllegalStateException("Import needs to create a brand new journal"); + } + + journal.start(); + + // The journal is empty, as we checked already. Calling load just to initialize the internal data + journal.loadInternalOnly(); + + BufferedReader buffReader = new BufferedReader(reader); + + String line; + + HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>(); + + long lineNumber = 0; + + Map<Long, JournalRecord> journalRecords = journal.getRecords(); + + while ((line = buffReader.readLine()) != null) + { + lineNumber++; + String[] splitLine = line.split(","); + if (splitLine[0].equals("#File")) + { + txCounters.clear(); + continue; + } + + Properties lineProperties = ImportJournal.parseLine(splitLine); + + String operation = null; + try + { + operation = lineProperties.getProperty("operation"); + + if (operation.equals("AddRecord")) + { + RecordInfo info = ImportJournal.parseRecord(lineProperties); + journal.appendAddRecord(info.id, info.userRecordType, info.data, false); + } + else if (operation.equals("AddRecordTX")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + counter.incrementAndGet(); + RecordInfo info = ImportJournal.parseRecord(lineProperties); + journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("AddRecordTX")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + counter.incrementAndGet(); + RecordInfo info = ImportJournal.parseRecord(lineProperties); + journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("UpdateTX")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + counter.incrementAndGet(); + RecordInfo info = ImportJournal.parseRecord(lineProperties); + journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("Update")) + { + RecordInfo info = ImportJournal.parseRecord(lineProperties); + journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false); + } + else if (operation.equals("DeleteRecord")) + { + long id = ImportJournal.parseLong("id", lineProperties); + + // If not found it means the append/update records were reclaimed already + if (journalRecords.get(id) != null) + { + journal.appendDeleteRecord(id, false); + } + } + else if (operation.equals("DeleteRecordTX")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + long id = ImportJournal.parseLong("id", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + counter.incrementAndGet(); + + // If not found it means the append/update records were reclaimed already + if (journalRecords.get(id) != null) + { + journal.appendDeleteRecordTransactional(txID, id); + } + } + else if (operation.equals("Prepare")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + byte[] data = ImportJournal.parseEncoding("extraData", lineProperties); + + if (counter.get() == numberOfRecords) + { + journal.appendPrepareRecord(txID, data, false); + } + else + { + System.err.println("Transaction " + txID + + " at line " + + lineNumber + + " is incomplete. The prepare record expected " + + numberOfRecords + + " while the import only had " + + counter); + } + } + else if (operation.equals("Commit")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties); + AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); + if (counter.get() == numberOfRecords) + { + journal.appendCommitRecord(txID, false); + } + else + { + System.err.println("Transaction " + txID + + " at line " + + lineNumber + + " is incomplete. The commit record expected " + + numberOfRecords + + " while the import only had " + + counter); + } + } + else if (operation.equals("Rollback")) + { + long txID = ImportJournal.parseLong("txID", lineProperties); + journal.appendRollbackRecord(txID, false); + } + else + { + System.err.println("Invalid opeartion " + operation + " at line " + lineNumber); + } + } + catch (Exception ex) + { + System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage()); + } + } + + journal.stop(); + } + + protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters) + { + + AtomicInteger counter = txCounters.get(txID); + if (counter == null) + { + counter = new AtomicInteger(0); + txCounters.put(txID, counter); + } + + return counter; + } + + protected static RecordInfo parseRecord(final Properties properties) throws Exception + { + long id = ImportJournal.parseLong("id", properties); + byte userRecordType = ImportJournal.parseByte("userRecordType", properties); + boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties); + byte[] data = ImportJournal.parseEncoding("data", properties); + return new RecordInfo(id, userRecordType, data, isUpdate, (short)0); + } + + private static byte[] parseEncoding(final String name, final Properties properties) throws Exception + { + String value = ImportJournal.parseString(name, properties); + + return ImportJournal.decode(value); + } + + /** + * @param properties + * @return + */ + private static int parseInt(final String name, final Properties properties) throws Exception + { + String value = ImportJournal.parseString(name, properties); + + return Integer.parseInt(value); + } + + private static long parseLong(final String name, final Properties properties) throws Exception + { + String value = ImportJournal.parseString(name, properties); + + return Long.parseLong(value); + } + + private static boolean parseBoolean(final String name, final Properties properties) throws Exception + { + String value = ImportJournal.parseString(name, properties); + + return Boolean.parseBoolean(value); + } + + private static byte parseByte(final String name, final Properties properties) throws Exception + { + String value = ImportJournal.parseString(name, properties); + + return Byte.parseByte(value); + } + + /** + * @param name + * @param properties + * @return + * @throws Exception + */ + private static String parseString(final String name, final Properties properties) throws Exception + { + String value = properties.getProperty(name); + + if (value == null) + { + throw new Exception("property " + name + " not found"); + } + return value; + } + + protected static Properties parseLine(final String[] splitLine) + { + Properties properties = new Properties(); + + for (String el : splitLine) + { + String[] tuple = el.split("@"); + if (tuple.length == 2) + { + properties.put(tuple[0], tuple[1]); + } + else + { + properties.put(tuple[0], tuple[0]); + } + } + + return properties; + } + + private static byte[] decode(final String data) + { + return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java ---------------------------------------------------------------------- diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java new file mode 100644 index 0000000..04e5a22 --- /dev/null +++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java @@ -0,0 +1,217 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.journal.impl; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.journal.EncodingSupport; +import org.apache.activemq6.core.journal.IOCompletion; +import org.apache.activemq6.core.journal.Journal; +import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding; + +abstract class JournalBase implements Journal +{ + + protected final int fileSize; + private final boolean supportsCallback; + + public JournalBase(boolean supportsCallback, int fileSize) + { + if (fileSize < JournalImpl.MIN_FILE_SIZE) + { + throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes"); + } + this.supportsCallback = supportsCallback; + this.fileSize = fileSize; + } + + public abstract void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, + final boolean sync, final IOCompletion callback) throws Exception; + + public abstract void appendAddRecordTransactional(final long txID, final long id, final byte recordType, + final EncodingSupport record) throws Exception; + + public abstract void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, + boolean lineUpContext) throws Exception; + + public abstract void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception; + + public abstract void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception; + + public abstract void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, + final IOCompletion callback) throws Exception; + + public abstract void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, + final boolean sync, final IOCompletion callback) throws Exception; + + public abstract void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, + final EncodingSupport record) throws Exception; + + public abstract void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception; + + + public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception + { + appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync); + } + + public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception + { + SyncIOCompletion callback = getSyncCallback(sync); + + appendAddRecord(id, recordType, record, sync, callback); + + if (callback != null) + { + callback.waitCompletion(); + } + } + + public void appendCommitRecord(final long txID, final boolean sync) throws Exception + { + SyncIOCompletion syncCompletion = getSyncCallback(sync); + + appendCommitRecord(txID, sync, syncCompletion, true); + + if (syncCompletion != null) + { + syncCompletion.waitCompletion(); + } + } + + public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception + { + appendCommitRecord(txID, sync, callback, true); + } + + public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception + { + appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync); + } + + public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, + final byte[] record) throws Exception + { + appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record)); + } + + public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception + { + appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record)); + } + + public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception + { + appendDeleteRecordTransactional(txID, id, NullEncoding.instance); + } + + public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception + { + appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync); + } + + public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception + { + SyncIOCompletion syncCompletion = getSyncCallback(sync); + + appendPrepareRecord(txID, transactionData, sync, syncCompletion); + + if (syncCompletion != null) + { + syncCompletion.waitCompletion(); + } + } + + public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception + { + appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record)); + } + + public void + appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception + { + SyncIOCompletion callback = getSyncCallback(sync); + + appendUpdateRecord(id, recordType, record, sync, callback); + + if (callback != null) + { + callback.waitCompletion(); + } + } + + public void appendRollbackRecord(final long txID, final boolean sync) throws Exception + { + SyncIOCompletion syncCompletion = getSyncCallback(sync); + + appendRollbackRecord(txID, sync, syncCompletion); + + if (syncCompletion != null) + { + syncCompletion.waitCompletion(); + } + + } + + public void appendDeleteRecord(final long id, final boolean sync) throws Exception + { + SyncIOCompletion callback = getSyncCallback(sync); + + appendDeleteRecord(id, sync, callback); + + if (callback != null) + { + callback.waitCompletion(); + } + } + + abstract void scheduleReclaim(); + + protected SyncIOCompletion getSyncCallback(final boolean sync) + { + if (supportsCallback) + { + if (sync) + { + return new SimpleWaitIOCallback(); + } + return DummyCallback.getInstance(); + } + return null; + } + + private static final class NullEncoding implements EncodingSupport + { + + private static NullEncoding instance = new NullEncoding(); + + public void decode(final HornetQBuffer buffer) + { + // no-op + } + + public void encode(final HornetQBuffer buffer) + { + // no-op + } + + public int getEncodeSize() + { + return 0; + } + } + + public int getFileSize() + { + return fileSize; + } +}
