http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java new file mode 100644 index 0000000..f23eb1c --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ConsumeQueue.java @@ -0,0 +1,495 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class ConsumeQueue { + + public static final int CQ_STORE_UNIT_SIZE = 20; + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); + + private final DefaultMessageStore defaultMessageStore; + + private final MappedFileQueue mappedFileQueue; + private final String topic; + private final int queueId; + private final ByteBuffer byteBufferIndex; + + private final String storePath; + private final int mappedFileSize; + private long maxPhysicOffset = -1; + private volatile long minLogicOffset = 0; + + + public ConsumeQueue( + final String topic, + final int queueId, + final String storePath, + final int mappedFileSize, + final DefaultMessageStore defaultMessageStore) { + this.storePath = storePath; + this.mappedFileSize = mappedFileSize; + this.defaultMessageStore = defaultMessageStore; + + this.topic = topic; + this.queueId = queueId; + + String queueDir = this.storePath + + File.separator + topic + + File.separator + queueId; + + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); + + this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); + } + + + public boolean load() { + boolean result = this.mappedFileQueue.load(); + log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed")); + return result; + } + + + public void recover() { + final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); + if (!mappedFiles.isEmpty()) { + + int index = mappedFiles.size() - 3; + if (index < 0) + index = 0; + + int mapedFileSizeLogics = this.mappedFileSize; + MappedFile mappedFile = mappedFiles.get(index); + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + long processOffset = mappedFile.getFileFromOffset(); + long mapedFileOffset = 0; + while (true) { + for (int i = 0; i < mapedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) { + long offset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + long tagsCode = byteBuffer.getLong(); + + if (offset >= 0 && size > 0) { + mapedFileOffset = i + CQ_STORE_UNIT_SIZE; + this.maxPhysicOffset = offset; + } else { + log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " + + offset + " " + size + " " + tagsCode); + break; + } + } + + + if (mapedFileOffset == mapedFileSizeLogics) { + index++; + if (index >= mappedFiles.size()) { + + log.info("recover last consume queue file over, last maped file " + + mappedFile.getFileName()); + break; + } else { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mapedFileOffset = 0; + log.info("recover next consume queue file, " + mappedFile.getFileName()); + } + } else { + log.info("recover current consume queue queue over " + mappedFile.getFileName() + " " + + (processOffset + mapedFileOffset)); + break; + } + } + + processOffset += mapedFileOffset; + this.mappedFileQueue.setFlushedWhere(processOffset); + this.mappedFileQueue.setCommittedWhere(processOffset); + this.mappedFileQueue.truncateDirtyFiles(processOffset); + } + } + + public long getOffsetInQueueByTime(final long timestamp) { + MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); + if (mappedFile != null) { + long offset = 0; + int low = + minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile + .getFileFromOffset()) : 0; + int high = 0; + int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; + long leftIndexValue = -1L, rightIndexValue = -1L; + long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset(); + SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); + if (null != sbr) { + ByteBuffer byteBuffer = sbr.getByteBuffer(); + high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE; + try { + while (high >= low) { + midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE; + byteBuffer.position(midOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + if (phyOffset < minPhysicOffset) { + low = midOffset + CQ_STORE_UNIT_SIZE; + leftOffset = midOffset; + continue; + } + + long storeTime = + this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime < 0) { + return 0; + } else if (storeTime == timestamp) { + targetOffset = midOffset; + break; + } else if (storeTime > timestamp) { + high = midOffset - CQ_STORE_UNIT_SIZE; + rightOffset = midOffset; + rightIndexValue = storeTime; + } else { + low = midOffset + CQ_STORE_UNIT_SIZE; + leftOffset = midOffset; + leftIndexValue = storeTime; + } + } + + if (targetOffset != -1) { + + offset = targetOffset; + } else { + if (leftIndexValue == -1) { + + offset = rightOffset; + } else if (rightIndexValue == -1) { + + offset = leftOffset; + } else { + offset = + Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp + - rightIndexValue) ? rightOffset : leftOffset; + } + } + + return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE; + } finally { + sbr.release(); + } + } + } + return 0; + } + + public void truncateDirtyLogicFiles(long phyOffet) { + + int logicFileSize = this.mappedFileSize; + + this.maxPhysicOffset = phyOffet - 1; + + while (true) { + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile != null) { + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + + mappedFile.setWrotePosition(0); + mappedFile.setCommittedPosition(0); + mappedFile.setFlushedPosition(0); + + for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) { + long offset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + byteBuffer.getLong(); + + + if (0 == i) { + if (offset >= phyOffet) { + this.mappedFileQueue.deleteLastMappedFile(); + break; + } else { + int pos = i + CQ_STORE_UNIT_SIZE; + mappedFile.setWrotePosition(pos); + mappedFile.setCommittedPosition(pos); + mappedFile.setFlushedPosition(pos); + this.maxPhysicOffset = offset; + } + } else { + + if (offset >= 0 && size > 0) { + + if (offset >= phyOffet) { + return; + } + + int pos = i + CQ_STORE_UNIT_SIZE; + mappedFile.setWrotePosition(pos); + mappedFile.setCommittedPosition(pos); + mappedFile.setFlushedPosition(pos); + this.maxPhysicOffset = offset; + + + if (pos == logicFileSize) { + return; + } + } else { + return; + } + } + } + } else { + break; + } + } + } + + public long getLastOffset() { + long lastOffset = -1; + + int logicFileSize = this.mappedFileSize; + + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + if (mappedFile != null) { + + int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE; + if (position < 0) + position = 0; + + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + byteBuffer.position(position); + for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) { + long offset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + byteBuffer.getLong(); + + + if (offset >= 0 && size > 0) { + lastOffset = offset + size; + } else { + break; + } + } + } + + return lastOffset; + } + + + public boolean flush(final int flushLeastPages) { + return this.mappedFileQueue.flush(flushLeastPages); + } + + + public int deleteExpiredFile(long offset) { + int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE); + this.correctMinOffset(offset); + return cnt; + } + + public void correctMinOffset(long phyMinOffset) { + MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); + if (mappedFile != null) { + SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0); + if (result != null) { + try { + + for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long offsetPy = result.getByteBuffer().getLong(); + result.getByteBuffer().getInt(); + result.getByteBuffer().getLong(); + + if (offsetPy >= phyMinOffset) { + this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; + log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: " + + this.topic + ", queueId: " + this.queueId); + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + result.release(); + } + } + } + } + + + public long getMinOffsetInQuque() { + return this.minLogicOffset / CQ_STORE_UNIT_SIZE; + } + + + public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, + long logicOffset) { + final int maxRetries = 30; + boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); + for (int i = 0; i < maxRetries && canWrite; i++) { + boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); + if (result) { + this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp); + return; + } else { + // XXX: warn and notify me + log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset + + " failed, retry " + i + " times"); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.warn("", e); + } + } + } + + // XXX: warn and notify me + log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); + this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); + } + + private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, + final long cqOffset) { + + if (offset <= this.maxPhysicOffset) { + return true; + } + + this.byteBufferIndex.flip(); + this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); + this.byteBufferIndex.putLong(offset); + this.byteBufferIndex.putInt(size); + this.byteBufferIndex.putLong(tagsCode); + + final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; + + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); + if (mappedFile != null) { + + if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { + this.minLogicOffset = expectLogicOffset; + this.mappedFileQueue.setFlushedWhere(expectLogicOffset); + this.mappedFileQueue.setCommittedWhere(expectLogicOffset); + this.fillPreBlank(mappedFile, expectLogicOffset); + log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + + mappedFile.getWrotePosition()); + } + + if (cqOffset != 0) { + long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + if (expectLogicOffset != currentLogicOffset) { + LOG_ERROR.warn( + "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, + currentLogicOffset, + this.topic, + this.queueId, + expectLogicOffset - currentLogicOffset + ); + } + } + this.maxPhysicOffset = offset; + return mappedFile.appendMessage(this.byteBufferIndex.array()); + } + return false; + } + + + private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) { + ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); + byteBuffer.putLong(0L); + byteBuffer.putInt(Integer.MAX_VALUE); + byteBuffer.putLong(0L); + + int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); + for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { + mappedFile.appendMessage(byteBuffer.array()); + } + } + + public SelectMappedBufferResult getIndexBuffer(final long startIndex) { + int mappedFileSize = this.mappedFileSize; + long offset = startIndex * CQ_STORE_UNIT_SIZE; + if (offset >= this.getMinLogicOffset()) { + MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); + if (mappedFile != null) { + SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); + return result; + } + } + return null; + } + + public long getMinLogicOffset() { + return minLogicOffset; + } + + public void setMinLogicOffset(long minLogicOffset) { + this.minLogicOffset = minLogicOffset; + } + + public long rollNextFile(final long index) { + int mapedFileSize = this.mappedFileSize; + int totalUnitsInFile = mapedFileSize / CQ_STORE_UNIT_SIZE; + return index + totalUnitsInFile - index % totalUnitsInFile; + } + + public String getTopic() { + return topic; + } + + public int getQueueId() { + return queueId; + } + + public long getMaxPhysicOffset() { + return maxPhysicOffset; + } + + public void setMaxPhysicOffset(long maxPhysicOffset) { + this.maxPhysicOffset = maxPhysicOffset; + } + + public void destroy() { + this.maxPhysicOffset = -1; + this.minLogicOffset = 0; + this.mappedFileQueue.destroy(); + } + + public long getMessageTotalInQueue() { + return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque(); + } + + + public long getMaxOffsetInQuque() { + return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; + } + + + public void checkSelf() { + mappedFileQueue.checkSelf(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java new file mode 100644 index 0000000..2086438 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageFilter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + + +/** + * @author shijia.wxr + */ +public class DefaultMessageFilter implements MessageFilter { + + @Override + public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) { + if (tagsCode == null) { + return true; + } + + if (null == subscriptionData) { + return true; + } + + if (subscriptionData.isClassFilterMode()) + return true; + + if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) { + return true; + } + + return subscriptionData.getCodeSet().contains(tagsCode.intValue()); + } + +}
