http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java new file mode 100644 index 0000000..ed2afa9 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFile.java @@ -0,0 +1,591 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.store.config.FlushDiskType; +import com.alibaba.rocketmq.store.util.LibC; +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.nio.ch.DirectBuffer; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class MappedFile extends ReferenceResource { + public static final int OS_PAGE_SIZE = 1024 * 4; + protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + + private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0); + + private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0); + + private String fileName; + + private long fileFromOffset; + + protected int fileSize; + + private File file; + + private MappedByteBuffer mappedByteBuffer; + + protected final AtomicInteger wrotePosition = new AtomicInteger(0); + + private final AtomicInteger flushedPosition = new AtomicInteger(0); + //ADD BY ChenYang + protected final AtomicInteger committedPosition = new AtomicInteger(0); + + + protected FileChannel fileChannel; + + private volatile long storeTimestamp = 0; + private boolean firstCreateInQueue = false; + + /** + * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. + */ + protected ByteBuffer writeBuffer = null; + protected TransientStorePool transientStorePool = null; + + public MappedFile() { + } + + public MappedFile(final String fileName, final int fileSize) throws IOException { + init(fileName, fileSize); + } + + public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { + init(fileName, fileSize, transientStorePool); + } + + public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { + init(fileName, fileSize); + this.writeBuffer = transientStorePool.borrowBuffer(); + this.transientStorePool = transientStorePool; + } + + private void init(final String fileName, final int fileSize) throws IOException { + this.fileName = fileName; + this.fileSize = fileSize; + this.file = new File(fileName); + this.fileFromOffset = Long.parseLong(this.file.getName()); + boolean ok = false; + + ensureDirOK(this.file.getParent()); + + try { + this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); + this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); + TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize); + TOTAL_MAPED_FILES.incrementAndGet(); + ok = true; + } catch (FileNotFoundException e) { + log.error("create file channel " + this.fileName + " Failed. ", e); + throw e; + } catch (IOException e) { + log.error("map file " + this.fileName + " Failed. ", e); + throw e; + } finally { + if (!ok && this.fileChannel != null) { + this.fileChannel.close(); + } + } + } + + + public static void ensureDirOK(final String dirName) { + if (dirName != null) { + File f = new File(dirName); + if (!f.exists()) { + boolean result = f.mkdirs(); + log.info(dirName + " mkdir " + (result ? "OK" : "Failed")); + } + } + } + + + public static void clean(final ByteBuffer buffer) { + if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) + return; + invoke(invoke(viewed(buffer), "cleaner"), "clean"); + } + + + private static Object invoke(final Object target, final String methodName, final Class<?>... args) { + return AccessController.doPrivileged(new PrivilegedAction<Object>() { + public Object run() { + try { + Method method = method(target, methodName, args); + method.setAccessible(true); + return method.invoke(target); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + }); + } + + + private static Method method(Object target, String methodName, Class<?>[] args) + throws NoSuchMethodException { + try { + return target.getClass().getMethod(methodName, args); + } catch (NoSuchMethodException e) { + return target.getClass().getDeclaredMethod(methodName, args); + } + } + + + private static ByteBuffer viewed(ByteBuffer buffer) { + String methodName = "viewedBuffer"; + + + Method[] methods = buffer.getClass().getMethods(); + for (int i = 0; i < methods.length; i++) { + if (methods[i].getName().equals("attachment")) { + methodName = "attachment"; + break; + } + } + + ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName); + if (viewedBuffer == null) + return buffer; + else + return viewed(viewedBuffer); + } + + + public static int getTotalmapedfiles() { + return TOTAL_MAPED_FILES.get(); + } + + + public static long getTotalMapedVitualMemory() { + return TOTAL_MAPED_VITUAL_MEMORY.get(); + } + + + public long getLastModifiedTimestamp() { + return this.file.lastModified(); + } + + public int getFileSize() { + return fileSize; + } + + public FileChannel getFileChannel() { + return fileChannel; + } + + public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { + assert msg != null; + assert cb != null; + + int currentPos = this.wrotePosition.get(); + + + if (currentPos < this.fileSize) { + ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); + byteBuffer.position(currentPos); + AppendMessageResult result = + cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); + this.wrotePosition.addAndGet(result.getWroteBytes()); + this.storeTimestamp = result.getStoreTimestamp(); + return result; + } + + + log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: " + + this.fileSize); + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + + /** + + */ + public long getFileFromOffset() { + return this.fileFromOffset; + } + + /** + + * + + */ + public boolean appendMessage(final byte[] data) { + int currentPos = this.wrotePosition.get(); + + + if ((currentPos + data.length) <= this.fileSize) { + try { + this.fileChannel.position(currentPos); + this.fileChannel.write(ByteBuffer.wrap(data)); + } catch (Throwable e) { + log.error("Error occurred when append message to mappedFile.", e); + } + this.wrotePosition.addAndGet(data.length); + return true; + } + + return false; + } + + /** + + * + * @param flushLeastPages + + * + * @return The current flushed position + */ + public int flush(final int flushLeastPages) { + if (this.isAbleToFlush(flushLeastPages)) { + if (this.hold()) { + int value = getReadPosition(); + + try { + //We only append data to fileChannel or mappedByteBuffer, never both. + if (writeBuffer != null || this.fileChannel.position() != 0) { + this.fileChannel.force(false); + } else { + this.mappedByteBuffer.force(); + } + } catch (Throwable e) { + log.error("Error occurred when force data to disk.", e); + } + + this.flushedPosition.set(value); + this.release(); + } else { + log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); + this.flushedPosition.set(getReadPosition()); + } + } + return this.getFlushedPosition(); + } + + public int commit(final int commitLeastPages) { + if (writeBuffer == null) { + //no need to commit data to file channel, so just regard wrotePosition as committedPosition. + return this.wrotePosition.get(); + } + if (this.isAbleToCommit(commitLeastPages)) { + if (this.hold()) { + commit0(commitLeastPages); + this.release(); + } else { + log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); + } + } + + // All dirty data has been committed to FileChannel. + if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { + this.transientStorePool.returnBuffer(writeBuffer); + this.writeBuffer = null; + } + + return this.committedPosition.get(); + } + + protected void commit0(final int commitLeastPages) { + int writePos = this.wrotePosition.get(); + int lastCommittedPosition = this.committedPosition.get(); + + if (writePos - this.committedPosition.get() > 0) { + try { + ByteBuffer byteBuffer = writeBuffer.slice(); + byteBuffer.position(lastCommittedPosition); + byteBuffer.limit(writePos); + this.fileChannel.position(lastCommittedPosition); + this.fileChannel.write(byteBuffer); + this.committedPosition.set(writePos); + } catch (Throwable e) { + log.error("Error occurred when commit data to FileChannel.", e); + } + } + } + + private boolean isAbleToFlush(final int flushLeastPages) { + int flush = this.flushedPosition.get(); + int write = getReadPosition(); + + if (this.isFull()) { + return true; + } + + if (flushLeastPages > 0) { + return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; + } + + return write > flush; + } + + protected boolean isAbleToCommit(final int commitLeastPages) { + int flush = this.committedPosition.get(); + int write = this.wrotePosition.get(); + + if (this.isFull()) { + return true; + } + + if (commitLeastPages > 0) { + return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; + } + + return write > flush; + } + + public int getFlushedPosition() { + return flushedPosition.get(); + } + + + public void setFlushedPosition(int pos) { + this.flushedPosition.set(pos); + } + + + public boolean isFull() { + return this.fileSize == this.wrotePosition.get(); + } + + public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { + int readPosition = getReadPosition(); + if ((pos + size) <= readPosition) { + + if (this.hold()) { + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + byteBuffer.position(pos); + ByteBuffer byteBufferNew = byteBuffer.slice(); + byteBufferNew.limit(size); + return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); + } else { + log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " + + this.fileFromOffset); + } + } else { + log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size + + ", fileFromOffset: " + this.fileFromOffset); + } + + + return null; + } + + /** + + */ + public SelectMappedBufferResult selectMappedBuffer(int pos) { + int readPosition = getReadPosition(); + if (pos < readPosition && pos >= 0) { + if (this.hold()) { + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + byteBuffer.position(pos); + int size = readPosition - pos; + ByteBuffer byteBufferNew = byteBuffer.slice(); + byteBufferNew.limit(size); + return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); + } + } + + + return null; + } + + @Override + public boolean cleanup(final long currentRef) { + if (this.isAvailable()) { + log.error("this file[REF:" + currentRef + "] " + this.fileName + + " have not shutdown, stop unmaping."); + return false; + } + + if (this.isCleanupOver()) { + log.error("this file[REF:" + currentRef + "] " + this.fileName + + " have cleanup, do not do it again."); + return true; + } + + clean(this.mappedByteBuffer); + TOTAL_MAPED_VITUAL_MEMORY.addAndGet(this.fileSize * (-1)); + TOTAL_MAPED_FILES.decrementAndGet(); + log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK"); + return true; + } + + public boolean destroy(final long intervalForcibly) { + this.shutdown(intervalForcibly); + + if (this.isCleanupOver()) { + try { + this.fileChannel.close(); + log.info("close file channel " + this.fileName + " OK"); + + long beginTime = System.currentTimeMillis(); + boolean result = this.file.delete(); + log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + + this.getFlushedPosition() + ", " + + UtilAll.computeEclipseTimeMilliseconds(beginTime)); + } catch (Exception e) { + log.warn("close file channel " + this.fileName + " Failed. ", e); + } + + return true; + } else { + log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName + + " Failed. cleanupOver: " + this.cleanupOver); + } + + return false; + } + + public int getWrotePosition() { + return wrotePosition.get(); + } + + /** + * + * @return The max position which have valid data + */ + public int getReadPosition() { + return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); + } + + public void setWrotePosition(int pos) { + this.wrotePosition.set(pos); + } + + public void setCommittedPosition(int pos) { + this.committedPosition.set(pos); + } + + public void warmMappedFile(FlushDiskType type, int pages) { + long beginTime = System.currentTimeMillis(); + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + int flush = 0; + long time = System.currentTimeMillis(); + for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { + byteBuffer.put(i, (byte) 0); + // force flush when flush disk type is sync + if (type == FlushDiskType.SYNC_FLUSH) { + if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { + flush = i; + mappedByteBuffer.force(); + } + } + + // prevent gc + if (j % 1000 == 0) { + log.info("j={}, costTime={}", j, System.currentTimeMillis() - time); + time = System.currentTimeMillis(); + try { + Thread.sleep(0); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + // force flush when prepare load finished + if (type == FlushDiskType.SYNC_FLUSH) { + log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}", + this.getFileName(), System.currentTimeMillis() - beginTime); + mappedByteBuffer.force(); + } + log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(), + System.currentTimeMillis() - beginTime); + + this.mlock(); + } + + public String getFileName() { + return fileName; + } + + public MappedByteBuffer getMappedByteBuffer() { + return mappedByteBuffer; + } + + public ByteBuffer sliceByteBuffer() { + return this.mappedByteBuffer.slice(); + } + + + public long getStoreTimestamp() { + return storeTimestamp; + } + + + public boolean isFirstCreateInQueue() { + return firstCreateInQueue; + } + + + public void setFirstCreateInQueue(boolean firstCreateInQueue) { + this.firstCreateInQueue = firstCreateInQueue; + } + + + public void mlock() { + final long beginTime = System.currentTimeMillis(); + final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); + Pointer pointer = new Pointer(address); + { + int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize)); + log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); + } + + { + int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED); + log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); + } + } + + public void munlock() { + final long beginTime = System.currentTimeMillis(); + final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); + Pointer pointer = new Pointer(address); + int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize)); + log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); + } + + @Override + public String toString() { + return this.fileName; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java new file mode 100644 index 0000000..2b006c0 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java @@ -0,0 +1,606 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; + + +/** + * @author shijia.wxr + */ +public class MappedFileQueue { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); + + private static final int DELETE_FILES_BATCH_MAX = 10; + + private final String storePath; + + private final int mappedFileSize; + + private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); + + private final AllocateMappedFileService allocateMappedFileService; + + private long flushedWhere = 0; + private long committedWhere = 0; + + private volatile long storeTimestamp = 0; + + + public MappedFileQueue(final String storePath, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService) { + this.storePath = storePath; + this.mappedFileSize = mappedFileSize; + this.allocateMappedFileService = allocateMappedFileService; + } + + + public void checkSelf() { + + if (!this.mappedFiles.isEmpty()) { + Iterator<MappedFile> iterator = mappedFiles.iterator(); + MappedFile pre = null; + while (iterator.hasNext()) { + MappedFile cur = iterator.next(); + + if (pre != null) { + if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) { + LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}", + pre.getFileName(), cur.getFileName()); + } + } + pre = cur; + } + } + } + + + public MappedFile getMappedFileByTime(final long timestamp) { + Object[] mfs = this.copyMappedFiles(0); + + if (null == mfs) + return null; + + for (int i = 0; i < mfs.length; i++) { + MappedFile mappedFile = (MappedFile) mfs[i]; + if (mappedFile.getLastModifiedTimestamp() >= timestamp) { + return mappedFile; + } + } + + return (MappedFile) mfs[mfs.length - 1]; + } + + + private Object[] copyMappedFiles(final int reservedMappedFiles) { + Object[] mfs; + + if (this.mappedFiles.size() <= reservedMappedFiles) { + return null; + } + + mfs = this.mappedFiles.toArray(); + return mfs; + } + + + public void truncateDirtyFiles(long offset) { + List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>(); + + for (MappedFile file : this.mappedFiles) { + long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize; + if (fileTailOffset > offset) { + if (offset >= file.getFileFromOffset()) { + file.setWrotePosition((int) (offset % this.mappedFileSize)); + file.setCommittedPosition((int) (offset % this.mappedFileSize)); + file.setFlushedPosition((int) (offset % this.mappedFileSize)); + } else { + file.destroy(1000); + willRemoveFiles.add(file); + } + } + } + + this.deleteExpiredFile(willRemoveFiles); + } + + + private void deleteExpiredFile(List<MappedFile> files) { + + if (!files.isEmpty()) { + + Iterator<MappedFile> iterator = files.iterator(); + while (iterator.hasNext()) { + MappedFile cur = iterator.next(); + if (!this.mappedFiles.contains(cur)) { + iterator.remove(); + log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName()); + } + } + + try { + if (!this.mappedFiles.removeAll(files)) { + log.error("deleteExpiredFile remove failed."); + } + } catch (Exception e) { + log.error("deleteExpiredFile has exception.", e); + } + } + } + + + public boolean load() { + File dir = new File(this.storePath); + File[] files = dir.listFiles(); + if (files != null) { + // ascending order + Arrays.sort(files); + for (File file : files) { + + if (file.length() != this.mappedFileSize) { + log.warn(file + "\t" + file.length() + + " length not matched message store config value, ignore it"); + return true; + } + + + try { + MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); + + mappedFile.setWrotePosition(this.mappedFileSize); + mappedFile.setFlushedPosition(this.mappedFileSize); + mappedFile.setCommittedPosition(this.mappedFileSize); + this.mappedFiles.add(mappedFile); + log.info("load " + file.getPath() + " OK"); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; + } + } + } + + return true; + } + + + public long howMuchFallBehind() { + if (this.mappedFiles.isEmpty()) + return 0; + + long committed = this.flushedWhere; + if (committed != 0) { + MappedFile mappedFile = this.getLastMappedFile(0, false); + if (mappedFile != null) { + return (mappedFile.getFileFromOffset() + mappedFile.getWrotePosition()) - committed; + } + } + + return 0; + } + + + public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) { + long createOffset = -1; + MappedFile mappedFileLast = getLastMappedFile(); + + if (mappedFileLast == null) { + createOffset = startOffset - (startOffset % this.mappedFileSize); + } + + if (mappedFileLast != null && mappedFileLast.isFull()) { + createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize; + } + + if (createOffset != -1 && needCreate) { + String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = this.storePath + File.separator + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + MappedFile mappedFile = null; + + if (this.allocateMappedFileService != null) { + mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, + nextNextFilePath, this.mappedFileSize); + } else { + try { + mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); + } catch (IOException e) { + log.error("create mappedFile exception", e); + } + } + + if (mappedFile != null) { + if (this.mappedFiles.isEmpty()) { + mappedFile.setFirstCreateInQueue(true); + } + this.mappedFiles.add(mappedFile); + } + + return mappedFile; + } + + return mappedFileLast; + } + + public MappedFile getLastMappedFile(final long startOffset) { + return getLastMappedFile(startOffset, true); + } + + public MappedFile getLastMappedFile() { + MappedFile mappedFileLast = null; + + while (!this.mappedFiles.isEmpty()) { + try { + mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); + break; + } catch (IndexOutOfBoundsException e) { + //continue; + } catch (Exception e) { + log.error("getLastMappedFile has exception.", e); + break; + } + } + + return mappedFileLast; + } + + public boolean resetOffset(long offset) { + MappedFile mappedFileLast = getLastMappedFile(); + + if (mappedFileLast != null) { + long lastOffset = mappedFileLast.getFileFromOffset() + + mappedFileLast.getWrotePosition(); + long diff = lastOffset - offset; + + final int maxDiff = this.mappedFileSize * 2; + if (diff > maxDiff) return false; + } + + ListIterator<MappedFile> iterator = this.mappedFiles.listIterator(); + + while (iterator.hasPrevious()) { + mappedFileLast = iterator.previous(); + if (offset >= mappedFileLast.getFileFromOffset()) { + int where = (int) (offset % mappedFileLast.getFileSize()); + mappedFileLast.setFlushedPosition(where); + mappedFileLast.setWrotePosition(where); + mappedFileLast.setCommittedPosition(where); + break; + } else { + iterator.remove(); + } + } + return true; + } + + public long getMinOffset() { + + if (!this.mappedFiles.isEmpty()) { + try { + return this.mappedFiles.get(0).getFileFromOffset(); + } catch (IndexOutOfBoundsException e) { + //continue; + } catch (Exception e) { + log.error("getMinOffset has exception.", e); + } + } + return -1; + } + + + public long getMaxOffset() { + MappedFile mappedFile = getLastMappedFile(); + if (mappedFile != null) { + return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); + } + return 0; + } + + public long getMaxWrotePosition() { + MappedFile mappedFile = getLastMappedFile(); + if (mappedFile != null) { + return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); + } + return 0; + } + + public long remainHowManyDataToCommit() { + return getMaxWrotePosition() - committedWhere; + } + + public long remainHowManyDataToFlush() { + return getMaxOffset() - flushedWhere; + } + + public void deleteLastMappedFile() { + MappedFile lastMappedFile = getLastMappedFile(); + if (lastMappedFile != null) { + lastMappedFile.destroy(1000); + this.mappedFiles.remove(lastMappedFile); + log.info("on recover, destroy a logic mapped file " + lastMappedFile.getFileName()); + + } + } + + public int deleteExpiredFileByTime(final long expiredTime, + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately) { + Object[] mfs = this.copyMappedFiles(0); + + if (null == mfs) + return 0; + + int mfsLength = mfs.length - 1; + int deleteCount = 0; + List<MappedFile> files = new ArrayList<MappedFile>(); + if (null != mfs) { + for (int i = 0; i < mfsLength; i++) { + MappedFile mappedFile = (MappedFile) mfs[i]; + long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; + if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { + if (mappedFile.destroy(intervalForcibly)) { + files.add(mappedFile); + deleteCount++; + + if (files.size() >= DELETE_FILES_BATCH_MAX) { + break; + } + + if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { + try { + Thread.sleep(deleteFilesInterval); + } catch (InterruptedException e) { + } + } + } else { + break; + } + } + } + } + + deleteExpiredFile(files); + + return deleteCount; + } + + + public int deleteExpiredFileByOffset(long offset, int unitSize) { + Object[] mfs = this.copyMappedFiles(0); + + List<MappedFile> files = new ArrayList<MappedFile>(); + int deleteCount = 0; + if (null != mfs) { + + int mfsLength = mfs.length - 1; + + for (int i = 0; i < mfsLength; i++) { + boolean destroy; + MappedFile mappedFile = (MappedFile) mfs[i]; + SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); + if (result != null) { + long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); + result.release(); + destroy = maxOffsetInLogicQueue < offset; + if (destroy) { + log.info("physic min offset " + offset + ", logics in current mappedFile max offset " + + maxOffsetInLogicQueue + ", delete it"); + } + } else { + log.warn("this being not executed forever."); + break; + } + + if (destroy && mappedFile.destroy(1000 * 60)) { + files.add(mappedFile); + deleteCount++; + } else { + break; + } + } + } + + deleteExpiredFile(files); + + return deleteCount; + } + + + public boolean flush(final int flushLeastPages) { + boolean result = true; + MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false); + if (mappedFile != null) { + long tmpTimeStamp = mappedFile.getStoreTimestamp(); + int offset = mappedFile.flush(flushLeastPages); + long where = mappedFile.getFileFromOffset() + offset; + result = where == this.flushedWhere; + this.flushedWhere = where; + if (0 == flushLeastPages) { + this.storeTimestamp = tmpTimeStamp; + } + } + + return result; + } + + public boolean commit(final int commitLeastPages) { + boolean result = true; + MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false); + if (mappedFile != null) { + int offset = mappedFile.commit(commitLeastPages); + long where = mappedFile.getFileFromOffset() + offset; + result = where == this.committedWhere; + this.committedWhere = where; + } + + return result; + } + + + public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { + try { + MappedFile mappedFile = this.getFirstMappedFile(); + if (mappedFile != null) { + int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); + if (index < 0 || index >= this.mappedFiles.size()) { + LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}", + offset, + index, + this.mappedFileSize, + this.mappedFiles.size(), + UtilAll.currentStackTrace()); + } + + try { + return this.mappedFiles.get(index); + } catch (Exception e) { + if (returnFirstOnNotFound) { + return mappedFile; + } + } + } + } catch (Exception e) { + log.error("findMappedFileByOffset Exception", e); + } + + return null; + } + + + public MappedFile getFirstMappedFile() { + MappedFile mappedFileFirst = null; + + if (!this.mappedFiles.isEmpty()) { + try { + mappedFileFirst = this.mappedFiles.get(0); + } catch (IndexOutOfBoundsException e) { + //ignore + } catch (Exception e) { + log.error("getFirstMappedFile has exception.", e); + } + } + + return mappedFileFirst; + } + + public MappedFile findMappedFileByOffset(final long offset) { + return findMappedFileByOffset(offset, false); + } + + + public long getMappedMemorySize() { + long size = 0; + + Object[] mfs = this.copyMappedFiles(0); + if (mfs != null) { + for (Object mf : mfs) { + if (((ReferenceResource) mf).isAvailable()) { + size += this.mappedFileSize; + } + } + } + + return size; + } + + + public boolean retryDeleteFirstFile(final long intervalForcibly) { + MappedFile mappedFile = this.getFirstMappedFile(); + if (mappedFile != null) { + if (!mappedFile.isAvailable()) { + log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName()); + boolean result = mappedFile.destroy(intervalForcibly); + if (result) { + log.info("the mappedFile re delete OK, " + mappedFile.getFileName()); + List<MappedFile> tmpFiles = new ArrayList<MappedFile>(); + tmpFiles.add(mappedFile); + this.deleteExpiredFile(tmpFiles); + } else { + log.warn("the mappedFile re delete failed, " + mappedFile.getFileName()); + } + + return result; + } + } + + return false; + } + + + public void shutdown(final long intervalForcibly) { + for (MappedFile mf : this.mappedFiles) { + mf.shutdown(intervalForcibly); + } + } + + + public void destroy() { + for (MappedFile mf : this.mappedFiles) { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); + this.flushedWhere = 0; + + // delete parent directory + File file = new File(storePath); + if (file.isDirectory()) { + file.delete(); + } + } + + + public long getFlushedWhere() { + return flushedWhere; + } + + + public void setFlushedWhere(long flushedWhere) { + this.flushedWhere = flushedWhere; + } + + + public long getStoreTimestamp() { + return storeTimestamp; + } + + + public List<MappedFile> getMappedFiles() { + return mappedFiles; + } + + + public int getMappedFileSize() { + return mappedFileSize; + } + + public long getCommittedWhere() { + return committedWhere; + } + + public void setCommittedWhere(final long committedWhere) { + this.committedWhere = committedWhere; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java new file mode 100644 index 0000000..6d227e8 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageArrivingListener.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.store; + +public interface MessageArrivingListener { + void arriving(String topic, int queueId, long logicOffset, long tagsCode); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java new file mode 100644 index 0000000..1c54956 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageExtBrokerInner.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.common.message.MessageExt; + + +/** + * @author shijia.wxr + */ +public class MessageExtBrokerInner extends MessageExt { + private static final long serialVersionUID = 7256001576878700634L; + private String propertiesString; + private long tagsCode; + + public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { + if (null == tags || tags.length() == 0) + return 0; + + return tags.hashCode(); + } + + + public String getPropertiesString() { + return propertiesString; + } + + + public void setPropertiesString(String propertiesString) { + this.propertiesString = propertiesString; + } + + + public long getTagsCode() { + return tagsCode; + } + + + public void setTagsCode(long tagsCode) { + this.tagsCode = tagsCode; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java new file mode 100644 index 0000000..a10e607 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageFilter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + + +/** + * @author shijia.wxr + */ +public interface MessageFilter { + boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java new file mode 100644 index 0000000..30f7bf7 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MessageStore.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.HashMap; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public interface MessageStore { + + boolean load(); + + + void start() throws Exception; + + + void shutdown(); + + + void destroy(); + + PutMessageResult putMessage(final MessageExtBrokerInner msg); + + + GetMessageResult getMessage(final String group, final String topic, final int queueId, + final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); + + + long getMaxOffsetInQuque(final String topic, final int queueId); + + + long getMinOffsetInQuque(final String topic, final int queueId); + + + long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset); + + + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + + + MessageExt lookMessageByOffset(final long commitLogOffset); + + + SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset); + + + SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize); + + String getRunningDataInfo(); + + + HashMap<String, String> getRuntimeInfo(); + + + long getMaxPhyOffset(); + + + long getMinPhyOffset(); + + + long getEarliestMessageTime(final String topic, final int queueId); + long getEarliestMessageTime(); + + + long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset); + + + long getMessageTotalInQueue(final String topic, final int queueId); + + SelectMappedBufferResult getCommitLogData(final long offset); + + + boolean appendToCommitLog(final long startOffset, final byte[] data); + + void excuteDeleteFilesManualy(); + + + QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, + final long begin, final long end); + + + void updateHaMasterAddress(final String newAddr); + + + long slaveFallBehindMuch(); + + + long now(); + + + int cleanUnusedTopic(final Set<String> topics); + + + void cleanExpiredConsumerQueue(); + + + boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset); + + + long dispatchBehindBytes(); + + long flush(); + + boolean resetWriteOffset(long phyOffset); + + long getConfirmOffset(); + + void setConfirmOffset(long phyOffset); + + boolean isOSPageCacheBusy(); + + long lockTimeMills(); + + boolean isTransientStorePoolDeficient(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java new file mode 100644 index 0000000..b36ae58 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageResult.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +/** + * @author shijia.wxr + */ +public class PutMessageResult { + private PutMessageStatus putMessageStatus; + private AppendMessageResult appendMessageResult; + + + public PutMessageResult(PutMessageStatus putMessageStatus, AppendMessageResult appendMessageResult) { + this.putMessageStatus = putMessageStatus; + this.appendMessageResult = appendMessageResult; + } + + + public boolean isOk() { + return this.appendMessageResult != null && this.appendMessageResult.isOk(); + } + + + public AppendMessageResult getAppendMessageResult() { + return appendMessageResult; + } + + + public void setAppendMessageResult(AppendMessageResult appendMessageResult) { + this.appendMessageResult = appendMessageResult; + } + + + public PutMessageStatus getPutMessageStatus() { + return putMessageStatus; + } + + + public void setPutMessageStatus(PutMessageStatus putMessageStatus) { + this.putMessageStatus = putMessageStatus; + } + + + @Override + public String toString() { + return "PutMessageResult [putMessageStatus=" + putMessageStatus + ", appendMessageResult=" + + appendMessageResult + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java new file mode 100644 index 0000000..4758789 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/PutMessageStatus.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +/** + * @author shijia.wxr + */ +public enum PutMessageStatus { + PUT_OK, + FLUSH_DISK_TIMEOUT, + FLUSH_SLAVE_TIMEOUT, + SLAVE_NOT_AVAILABLE, + SERVICE_NOT_AVAILABLE, + CREATE_MAPEDFILE_FAILED, + MESSAGE_ILLEGAL, + PROPERTIES_SIZE_EXCEEDED, + OS_PAGECACHE_BUSY, + UNKNOWN_ERROR, +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java new file mode 100644 index 0000000..a8e3fed --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/QueryMessageResult.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryMessageResult { + + private final List<SelectMappedBufferResult> messageMapedList = + new ArrayList<SelectMappedBufferResult>(100); + + private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); + private long indexLastUpdateTimestamp; + private long indexLastUpdatePhyoffset; + + private int bufferTotalSize = 0; + + + public void addMessage(final SelectMappedBufferResult mapedBuffer) { + this.messageMapedList.add(mapedBuffer); + this.messageBufferList.add(mapedBuffer.getByteBuffer()); + this.bufferTotalSize += mapedBuffer.getSize(); + } + + + public void release() { + for (SelectMappedBufferResult select : this.messageMapedList) { + select.release(); + } + } + + + public long getIndexLastUpdateTimestamp() { + return indexLastUpdateTimestamp; + } + + + public void setIndexLastUpdateTimestamp(long indexLastUpdateTimestamp) { + this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; + } + + + public long getIndexLastUpdatePhyoffset() { + return indexLastUpdatePhyoffset; + } + + + public void setIndexLastUpdatePhyoffset(long indexLastUpdatePhyoffset) { + this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset; + } + + + public List<ByteBuffer> getMessageBufferList() { + return messageBufferList; + } + + + public int getBufferTotalSize() { + return bufferTotalSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java new file mode 100644 index 0000000..7a50f3a --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ReferenceResource.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public abstract class ReferenceResource { + protected final AtomicLong refCount = new AtomicLong(1); + protected volatile boolean available = true; + protected volatile boolean cleanupOver = false; + private volatile long firstShutdownTimestamp = 0; + + + public synchronized boolean hold() { + if (this.isAvailable()) { + if (this.refCount.getAndIncrement() > 0) { + return true; + } else { + this.refCount.getAndDecrement(); + } + } + + return false; + } + + + public boolean isAvailable() { + return this.available; + } + + + + public void shutdown(final long intervalForcibly) { + if (this.available) { + this.available = false; + this.firstShutdownTimestamp = System.currentTimeMillis(); + this.release(); + } + + else if (this.getRefCount() > 0) { + if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { + this.refCount.set(-1000 - this.getRefCount()); + this.release(); + } + } + } + + public void release() { + long value = this.refCount.decrementAndGet(); + if (value > 0) + return; + + synchronized (this) { + + this.cleanupOver = this.cleanup(value); + } + } + + public long getRefCount() { + return this.refCount.get(); + } + + public abstract boolean cleanup(final long currentRef); + + + public boolean isCleanupOver() { + return this.refCount.get() <= 0 && this.cleanupOver; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java new file mode 100644 index 0000000..9343ed9 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/RunningFlags.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +/** + * @author shijia.wxr + */ +public class RunningFlags { + + private static final int NOT_READABLE_BIT = 1; + + private static final int NOT_WRITEABLE_BIT = 1 << 1; + + private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2; + + private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3; + + private static final int DISK_FULL_BIT = 1 << 4; + private volatile int flagBits = 0; + + + public RunningFlags() { + } + + + public int getFlagBits() { + return flagBits; + } + + + public boolean getAndMakeReadable() { + boolean result = this.isReadable(); + if (!result) { + this.flagBits &= ~NOT_READABLE_BIT; + } + return result; + } + + + public boolean isReadable() { + if ((this.flagBits & NOT_READABLE_BIT) == 0) { + return true; + } + + return false; + } + + + public boolean getAndMakeNotReadable() { + boolean result = this.isReadable(); + if (result) { + this.flagBits |= NOT_READABLE_BIT; + } + return result; + } + + + public boolean getAndMakeWriteable() { + boolean result = this.isWriteable(); + if (!result) { + this.flagBits &= ~NOT_WRITEABLE_BIT; + } + return result; + } + + + public boolean isWriteable() { + if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) { + return true; + } + + return false; + } + + + public boolean getAndMakeNotWriteable() { + boolean result = this.isWriteable(); + if (result) { + this.flagBits |= NOT_WRITEABLE_BIT; + } + return result; + } + + + public void makeLogicsQueueError() { + this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT; + } + + + public boolean isLogicsQueueError() { + if ((this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT) { + return true; + } + + return false; + } + + + public void makeIndexFileError() { + this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT; + } + + + public boolean isIndexFileError() { + if ((this.flagBits & WRITE_INDEX_FILE_ERROR_BIT) == WRITE_INDEX_FILE_ERROR_BIT) { + return true; + } + + return false; + } + + + public boolean getAndMakeDiskFull() { + boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT); + this.flagBits |= DISK_FULL_BIT; + return result; + } + + + public boolean getAndMakeDiskOK() { + boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT); + this.flagBits &= ~DISK_FULL_BIT; + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java new file mode 100644 index 0000000..dba8072 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/SelectMappedBufferResult.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import java.nio.ByteBuffer; + + +/** + * @author shijia.wxr + */ +public class SelectMappedBufferResult { + + private final long startOffset; + + private final ByteBuffer byteBuffer; + + private int size; + + private MappedFile mappedFile; + + + public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) { + this.startOffset = startOffset; + this.byteBuffer = byteBuffer; + this.size = size; + this.mappedFile = mappedFile; + } + + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + + public int getSize() { + return size; + } + + + public void setSize(final int s) { + this.size = s; + this.byteBuffer.limit(this.size); + } + + + public MappedFile getMappedFile() { + return mappedFile; + } + + +// @Override +// protected void finalize() { +// if (this.mappedFile != null) { +// this.release(); +// } +// } + + + public synchronized void release() { + if (this.mappedFile != null) { + this.mappedFile.release(); + this.mappedFile = null; + } + } + + + public long getStartOffset() { + return startOffset; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java new file mode 100644 index 0000000..2d75fd5 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/StoreCheckpoint.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; + + +/** + * @author shijia.wxr + */ +public class StoreCheckpoint { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private final RandomAccessFile randomAccessFile; + private final FileChannel fileChannel; + private final MappedByteBuffer mappedByteBuffer; + private volatile long physicMsgTimestamp = 0; + private volatile long logicsMsgTimestamp = 0; + private volatile long indexMsgTimestamp = 0; + + + public StoreCheckpoint(final String scpPath) throws IOException { + File file = new File(scpPath); + MappedFile.ensureDirOK(file.getParent()); + boolean fileExists = file.exists(); + + this.randomAccessFile = new RandomAccessFile(file, "rw"); + this.fileChannel = this.randomAccessFile.getChannel(); + this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE); + + if (fileExists) { + log.info("store checkpoint file exists, " + scpPath); + this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0); + this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8); + this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16); + + log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", " + + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp)); + log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", " + + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp)); + log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", " + + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp)); + } else { + log.info("store checkpoint file not exists, " + scpPath); + } + } + + + public void shutdown() { + this.flush(); + + // unmap mappedByteBuffer + MappedFile.clean(this.mappedByteBuffer); + + try { + this.fileChannel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + public void flush() { + this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); + this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); + this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp); + this.mappedByteBuffer.force(); + } + + + public long getPhysicMsgTimestamp() { + return physicMsgTimestamp; + } + + + public void setPhysicMsgTimestamp(long physicMsgTimestamp) { + this.physicMsgTimestamp = physicMsgTimestamp; + } + + + public long getLogicsMsgTimestamp() { + return logicsMsgTimestamp; + } + + + public void setLogicsMsgTimestamp(long logicsMsgTimestamp) { + this.logicsMsgTimestamp = logicsMsgTimestamp; + } + + + public long getMinTimestampIndex() { + return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp); + } + + + public long getMinTimestamp() { + long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp); + + + // fixed https://github.com/alibaba/RocketMQ/issues/467 + min -= 1000 * 3; + if (min < 0) + min = 0; + + return min; + } + + + public long getIndexMsgTimestamp() { + return indexMsgTimestamp; + } + + + public void setIndexMsgTimestamp(long indexMsgTimestamp) { + this.indexMsgTimestamp = indexMsgTimestamp; + } + +}
