http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 6326d4b..d9e2f03 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -16,14 +16,12 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.nio.ByteBuffer; import java.util.List; - +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsumeQueue { @@ -43,13 +41,12 @@ public class ConsumeQueue { private long maxPhysicOffset = -1; private volatile long minLogicOffset = 0; - public ConsumeQueue( - final String topic, - final int queueId, - final String storePath, - final int mappedFileSize, - final DefaultMessageStore defaultMessageStore) { + final String topic, + final int queueId, + final String storePath, + final int mappedFileSize, + final DefaultMessageStore defaultMessageStore) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.defaultMessageStore = defaultMessageStore; @@ -58,22 +55,20 @@ public class ConsumeQueue { this.queueId = queueId; String queueDir = this.storePath - + File.separator + topic - + File.separator + queueId; + + File.separator + topic + + File.separator + queueId; this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); } - public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed")); return result; } - public void recover() { final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { @@ -98,18 +93,17 @@ public class ConsumeQueue { this.maxPhysicOffset = offset; } else { log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " - + offset + " " + size + " " + tagsCode); + + offset + " " + size + " " + tagsCode); break; } } - if (mapedFileOffset == mapedFileSizeLogics) { index++; if (index >= mappedFiles.size()) { log.info("recover last consume queue file over, last maped file " - + mappedFile.getFileName()); + + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); @@ -120,7 +114,7 @@ public class ConsumeQueue { } } else { log.info("recover current consume queue queue over " + mappedFile.getFileName() + " " - + (processOffset + mapedFileOffset)); + + (processOffset + mapedFileOffset)); break; } } @@ -137,8 +131,8 @@ public class ConsumeQueue { if (mappedFile != null) { long offset = 0; int low = - minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile - .getFileFromOffset()) : 0; + minLogicOffset > mappedFile.getFileFromOffset() ? (int)(minLogicOffset - mappedFile + .getFileFromOffset()) : 0; int high = 0; int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; long leftIndexValue = -1L, rightIndexValue = -1L; @@ -160,7 +154,7 @@ public class ConsumeQueue { } long storeTime = - this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); if (storeTime < 0) { return 0; } else if (storeTime == timestamp) { @@ -189,8 +183,8 @@ public class ConsumeQueue { offset = leftOffset; } else { offset = - Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - - rightIndexValue) ? rightOffset : leftOffset; + Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp + - rightIndexValue) ? rightOffset : leftOffset; } } @@ -223,7 +217,6 @@ public class ConsumeQueue { int size = byteBuffer.getInt(); byteBuffer.getLong(); - if (0 == i) { if (offset >= phyOffet) { this.mappedFileQueue.deleteLastMappedFile(); @@ -249,7 +242,6 @@ public class ConsumeQueue { mappedFile.setFlushedPosition(pos); this.maxPhysicOffset = offset; - if (pos == logicFileSize) { return; } @@ -283,7 +275,6 @@ public class ConsumeQueue { int size = byteBuffer.getInt(); byteBuffer.getLong(); - if (offset >= 0 && size > 0) { lastOffset = offset + size; } else { @@ -295,12 +286,10 @@ public class ConsumeQueue { return lastOffset; } - public boolean flush(final int flushLeastPages) { return this.mappedFileQueue.flush(flushLeastPages); } - public int deleteExpiredFile(long offset) { int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE); this.correctMinOffset(offset); @@ -322,7 +311,7 @@ public class ConsumeQueue { if (offsetPy >= phyMinOffset) { this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; log.info("compute logics min offset: " + this.getMinOffsetInQuque() + ", topic: " - + this.topic + ", queueId: " + this.queueId); + + this.topic + ", queueId: " + this.queueId); break; } } @@ -335,14 +324,12 @@ public class ConsumeQueue { } } - public long getMinOffsetInQuque() { return this.minLogicOffset / CQ_STORE_UNIT_SIZE; } - public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, - long logicOffset) { + long logicOffset) { final int maxRetries = 30; boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { @@ -353,7 +340,7 @@ public class ConsumeQueue { } else { // XXX: warn and notify me log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset - + " failed, retry " + i + " times"); + + " failed, retry " + i + " times"); try { Thread.sleep(1000); @@ -369,7 +356,7 @@ public class ConsumeQueue { } private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, - final long cqOffset) { + final long cqOffset) { if (offset <= this.maxPhysicOffset) { return true; @@ -392,19 +379,19 @@ public class ConsumeQueue { this.mappedFileQueue.setCommittedWhere(expectLogicOffset); this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " - + mappedFile.getWrotePosition()); + + mappedFile.getWrotePosition()); } if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( - "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", - expectLogicOffset, - currentLogicOffset, - this.topic, - this.queueId, - expectLogicOffset - currentLogicOffset + "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, + currentLogicOffset, + this.topic, + this.queueId, + expectLogicOffset - currentLogicOffset ); } } @@ -414,14 +401,13 @@ public class ConsumeQueue { return false; } - private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) { ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); byteBuffer.putLong(0L); byteBuffer.putInt(Integer.MAX_VALUE); byteBuffer.putLong(0L); - int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); + int until = (int)(untilWhere % this.mappedFileQueue.getMappedFileSize()); for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { mappedFile.appendMessage(byteBuffer.array()); } @@ -433,7 +419,7 @@ public class ConsumeQueue { if (offset >= this.getMinLogicOffset()) { MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { - SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); + SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize)); return result; } } @@ -480,12 +466,10 @@ public class ConsumeQueue { return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque(); } - public long getMaxOffsetInQuque() { return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; } - public void checkSelf() { mappedFileQueue.checkSelf(); }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java index ac149f4..4ebcb3e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageFilter.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; - public class DefaultMessageFilter implements MessageFilter { @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 8714055..7e3af19 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -16,7 +16,27 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.*; +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.SystemClock; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -34,21 +54,8 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; - public class DefaultMessageStore implements MessageStore { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -84,7 +91,7 @@ public class DefaultMessageStore implements MessageStore { private final SystemClock systemClock = new SystemClock(); private final ScheduledExecutorService scheduledExecutorService = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); private final BrokerStatsManager brokerStatsManager; private final MessageArrivingListener messageArrivingListener; private final BrokerConfig brokerConfig; @@ -96,7 +103,7 @@ public class DefaultMessageStore implements MessageStore { private AtomicLong printTimes = new AtomicLong(0); public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, - final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { + final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; @@ -122,13 +129,11 @@ public class DefaultMessageStore implements MessageStore { this.transientStorePool.init(); } - this.allocateMappedFileService.start(); this.indexService.start(); } - public void truncateDirtyLogicFiles(long phyOffset) { ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; @@ -139,7 +144,6 @@ public class DefaultMessageStore implements MessageStore { } } - /** * @throws IOException */ @@ -162,11 +166,10 @@ public class DefaultMessageStore implements MessageStore { if (result) { this.storeCheckpoint = - new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); - this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); @@ -191,7 +194,6 @@ public class DefaultMessageStore implements MessageStore { this.commitLog.start(); this.storeStatsService.start(); - if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } @@ -293,19 +295,16 @@ public class DefaultMessageStore implements MessageStore { this.printTimes.set(0); } - if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } - if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } - if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } @@ -332,7 +331,7 @@ public class DefaultMessageStore implements MessageStore { long diff = this.systemClock.now() - begin; if (diff < 10000000 // - && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) { + && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) { return true; } @@ -353,7 +352,7 @@ public class DefaultMessageStore implements MessageStore { } public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, - final SubscriptionData subscriptionData) { + final SubscriptionData subscriptionData) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; @@ -366,7 +365,6 @@ public class DefaultMessageStore implements MessageStore { long beginTime = this.getSystemClock().now(); - GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; long nextBeginOffset = offset; long minOffset = 0; @@ -374,7 +372,6 @@ public class DefaultMessageStore implements MessageStore { GetMessageResult getResult = new GetMessageResult(); - final long maxOffsetPy = this.commitLog.getMaxOffset(); ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); @@ -417,21 +414,18 @@ public class DefaultMessageStore implements MessageStore { maxPhyOffsetPulling = offsetPy; - if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } - boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), - isInDisk)) { + isInDisk)) { break; } - if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) { SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (selectResult != null) { @@ -444,7 +438,6 @@ public class DefaultMessageStore implements MessageStore { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } - nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); } } else { @@ -458,7 +451,6 @@ public class DefaultMessageStore implements MessageStore { } } - if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); @@ -466,10 +458,9 @@ public class DefaultMessageStore implements MessageStore { nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); - long diff = maxOffsetPy - maxPhyOffsetPulling; - long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE - * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE + * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { @@ -479,7 +470,7 @@ public class DefaultMessageStore implements MessageStore { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " - + maxOffset + ", but access logic queue failed."); + + maxOffset + ", but access logic queue failed."); } } } else { @@ -605,7 +596,6 @@ public class DefaultMessageStore implements MessageStore { } - { String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); @@ -613,7 +603,6 @@ public class DefaultMessageStore implements MessageStore { result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); } - { if (this.scheduleMessageService != null) { this.scheduleMessageService.buildRunningStats(result); @@ -741,7 +730,6 @@ public class DefaultMessageStore implements MessageStore { break; } - Collections.sort(queryOffsetResult.getPhyOffsets()); queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); @@ -784,12 +772,10 @@ public class DefaultMessageStore implements MessageStore { } } - if (queryMessageResult.getBufferTotalSize() > 0) { break; } - if (lastQueryMsgTime < begin) { break; } @@ -825,8 +811,8 @@ public class DefaultMessageStore implements MessageStore { for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // - cq.getTopic(), // - cq.getQueueId() // + cq.getTopic(), // + cq.getQueueId() // ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); @@ -854,23 +840,22 @@ public class DefaultMessageStore implements MessageStore { Entry<Integer, ConsumeQueue> nextQT = itQT.next(); long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); - if (maxCLOffsetInConsumeQueue == -1) { log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", // - nextQT.getValue().getTopic(), // - nextQT.getValue().getQueueId(), // - nextQT.getValue().getMaxPhysicOffset(), // - nextQT.getValue().getMinLogicOffset()); + nextQT.getValue().getTopic(), // + nextQT.getValue().getQueueId(), // + nextQT.getValue().getMaxPhysicOffset(), // + nextQT.getValue().getMinLogicOffset()); } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( - "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", // - topic, // - nextQT.getKey(), // - minCommitLogOffset, // - maxCLOffsetInConsumeQueue); + "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", // + topic, // + nextQT.getKey(), // + minCommitLogOffset, // + maxCLOffsetInConsumeQueue); DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), - nextQT.getValue().getQueueId()); + nextQT.getValue().getQueueId()); nextQT.getValue().destroy(); itQT.remove(); @@ -910,7 +895,7 @@ public class DefaultMessageStore implements MessageStore { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = - MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); + MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); messageIds.put(msgId, nextOffset++); if (nextOffset > maxOffset) { return messageIds; @@ -1006,11 +991,11 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue(// - topic, // - queueId, // - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // - this); + topic, // + queueId, // + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // + this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; @@ -1031,7 +1016,7 @@ public class DefaultMessageStore implements MessageStore { } private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { - long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); return (maxOffsetPy - offsetPy) > memory; } @@ -1045,7 +1030,6 @@ public class DefaultMessageStore implements MessageStore { return true; } - if (isInDisk) { if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) { return true; @@ -1093,7 +1077,6 @@ public class DefaultMessageStore implements MessageStore { } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -1101,7 +1084,6 @@ public class DefaultMessageStore implements MessageStore { } }, 1, 10, TimeUnit.MINUTES); - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -1113,7 +1095,7 @@ public class DefaultMessageStore implements MessageStore { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" - + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; + + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } @@ -1174,11 +1156,11 @@ public class DefaultMessageStore implements MessageStore { continue; } ConsumeQueue logic = new ConsumeQueue( - topic, - queueId, - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), - this); + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; @@ -1196,7 +1178,6 @@ public class DefaultMessageStore implements MessageStore { private void recover(final boolean lastExitOK) { this.recoverConsumeQueue(); - if (lastExitOK) { this.commitLog.recoverNormally(); } else { @@ -1285,7 +1266,7 @@ public class DefaultMessageStore implements MessageStore { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), - req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); + req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: @@ -1298,7 +1279,7 @@ public class DefaultMessageStore implements MessageStore { } public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp, - long logicOffset) { + long logicOffset) { ConsumeQueue cq = this.findConsumeQueue(topic, queueId); cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset); } @@ -1311,23 +1292,21 @@ public class DefaultMessageStore implements MessageStore { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; private final double diskSpaceWarningLevelRatio = - Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); private final double diskSpaceCleanForciblyRatio = - Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); private long lastRedeleteTimestamp = 0; private volatile int manualDeleteFileSeveralTimes = 0; private volatile boolean cleanImmediately = false; - public void excuteDeleteFilesManualy() { this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked"); } - public void run() { try { this.deleteExpiredFiles(); @@ -1348,27 +1327,24 @@ public class DefaultMessageStore implements MessageStore { boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; - if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; - boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", // - fileReservedTime, // - timeup, // - spacefull, // - manualDeleteFileSeveralTimes, // - cleanAtOnce); - + fileReservedTime, // + timeup, // + spacefull, // + manualDeleteFileSeveralTimes, // + cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, - destroyMapedFileIntervalForcibly, cleanAtOnce); + destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); @@ -1382,7 +1358,7 @@ public class DefaultMessageStore implements MessageStore { if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; int destroyMapedFileIntervalForcibly = - DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); + DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } } @@ -1407,7 +1383,6 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; - { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); @@ -1433,10 +1408,9 @@ public class DefaultMessageStore implements MessageStore { } } - { String storePathLogics = StorePathConfigHelper - .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); + .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); if (logicsRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); @@ -1490,7 +1464,6 @@ public class DefaultMessageStore implements MessageStore { if (minOffset > this.lastPhysicalMinOffset) { this.lastPhysicalMinOffset = minOffset; - ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { @@ -1506,7 +1479,6 @@ public class DefaultMessageStore implements MessageStore { } } - DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); } } @@ -1520,7 +1492,6 @@ public class DefaultMessageStore implements MessageStore { private static final int RETRY_TIMES_OVER = 3; private long lastFlushTimestamp = 0; - private void doFlush(int retryTimes) { int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); @@ -1530,7 +1501,6 @@ public class DefaultMessageStore implements MessageStore { long logicsMsgTimestamp = 0; - int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { @@ -1558,7 +1528,6 @@ public class DefaultMessageStore implements MessageStore { } } - public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); @@ -1572,19 +1541,16 @@ public class DefaultMessageStore implements MessageStore { } } - this.doFlush(RETRY_TIMES_OVER); DefaultMessageStore.log.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return FlushConsumeQueueService.class.getSimpleName(); } - @Override public long getJointime() { return 1000 * 60; @@ -1610,7 +1576,7 @@ public class DefaultMessageStore implements MessageStore { if (this.isCommitLogAvailable()) { log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", - DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); + DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); } super.shutdown(); @@ -1624,17 +1590,15 @@ public class DefaultMessageStore implements MessageStore { return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; } - private boolean isCommitLogAvailable() { return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); } - private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // - && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { + && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } @@ -1645,7 +1609,7 @@ public class DefaultMessageStore implements MessageStore { for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = - DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); + DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { @@ -1653,20 +1617,20 @@ public class DefaultMessageStore implements MessageStore { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() - && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { + && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), - dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, - dispatchRequest.getTagsCode()); + dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, + dispatchRequest.getTagsCode()); } // FIXED BUG By shijia this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) - .addAndGet(dispatchRequest.getMsgSize()); + .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) + .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); @@ -1674,7 +1638,6 @@ public class DefaultMessageStore implements MessageStore { } } else if (!dispatchRequest.isSuccess()) { - if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; @@ -1682,7 +1645,7 @@ public class DefaultMessageStore implements MessageStore { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", - this.reputFromOffset); + this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } @@ -1698,7 +1661,6 @@ public class DefaultMessageStore implements MessageStore { } } - @Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); @@ -1715,13 +1677,11 @@ public class DefaultMessageStore implements MessageStore { DefaultMessageStore.log.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return ReputMessageService.class.getSimpleName(); } - } public int remainTransientStoreBufferNumbs() { @@ -1733,7 +1693,6 @@ public class DefaultMessageStore implements MessageStore { return remainTransientStoreBufferNumbs() == 0; } - public void unlockMappedFile(final MappedFile mappedFile) { this.scheduledExecutorService.schedule(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index d0855ab..b086aee 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -31,19 +31,18 @@ public class DispatchRequest { private final int sysFlag; private final long preparedTransactionOffset; - public DispatchRequest( - final String topic, - final int queueId, - final long commitLogOffset, - final int msgSize, - final long tagsCode, - final long storeTimestamp, - final long consumeQueueOffset, - final String keys, - final String uniqKey, - final int sysFlag, - final long preparedTransactionOffset + final String topic, + final int queueId, + final long commitLogOffset, + final int msgSize, + final long tagsCode, + final long storeTimestamp, + final long consumeQueueOffset, + final String keys, + final String uniqKey, + final int sysFlag, + final long preparedTransactionOffset ) { this.topic = topic; this.queueId = queueId; @@ -108,57 +107,46 @@ public class DispatchRequest { this.success = success; } - public String getTopic() { return topic; } - public int getQueueId() { return queueId; } - public long getCommitLogOffset() { return commitLogOffset; } - public int getMsgSize() { return msgSize; } - public long getStoreTimestamp() { return storeTimestamp; } - public long getConsumeQueueOffset() { return consumeQueueOffset; } - public String getKeys() { return keys; } - public long getTagsCode() { return tagsCode; } - public int getSysFlag() { return sysFlag; } - public long getPreparedTransactionOffset() { return preparedTransactionOffset; } - public boolean isSuccess() { return success; } @@ -167,5 +155,4 @@ public class DispatchRequest { return uniqKey; } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index 0f1ba8c..b7d33f3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -6,27 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; -import org.apache.rocketmq.store.stats.BrokerStatsManager; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; - +import org.apache.rocketmq.store.stats.BrokerStatsManager; public class GetMessageResult { private final List<SelectMappedBufferResult> messageMapedList = - new ArrayList<SelectMappedBufferResult>(100); + new ArrayList<SelectMappedBufferResult>(100); private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); @@ -41,97 +39,79 @@ public class GetMessageResult { private int msgCount4Commercial = 0; - public GetMessageResult() { } - public GetMessageStatus getStatus() { return status; } - public void setStatus(GetMessageStatus status) { this.status = status; } - public long getNextBeginOffset() { return nextBeginOffset; } - public void setNextBeginOffset(long nextBeginOffset) { this.nextBeginOffset = nextBeginOffset; } - public long getMinOffset() { return minOffset; } - public void setMinOffset(long minOffset) { this.minOffset = minOffset; } - public long getMaxOffset() { return maxOffset; } - public void setMaxOffset(long maxOffset) { this.maxOffset = maxOffset; } - public List<SelectMappedBufferResult> getMessageMapedList() { return messageMapedList; } - public List<ByteBuffer> getMessageBufferList() { return messageBufferList; } - public void addMessage(final SelectMappedBufferResult mapedBuffer) { this.messageMapedList.add(mapedBuffer); this.messageBufferList.add(mapedBuffer.getByteBuffer()); this.bufferTotalSize += mapedBuffer.getSize(); - this.msgCount4Commercial += (int) Math.ceil( - mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT); + this.msgCount4Commercial += (int)Math.ceil( + mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT); } - public void release() { for (SelectMappedBufferResult select : this.messageMapedList) { select.release(); } } - public int getBufferTotalSize() { return bufferTotalSize; } - public void setBufferTotalSize(int bufferTotalSize) { this.bufferTotalSize = bufferTotalSize; } - public int getMessageCount() { return this.messageMapedList.size(); } - public boolean isSuggestPullingFromSlave() { return suggestPullingFromSlave; } - public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) { this.suggestPullingFromSlave = suggestPullingFromSlave; } @@ -144,12 +124,11 @@ public class GetMessageResult { this.msgCount4Commercial = msgCount4Commercial; } - @Override public String toString() { return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset=" - + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize - + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]"; + + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize + + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java index 003d1d4..f512e12 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index ce5f570..6803ec9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -16,16 +16,8 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.store.config.FlushDiskType; -import org.apache.rocketmq.store.util.LibC; import com.sun.jna.NativeLong; import com.sun.jna.Pointer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.nio.ch.DirectBuffer; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -39,7 +31,13 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.util.LibC; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.nio.ch.DirectBuffer; public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4; @@ -48,34 +46,23 @@ public class MappedFile extends ReferenceResource { private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new AtomicLong(0); private static final AtomicInteger TOTAL_MAPED_FILES = new AtomicInteger(0); - - private String fileName; - - private long fileFromOffset; - - protected int fileSize; - - private File file; - - private MappedByteBuffer mappedByteBuffer; - protected final AtomicInteger wrotePosition = new AtomicInteger(0); - - private final AtomicInteger flushedPosition = new AtomicInteger(0); //ADD BY ChenYang protected final AtomicInteger committedPosition = new AtomicInteger(0); - - + private final AtomicInteger flushedPosition = new AtomicInteger(0); + protected int fileSize; protected FileChannel fileChannel; - - private volatile long storeTimestamp = 0; - private boolean firstCreateInQueue = false; - /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null; protected TransientStorePool transientStorePool = null; + private String fileName; + private long fileFromOffset; + private File file; + private MappedByteBuffer mappedByteBuffer; + private volatile long storeTimestamp = 0; + private boolean firstCreateInQueue = false; public MappedFile() { } @@ -88,41 +75,6 @@ public class MappedFile extends ReferenceResource { init(fileName, fileSize, transientStorePool); } - public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { - init(fileName, fileSize); - this.writeBuffer = transientStorePool.borrowBuffer(); - this.transientStorePool = transientStorePool; - } - - private void init(final String fileName, final int fileSize) throws IOException { - this.fileName = fileName; - this.fileSize = fileSize; - this.file = new File(fileName); - this.fileFromOffset = Long.parseLong(this.file.getName()); - boolean ok = false; - - ensureDirOK(this.file.getParent()); - - try { - this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); - this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); - TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize); - TOTAL_MAPED_FILES.incrementAndGet(); - ok = true; - } catch (FileNotFoundException e) { - log.error("create file channel " + this.fileName + " Failed. ", e); - throw e; - } catch (IOException e) { - log.error("map file " + this.fileName + " Failed. ", e); - throw e; - } finally { - if (!ok && this.fileChannel != null) { - this.fileChannel.close(); - } - } - } - - public static void ensureDirOK(final String dirName) { if (dirName != null) { File f = new File(dirName); @@ -133,14 +85,12 @@ public class MappedFile extends ReferenceResource { } } - public static void clean(final ByteBuffer buffer) { if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) return; invoke(invoke(viewed(buffer), "cleaner"), "clean"); } - private static Object invoke(final Object target, final String methodName, final Class<?>... args) { return AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { @@ -155,9 +105,8 @@ public class MappedFile extends ReferenceResource { }); } - private static Method method(Object target, String methodName, Class<?>[] args) - throws NoSuchMethodException { + throws NoSuchMethodException { try { return target.getClass().getMethod(methodName, args); } catch (NoSuchMethodException e) { @@ -165,11 +114,9 @@ public class MappedFile extends ReferenceResource { } } - private static ByteBuffer viewed(ByteBuffer buffer) { String methodName = "viewedBuffer"; - Method[] methods = buffer.getClass().getMethods(); for (int i = 0; i < methods.length; i++) { if (methods[i].getName().equals("attachment")) { @@ -178,23 +125,54 @@ public class MappedFile extends ReferenceResource { } } - ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName); + ByteBuffer viewedBuffer = (ByteBuffer)invoke(buffer, methodName); if (viewedBuffer == null) return buffer; else return viewed(viewedBuffer); } - public static int getTotalmapedfiles() { return TOTAL_MAPED_FILES.get(); } - public static long getTotalMapedVitualMemory() { return TOTAL_MAPED_VITUAL_MEMORY.get(); } + public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { + init(fileName, fileSize); + this.writeBuffer = transientStorePool.borrowBuffer(); + this.transientStorePool = transientStorePool; + } + + private void init(final String fileName, final int fileSize) throws IOException { + this.fileName = fileName; + this.fileSize = fileSize; + this.file = new File(fileName); + this.fileFromOffset = Long.parseLong(this.file.getName()); + boolean ok = false; + + ensureDirOK(this.file.getParent()); + + try { + this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); + this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); + TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize); + TOTAL_MAPED_FILES.incrementAndGet(); + ok = true; + } catch (FileNotFoundException e) { + log.error("create file channel " + this.fileName + " Failed. ", e); + throw e; + } catch (IOException e) { + log.error("map file " + this.fileName + " Failed. ", e); + throw e; + } finally { + if (!ok && this.fileChannel != null) { + this.fileChannel.close(); + } + } + } public long getLastModifiedTimestamp() { return this.file.lastModified(); @@ -214,20 +192,18 @@ public class MappedFile extends ReferenceResource { int currentPos = this.wrotePosition.get(); - if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result = - cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); + cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } - log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: " - + this.fileSize); + + this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } @@ -246,7 +222,6 @@ public class MappedFile extends ReferenceResource { public boolean appendMessage(final byte[] data) { int currentPos = this.wrotePosition.get(); - if ((currentPos + data.length) <= this.fileSize) { try { this.fileChannel.position(currentPos); @@ -262,11 +237,7 @@ public class MappedFile extends ReferenceResource { } /** - - * * @param flushLeastPages - - * * @return The current flushed position */ public int flush(final int flushLeastPages) { @@ -370,12 +341,10 @@ public class MappedFile extends ReferenceResource { return flushedPosition.get(); } - public void setFlushedPosition(int pos) { this.flushedPosition.set(pos); } - public boolean isFull() { return this.fileSize == this.wrotePosition.get(); } @@ -392,14 +361,13 @@ public class MappedFile extends ReferenceResource { return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } else { log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " - + this.fileFromOffset); + + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size - + ", fileFromOffset: " + this.fileFromOffset); + + ", fileFromOffset: " + this.fileFromOffset); } - return null; } @@ -419,7 +387,6 @@ public class MappedFile extends ReferenceResource { } } - return null; } @@ -427,13 +394,13 @@ public class MappedFile extends ReferenceResource { public boolean cleanup(final long currentRef) { if (this.isAvailable()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have not shutdown, stop unmaping."); + + " have not shutdown, stop unmaping."); return false; } if (this.isCleanupOver()) { log.error("this file[REF:" + currentRef + "] " + this.fileName - + " have cleanup, do not do it again."); + + " have cleanup, do not do it again."); return true; } @@ -455,9 +422,9 @@ public class MappedFile extends ReferenceResource { long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName - + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" - + this.getFlushedPosition() + ", " - + UtilAll.computeEclipseTimeMilliseconds(beginTime)); + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + + this.getFlushedPosition() + ", " + + UtilAll.computeEclipseTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } @@ -465,7 +432,7 @@ public class MappedFile extends ReferenceResource { return true; } else { log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + this.fileName - + " Failed. cleanupOver: " + this.cleanupOver); + + " Failed. cleanupOver: " + this.cleanupOver); } return false; @@ -475,18 +442,17 @@ public class MappedFile extends ReferenceResource { return wrotePosition.get(); } + public void setWrotePosition(int pos) { + this.wrotePosition.set(pos); + } + /** - * * @return The max position which have valid data */ public int getReadPosition() { return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); } - public void setWrotePosition(int pos) { - this.wrotePosition.set(pos); - } - public void setCommittedPosition(int pos) { this.committedPosition.set(pos); } @@ -497,7 +463,7 @@ public class MappedFile extends ReferenceResource { int flush = 0; long time = System.currentTimeMillis(); for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { - byteBuffer.put(i, (byte) 0); + byteBuffer.put(i, (byte)0); // force flush when flush disk type is sync if (type == FlushDiskType.SYNC_FLUSH) { if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { @@ -521,11 +487,11 @@ public class MappedFile extends ReferenceResource { // force flush when prepare load finished if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}", - this.getFileName(), System.currentTimeMillis() - beginTime); + this.getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(), - System.currentTimeMillis() - beginTime); + System.currentTimeMillis() - beginTime); this.mlock(); } @@ -542,25 +508,21 @@ public class MappedFile extends ReferenceResource { return this.mappedByteBuffer.slice(); } - public long getStoreTimestamp() { return storeTimestamp; } - public boolean isFirstCreateInQueue() { return firstCreateInQueue; } - public void setFirstCreateInQueue(boolean firstCreateInQueue) { this.firstCreateInQueue = firstCreateInQueue; } - public void mlock() { final long beginTime = System.currentTimeMillis(); - final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); + final long address = ((DirectBuffer)(this.mappedByteBuffer)).address(); Pointer pointer = new Pointer(address); { int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize)); @@ -575,7 +537,7 @@ public class MappedFile extends ReferenceResource { public void munlock() { final long beginTime = System.currentTimeMillis(); - final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); + final long address = ((DirectBuffer)(this.mappedByteBuffer)).address(); Pointer pointer = new Pointer(address); int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize)); log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index a208a07..49455c6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -16,17 +16,19 @@ */ package org.apache.rocketmq.store; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; - - public class MappedFileQueue { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME); @@ -46,15 +48,13 @@ public class MappedFileQueue { private volatile long storeTimestamp = 0; - public MappedFileQueue(final String storePath, int mappedFileSize, - AllocateMappedFileService allocateMappedFileService) { + AllocateMappedFileService allocateMappedFileService) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.allocateMappedFileService = allocateMappedFileService; } - public void checkSelf() { if (!this.mappedFiles.isEmpty()) { @@ -66,7 +66,7 @@ public class MappedFileQueue { if (pre != null) { if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) { LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}", - pre.getFileName(), cur.getFileName()); + pre.getFileName(), cur.getFileName()); } } pre = cur; @@ -74,7 +74,6 @@ public class MappedFileQueue { } } - public MappedFile getMappedFileByTime(final long timestamp) { Object[] mfs = this.copyMappedFiles(0); @@ -82,16 +81,15 @@ public class MappedFileQueue { return null; for (int i = 0; i < mfs.length; i++) { - MappedFile mappedFile = (MappedFile) mfs[i]; + MappedFile mappedFile = (MappedFile)mfs[i]; if (mappedFile.getLastModifiedTimestamp() >= timestamp) { return mappedFile; } } - return (MappedFile) mfs[mfs.length - 1]; + return (MappedFile)mfs[mfs.length - 1]; } - private Object[] copyMappedFiles(final int reservedMappedFiles) { Object[] mfs; @@ -103,7 +101,6 @@ public class MappedFileQueue { return mfs; } - public void truncateDirtyFiles(long offset) { List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>(); @@ -111,9 +108,9 @@ public class MappedFileQueue { long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize; if (fileTailOffset > offset) { if (offset >= file.getFileFromOffset()) { - file.setWrotePosition((int) (offset % this.mappedFileSize)); - file.setCommittedPosition((int) (offset % this.mappedFileSize)); - file.setFlushedPosition((int) (offset % this.mappedFileSize)); + file.setWrotePosition((int)(offset % this.mappedFileSize)); + file.setCommittedPosition((int)(offset % this.mappedFileSize)); + file.setFlushedPosition((int)(offset % this.mappedFileSize)); } else { file.destroy(1000); willRemoveFiles.add(file); @@ -124,7 +121,6 @@ public class MappedFileQueue { this.deleteExpiredFile(willRemoveFiles); } - private void deleteExpiredFile(List<MappedFile> files) { if (!files.isEmpty()) { @@ -148,7 +144,6 @@ public class MappedFileQueue { } } - public boolean load() { File dir = new File(this.storePath); File[] files = dir.listFiles(); @@ -159,11 +154,10 @@ public class MappedFileQueue { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() - + " length not matched message store config value, ignore it"); + + " length not matched message store config value, ignore it"); return true; } - try { MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); @@ -182,7 +176,6 @@ public class MappedFileQueue { return true; } - public long howMuchFallBehind() { if (this.mappedFiles.isEmpty()) return 0; @@ -198,7 +191,6 @@ public class MappedFileQueue { return 0; } - public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) { long createOffset = -1; MappedFile mappedFileLast = getLastMappedFile(); @@ -214,12 +206,12 @@ public class MappedFileQueue { if (createOffset != -1 && needCreate) { String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); MappedFile mappedFile = null; if (this.allocateMappedFileService != null) { mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, - nextNextFilePath, this.mappedFileSize); + nextNextFilePath, this.mappedFileSize); } else { try { mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); @@ -268,11 +260,12 @@ public class MappedFileQueue { if (mappedFileLast != null) { long lastOffset = mappedFileLast.getFileFromOffset() + - mappedFileLast.getWrotePosition(); + mappedFileLast.getWrotePosition(); long diff = lastOffset - offset; final int maxDiff = this.mappedFileSize * 2; - if (diff > maxDiff) return false; + if (diff > maxDiff) + return false; } ListIterator<MappedFile> iterator = this.mappedFiles.listIterator(); @@ -280,7 +273,7 @@ public class MappedFileQueue { while (iterator.hasPrevious()) { mappedFileLast = iterator.previous(); if (offset >= mappedFileLast.getFileFromOffset()) { - int where = (int) (offset % mappedFileLast.getFileSize()); + int where = (int)(offset % mappedFileLast.getFileSize()); mappedFileLast.setFlushedPosition(where); mappedFileLast.setWrotePosition(where); mappedFileLast.setCommittedPosition(where); @@ -306,7 +299,6 @@ public class MappedFileQueue { return -1; } - public long getMaxOffset() { MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { @@ -342,9 +334,9 @@ public class MappedFileQueue { } public int deleteExpiredFileByTime(final long expiredTime, - final int deleteFilesInterval, - final long intervalForcibly, - final boolean cleanImmediately) { + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately) { Object[] mfs = this.copyMappedFiles(0); if (null == mfs) @@ -355,7 +347,7 @@ public class MappedFileQueue { List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { - MappedFile mappedFile = (MappedFile) mfs[i]; + MappedFile mappedFile = (MappedFile)mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { if (mappedFile.destroy(intervalForcibly)) { @@ -384,7 +376,6 @@ public class MappedFileQueue { return deleteCount; } - public int deleteExpiredFileByOffset(long offset, int unitSize) { Object[] mfs = this.copyMappedFiles(0); @@ -396,7 +387,7 @@ public class MappedFileQueue { for (int i = 0; i < mfsLength; i++) { boolean destroy; - MappedFile mappedFile = (MappedFile) mfs[i]; + MappedFile mappedFile = (MappedFile)mfs[i]; SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); if (result != null) { long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); @@ -404,7 +395,7 @@ public class MappedFileQueue { destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " - + maxOffsetInLogicQueue + ", delete it"); + + maxOffsetInLogicQueue + ", delete it"); } } else { log.warn("this being not executed forever."); @@ -425,7 +416,6 @@ public class MappedFileQueue { return deleteCount; } - public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false); @@ -467,10 +457,10 @@ public class MappedFileQueue { try { MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { - int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); + int index = (int)((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + - "mappedFileSize: {}, mappedFiles count: {}", + "mappedFileSize: {}, mappedFiles count: {}", mappedFile, offset, index, @@ -494,7 +484,6 @@ public class MappedFileQueue { return null; } - public MappedFile getFirstMappedFile() { MappedFile mappedFileFirst = null; @@ -515,14 +504,13 @@ public class MappedFileQueue { return findMappedFileByOffset(offset, false); } - public long getMappedMemorySize() { long size = 0; Object[] mfs = this.copyMappedFiles(0); if (mfs != null) { for (Object mf : mfs) { - if (((ReferenceResource) mf).isAvailable()) { + if (((ReferenceResource)mf).isAvailable()) { size += this.mappedFileSize; } } @@ -531,7 +519,6 @@ public class MappedFileQueue { return size; } - public boolean retryDeleteFirstFile(final long intervalForcibly) { MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { @@ -554,14 +541,12 @@ public class MappedFileQueue { return false; } - public void shutdown(final long intervalForcibly) { for (MappedFile mf : this.mappedFiles) { mf.shutdown(intervalForcibly); } } - public void destroy() { for (MappedFile mf : this.mappedFiles) { mf.destroy(1000 * 3); @@ -576,27 +561,22 @@ public class MappedFileQueue { } } - public long getFlushedWhere() { return flushedWhere; } - public void setFlushedWhere(long flushedWhere) { this.flushedWhere = flushedWhere; } - public long getStoreTimestamp() { return storeTimestamp; } - public List<MappedFile> getMappedFiles() { return mappedFiles; } - public int getMappedFileSize() { return mappedFileSize; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java index 25304b9..ebc57a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java index dabb418..4cbdacf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.message.MessageExt; - public class MessageExtBrokerInner extends MessageExt { private static final long serialVersionUID = 7256001576878700634L; private String propertiesString; @@ -32,22 +31,18 @@ public class MessageExtBrokerInner extends MessageExt { return tags.hashCode(); } - public String getPropertiesString() { return propertiesString; } - public void setPropertiesString(String propertiesString) { this.propertiesString = propertiesString; } - public long getTagsCode() { return tagsCode; } - public void setTagsCode(long tagsCode) { this.tagsCode = tagsCode; }
