http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java index cb7304e..4122857 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; - public interface MessageFilter { boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode); }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 328a700..9ed74e6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -6,114 +6,87 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; - import java.util.HashMap; import java.util.Set; - +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public interface MessageStore { boolean load(); - void start() throws Exception; - void shutdown(); - void destroy(); PutMessageResult putMessage(final MessageExtBrokerInner msg); - GetMessageResult getMessage(final String group, final String topic, final int queueId, - final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); - + final long offset, final int maxMsgNums, final SubscriptionData subscriptionData); long getMaxOffsetInQuque(final String topic, final int queueId); - long getMinOffsetInQuque(final String topic, final int queueId); - long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset); - long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); - MessageExt lookMessageByOffset(final long commitLogOffset); - SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset); - SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize); String getRunningDataInfo(); - HashMap<String, String> getRuntimeInfo(); - long getMaxPhyOffset(); - long getMinPhyOffset(); - long getEarliestMessageTime(final String topic, final int queueId); - long getEarliestMessageTime(); + long getEarliestMessageTime(); long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset); - long getMessageTotalInQueue(final String topic, final int queueId); SelectMappedBufferResult getCommitLogData(final long offset); - boolean appendToCommitLog(final long startOffset, final byte[] data); void excuteDeleteFilesManualy(); - QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, - final long begin, final long end); - + final long begin, final long end); void updateHaMasterAddress(final String newAddr); - long slaveFallBehindMuch(); - long now(); - int cleanUnusedTopic(final Set<String> topics); - void cleanExpiredConsumerQueue(); - boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset); - long dispatchBehindBytes(); long flush(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java index d36e7ee..b6ede7e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; @@ -20,42 +20,35 @@ public class PutMessageResult { private PutMessageStatus putMessageStatus; private AppendMessageResult appendMessageResult; - public PutMessageResult(PutMessageStatus putMessageStatus, AppendMessageResult appendMessageResult) { this.putMessageStatus = putMessageStatus; this.appendMessageResult = appendMessageResult; } - public boolean isOk() { return this.appendMessageResult != null && this.appendMessageResult.isOk(); } - public AppendMessageResult getAppendMessageResult() { return appendMessageResult; } - public void setAppendMessageResult(AppendMessageResult appendMessageResult) { this.appendMessageResult = appendMessageResult; } - public PutMessageStatus getPutMessageStatus() { return putMessageStatus; } - public void setPutMessageStatus(PutMessageStatus putMessageStatus) { this.putMessageStatus = putMessageStatus; } - @Override public String toString() { return "PutMessageResult [putMessageStatus=" + putMessageStatus + ", appendMessageResult=" - + appendMessageResult + "]"; + + appendMessageResult + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java index 1b6ec9a..abab63f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java index 1f300a2..8b45b7c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; @@ -20,11 +20,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; - public class QueryMessageResult { private final List<SelectMappedBufferResult> messageMapedList = - new ArrayList<SelectMappedBufferResult>(100); + new ArrayList<SelectMappedBufferResult>(100); private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100); private long indexLastUpdateTimestamp; @@ -32,46 +31,38 @@ public class QueryMessageResult { private int bufferTotalSize = 0; - public void addMessage(final SelectMappedBufferResult mapedBuffer) { this.messageMapedList.add(mapedBuffer); this.messageBufferList.add(mapedBuffer.getByteBuffer()); this.bufferTotalSize += mapedBuffer.getSize(); } - public void release() { for (SelectMappedBufferResult select : this.messageMapedList) { select.release(); } } - public long getIndexLastUpdateTimestamp() { return indexLastUpdateTimestamp; } - public void setIndexLastUpdateTimestamp(long indexLastUpdateTimestamp) { this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; } - public long getIndexLastUpdatePhyoffset() { return indexLastUpdatePhyoffset; } - public void setIndexLastUpdatePhyoffset(long indexLastUpdatePhyoffset) { this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset; } - public List<ByteBuffer> getMessageBufferList() { return messageBufferList; } - public int getBufferTotalSize() { return bufferTotalSize; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java b/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java index 6563232..b8b75da 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java +++ b/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java @@ -6,26 +6,24 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import java.util.concurrent.atomic.AtomicLong; - public abstract class ReferenceResource { protected final AtomicLong refCount = new AtomicLong(1); protected volatile boolean available = true; protected volatile boolean cleanupOver = false; private volatile long firstShutdownTimestamp = 0; - public synchronized boolean hold() { if (this.isAvailable()) { if (this.refCount.getAndIncrement() > 0) { @@ -38,21 +36,16 @@ public abstract class ReferenceResource { return false; } - public boolean isAvailable() { return this.available; } - - public void shutdown(final long intervalForcibly) { if (this.available) { this.available = false; this.firstShutdownTimestamp = System.currentTimeMillis(); this.release(); - } - - else if (this.getRefCount() > 0) { + } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); @@ -77,7 +70,6 @@ public abstract class ReferenceResource { public abstract boolean cleanup(final long currentRef); - public boolean isCleanupOver() { return this.refCount.get() <= 0 && this.cleanupOver; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java index 8e7f29f..52c269c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; @@ -29,16 +29,13 @@ public class RunningFlags { private static final int DISK_FULL_BIT = 1 << 4; private volatile int flagBits = 0; - public RunningFlags() { } - public int getFlagBits() { return flagBits; } - public boolean getAndMakeReadable() { boolean result = this.isReadable(); if (!result) { @@ -47,7 +44,6 @@ public class RunningFlags { return result; } - public boolean isReadable() { if ((this.flagBits & NOT_READABLE_BIT) == 0) { return true; @@ -56,7 +52,6 @@ public class RunningFlags { return false; } - public boolean getAndMakeNotReadable() { boolean result = this.isReadable(); if (result) { @@ -65,7 +60,6 @@ public class RunningFlags { return result; } - public boolean getAndMakeWriteable() { boolean result = this.isWriteable(); if (!result) { @@ -74,7 +68,6 @@ public class RunningFlags { return result; } - public boolean isWriteable() { if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) { return true; @@ -83,7 +76,6 @@ public class RunningFlags { return false; } - public boolean getAndMakeNotWriteable() { boolean result = this.isWriteable(); if (result) { @@ -92,12 +84,10 @@ public class RunningFlags { return result; } - public void makeLogicsQueueError() { this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT; } - public boolean isLogicsQueueError() { if ((this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT) { return true; @@ -106,12 +96,10 @@ public class RunningFlags { return false; } - public void makeIndexFileError() { this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT; } - public boolean isIndexFileError() { if ((this.flagBits & WRITE_INDEX_FILE_ERROR_BIT) == WRITE_INDEX_FILE_ERROR_BIT) { return true; @@ -120,14 +108,12 @@ public class RunningFlags { return false; } - public boolean getAndMakeDiskFull() { boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT); this.flagBits |= DISK_FULL_BIT; return result; } - public boolean getAndMakeDiskOK() { boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT); this.flagBits &= ~DISK_FULL_BIT; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java index 218f809..ddd9383 100644 --- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import java.nio.ByteBuffer; - public class SelectMappedBufferResult { private final long startOffset; @@ -29,7 +28,6 @@ public class SelectMappedBufferResult { private MappedFile mappedFile; - public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) { this.startOffset = startOffset; this.byteBuffer = byteBuffer; @@ -37,28 +35,23 @@ public class SelectMappedBufferResult { this.mappedFile = mappedFile; } - public ByteBuffer getByteBuffer() { return byteBuffer; } - public int getSize() { return size; } - public void setSize(final int s) { this.size = s; this.byteBuffer.limit(this.size); } - public MappedFile getMappedFile() { return mappedFile; } - // @Override // protected void finalize() { // if (this.mappedFile != null) { @@ -66,7 +59,6 @@ public class SelectMappedBufferResult { // } // } - public synchronized void release() { if (this.mappedFile != null) { this.mappedFile.release(); @@ -74,7 +66,6 @@ public class SelectMappedBufferResult { } } - public long getStartOffset() { return startOffset; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java index 7fb5158..49a342e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; - +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StoreCheckpoint { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -38,7 +36,6 @@ public class StoreCheckpoint { private volatile long logicsMsgTimestamp = 0; private volatile long indexMsgTimestamp = 0; - public StoreCheckpoint(final String scpPath) throws IOException { File file = new File(scpPath); MappedFile.ensureDirOK(file.getParent()); @@ -55,17 +52,16 @@ public class StoreCheckpoint { this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16); log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", " - + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp)); + + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp)); log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", " - + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp)); + + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp)); log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", " - + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp)); + + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp)); } else { log.info("store checkpoint file not exists, " + scpPath); } } - public void shutdown() { this.flush(); @@ -79,7 +75,6 @@ public class StoreCheckpoint { } } - public void flush() { this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); @@ -87,36 +82,29 @@ public class StoreCheckpoint { this.mappedByteBuffer.force(); } - public long getPhysicMsgTimestamp() { return physicMsgTimestamp; } - public void setPhysicMsgTimestamp(long physicMsgTimestamp) { this.physicMsgTimestamp = physicMsgTimestamp; } - public long getLogicsMsgTimestamp() { return logicsMsgTimestamp; } - public void setLogicsMsgTimestamp(long logicsMsgTimestamp) { this.logicsMsgTimestamp = logicsMsgTimestamp; } - public long getMinTimestampIndex() { return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp); } - public long getMinTimestamp() { long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp); - // fixed https://github.org/apache/rocketmqissues/467 min -= 1000 * 3; if (min < 0) @@ -125,12 +113,10 @@ public class StoreCheckpoint { return min; } - public long getIndexMsgTimestamp() { return indexMsgTimestamp; } - public void setIndexMsgTimestamp(long indexMsgTimestamp) { this.indexMsgTimestamp = indexMsgTimestamp; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java index 91b70fb..d4ba147 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java @@ -16,11 +16,6 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.text.MessageFormat; import java.util.HashMap; import java.util.LinkedList; @@ -28,7 +23,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; - +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.constant.LoggerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StoreStatsService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -36,7 +34,7 @@ public class StoreStatsService extends ServiceThread { private static final int FREQUENCY_OF_SAMPLING = 1000; private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10; - private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[]{ + private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[] { "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]", }; @@ -45,9 +43,9 @@ public class StoreStatsService extends ServiceThread { private final AtomicLong putMessageFailedTimes = new AtomicLong(0); private final Map<String, AtomicLong> putMessageTopicTimesTotal = - new ConcurrentHashMap<String, AtomicLong>(128); + new ConcurrentHashMap<String, AtomicLong>(128); private final Map<String, AtomicLong> putMessageTopicSizeTotal = - new ConcurrentHashMap<String, AtomicLong>(128); + new ConcurrentHashMap<String, AtomicLong>(128); private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0); private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0); @@ -71,7 +69,6 @@ public class StoreStatsService extends ServiceThread { private ReentrantLock lockSampling = new ReentrantLock(); private long lastPrintTimestamp = System.currentTimeMillis(); - public StoreStatsService() { this.initPutMessageDistributeTime(); } @@ -96,7 +93,8 @@ public class StoreStatsService extends ServiceThread { public void setPutMessageEntireTimeMax(long value) { final AtomicLong[] times = this.putMessageDistributeTime; - if (null == times) return; + if (null == times) + return; // us if (value <= 0) { @@ -140,37 +138,32 @@ public class StoreStatsService extends ServiceThread { if (value > this.putMessageEntireTimeMax) { this.lockPut.lock(); this.putMessageEntireTimeMax = - value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax; + value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax; this.lockPut.unlock(); } } - public long getGetMessageEntireTimeMax() { return getMessageEntireTimeMax; } - public void setGetMessageEntireTimeMax(long value) { if (value > this.getMessageEntireTimeMax) { this.lockGet.lock(); this.getMessageEntireTimeMax = - value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax; + value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax; this.lockGet.unlock(); } } - public long getDispatchMaxBuffer() { return dispatchMaxBuffer; } - public void setDispatchMaxBuffer(long value) { this.dispatchMaxBuffer = value > this.dispatchMaxBuffer ? value : this.dispatchMaxBuffer; } - @Override public String toString() { final StringBuilder sb = new StringBuilder(1024); @@ -184,9 +177,9 @@ public class StoreStatsService extends ServiceThread { sb.append("\tputMessageTimesTotal: " + totalTimes + "\r\n"); sb.append("\tputMessageSizeTotal: " + this.getPutMessageSizeTotal() + "\r\n"); sb.append("\tputMessageDistributeTime: " + this.getPutMessageDistributeTimeStringInfo(totalTimes) - + "\r\n"); + + "\r\n"); sb.append("\tputMessageAverageSize: " + (this.getPutMessageSizeTotal() / totalTimes.doubleValue()) - + "\r\n"); + + "\r\n"); sb.append("\tdispatchMaxBuffer: " + this.dispatchMaxBuffer + "\r\n"); sb.append("\tgetMessageEntireTimeMax: " + this.getMessageEntireTimeMax + "\r\n"); sb.append("\tputTps: " + this.getPutTps() + "\r\n"); @@ -218,7 +211,7 @@ public class StoreStatsService extends ServiceThread { long hours = (time % day) / hour; long minutes = (time % hour) / minute; long seconds = (time % minute) / second; - return messageFormat.format(new Long[]{days, hours, minutes, seconds}); + return messageFormat.format(new Long[] {days, hours, minutes, seconds}); } public long getPutMessageSizeTotal() { @@ -239,11 +232,9 @@ public class StoreStatsService extends ServiceThread { sb.append(this.getPutTps(10)); sb.append(" "); - sb.append(this.getPutTps(60)); sb.append(" "); - sb.append(this.getPutTps(600)); return sb.toString(); @@ -255,11 +246,9 @@ public class StoreStatsService extends ServiceThread { sb.append(this.getGetFoundTps(10)); sb.append(" "); - sb.append(this.getGetFoundTps(60)); sb.append(" "); - sb.append(this.getGetFoundTps(600)); return sb.toString(); @@ -271,11 +260,9 @@ public class StoreStatsService extends ServiceThread { sb.append(this.getGetMissTps(10)); sb.append(" "); - sb.append(this.getGetMissTps(60)); sb.append(" "); - sb.append(this.getGetMissTps(600)); return sb.toString(); @@ -287,11 +274,9 @@ public class StoreStatsService extends ServiceThread { sb.append(this.getGetTotalTps(10)); sb.append(" "); - sb.append(this.getGetTotalTps(60)); sb.append(" "); - sb.append(this.getGetTotalTps(600)); return sb.toString(); @@ -303,11 +288,9 @@ public class StoreStatsService extends ServiceThread { sb.append(this.getGetTransferedTps(10)); sb.append(" "); - sb.append(this.getGetTransferedTps(60)); sb.append(" "); - sb.append(this.getGetTransferedTps(600)); return sb.toString(); @@ -315,7 +298,8 @@ public class StoreStatsService extends ServiceThread { private String putMessageDistributeTimeToString() { final AtomicLong[] times = this.putMessageDistributeTime; - if (null == times) return null; + if (null == times) + return null; final StringBuilder sb = new StringBuilder(); for (int i = 0; i < times.length; i++) { @@ -352,7 +336,7 @@ public class StoreStatsService extends ServiceThread { if (this.getTimesFoundList.size() > time) { CallSnapshot lastBefore = - this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1)); + this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1)); result += CallSnapshot.getTPS(lastBefore, last); } } finally { @@ -370,7 +354,7 @@ public class StoreStatsService extends ServiceThread { if (this.getTimesMissList.size() > time) { CallSnapshot lastBefore = - this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1)); + this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1)); result += CallSnapshot.getTPS(lastBefore, last); } @@ -391,7 +375,7 @@ public class StoreStatsService extends ServiceThread { if (this.getTimesFoundList.size() > time) { CallSnapshot lastBefore = - this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1)); + this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1)); found = CallSnapshot.getTPS(lastBefore, last); } } @@ -400,7 +384,7 @@ public class StoreStatsService extends ServiceThread { if (this.getTimesMissList.size() > time) { CallSnapshot lastBefore = - this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1)); + this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1)); miss = CallSnapshot.getTPS(lastBefore, last); } } @@ -420,7 +404,7 @@ public class StoreStatsService extends ServiceThread { if (this.transferedMsgCountList.size() > time) { CallSnapshot lastBefore = - this.transferedMsgCountList.get(this.transferedMsgCountList.size() - (time + 1)); + this.transferedMsgCountList.get(this.transferedMsgCountList.size() - (time + 1)); result += CallSnapshot.getTPS(lastBefore, last); } @@ -445,9 +429,9 @@ public class StoreStatsService extends ServiceThread { result.put("putMessageTimesTotal", String.valueOf(totalTimes)); result.put("putMessageSizeTotal", String.valueOf(this.getPutMessageSizeTotal())); result.put("putMessageDistributeTime", - String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes))); + String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes))); result.put("putMessageAverageSize", - String.valueOf(this.getPutMessageSizeTotal() / totalTimes.doubleValue())); + String.valueOf(this.getPutMessageSizeTotal() / totalTimes.doubleValue())); result.put("dispatchMaxBuffer", String.valueOf(this.dispatchMaxBuffer)); result.put("getMessageEntireTimeMax", String.valueOf(this.getMessageEntireTimeMax)); result.put("putTps", String.valueOf(this.getPutTps())); @@ -491,19 +475,19 @@ public class StoreStatsService extends ServiceThread { } this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTimesTotalFound.get())); + this.getMessageTimesTotalFound.get())); if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.getTimesFoundList.removeFirst(); } this.getTimesMissList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTimesTotalMiss.get())); + this.getMessageTimesTotalMiss.get())); if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.getTimesMissList.removeFirst(); } this.transferedMsgCountList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTransferedMsgCount.get())); + this.getMessageTransferedMsgCount.get())); if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.transferedMsgCountList.removeFirst(); } @@ -518,14 +502,15 @@ public class StoreStatsService extends ServiceThread { this.lastPrintTimestamp = System.currentTimeMillis(); log.info("[STORETPS] put_tps {} get_found_tps {} get_miss_tps {} get_transfered_tps {}", - this.getPutTps(printTPSInterval), - this.getGetFoundTps(printTPSInterval), - this.getGetMissTps(printTPSInterval), - this.getGetTransferedTps(printTPSInterval) + this.getPutTps(printTPSInterval), + this.getGetFoundTps(printTPSInterval), + this.getGetMissTps(printTPSInterval), + this.getGetTransferedTps(printTPSInterval) ); final AtomicLong[] times = this.initPutMessageDistributeTime(); - if (null == times) return; + if (null == times) + return; final StringBuilder sb = new StringBuilder(); long totalPut = 0; @@ -544,22 +529,18 @@ public class StoreStatsService extends ServiceThread { return getMessageTimesTotalFound; } - public AtomicLong getGetMessageTimesTotalMiss() { return getMessageTimesTotalMiss; } - public AtomicLong getGetMessageTransferedMsgCount() { return getMessageTransferedMsgCount; } - public AtomicLong getPutMessageFailedTimes() { return putMessageFailedTimes; } - public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) { AtomicLong rs = putMessageTopicSizeTotal.get(topic); if (null == rs) { @@ -569,7 +550,6 @@ public class StoreStatsService extends ServiceThread { return rs; } - public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) { AtomicLong rs = putMessageTopicTimesTotal.get(topic); if (null == rs) { @@ -579,12 +559,10 @@ public class StoreStatsService extends ServiceThread { return rs; } - public Map<String, AtomicLong> getPutMessageTopicTimesTotal() { return putMessageTopicTimesTotal; } - public Map<String, AtomicLong> getPutMessageTopicSizeTotal() { return putMessageTopicSizeTotal; } @@ -593,13 +571,11 @@ public class StoreStatsService extends ServiceThread { public final long timestamp; public final long callTimesTotal; - public CallSnapshot(long timestamp, long callTimesTotal) { this.timestamp = timestamp; this.callTimesTotal = callTimesTotal; } - public static double getTPS(final CallSnapshot begin, final CallSnapshot end) { long total = end.callTimesTotal - begin.callTimesTotal; Long time = end.timestamp - begin.timestamp; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java index d44720c..9cd7145 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java @@ -6,33 +6,31 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; - /** * */ public class StoreUtil { public static final long TOTAL_PHYSICAL_MEMORY_SIZE = getTotalPhysicalMemorySize(); - @SuppressWarnings("restriction") public static long getTotalPhysicalMemorySize() { long physicalTotal = 1024 * 1024 * 1024 * 24; OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean(); if (osmxb instanceof com.sun.management.OperatingSystemMXBean) { - physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize(); + physicalTotal = ((com.sun.management.OperatingSystemMXBean)osmxb).getTotalPhysicalMemorySize(); } return physicalTotal; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java index bd5b629..3e332ba 100644 --- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java +++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java @@ -16,19 +16,18 @@ */ package org.apache.rocketmq.store; +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.util.LibC; -import com.sun.jna.NativeLong; -import com.sun.jna.Pointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.nio.ch.DirectBuffer; -import java.nio.ByteBuffer; -import java.util.Deque; -import java.util.concurrent.ConcurrentLinkedDeque; - public class TransientStorePool { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -51,7 +50,7 @@ public class TransientStorePool { for (int i = 0; i < poolSize; i++) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); - final long address = ((DirectBuffer) byteBuffer).address(); + final long address = ((DirectBuffer)byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); @@ -61,7 +60,7 @@ public class TransientStorePool { public void destroy() { for (ByteBuffer byteBuffer : availableBuffers) { - final long address = ((DirectBuffer) byteBuffer).address(); + final long address = ((DirectBuffer)byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java b/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java index 6ebb723..bebacc9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.config; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java b/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java index 02b0ced..6a52a67 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.config; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 205adec..d1e2bfe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.config; +import java.io.File; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; -import java.io.File; - - public class MessageStoreConfig { //The root directory in which the log data is kept @ImportantField @@ -30,7 +28,7 @@ public class MessageStoreConfig { //The directory in which the commitlog is kept @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" - + File.separator + "commitlog"; + + File.separator + "commitlog"; // CommitLog file size,default is 1G private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; @@ -133,7 +131,7 @@ public class MessageStoreConfig { private boolean transientStorePoolEnable = false; private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; - + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -170,119 +168,96 @@ public class MessageStoreConfig { return warmMapedFileEnable; } - public void setWarmMapedFileEnable(boolean warmMapedFileEnable) { this.warmMapedFileEnable = warmMapedFileEnable; } - public int getMapedFileSizeCommitLog() { return mapedFileSizeCommitLog; } - public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) { this.mapedFileSizeCommitLog = mapedFileSizeCommitLog; } - public int getMapedFileSizeConsumeQueue() { - int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); - return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE); + int factor = (int)Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); + return (int)(factor * ConsumeQueue.CQ_STORE_UNIT_SIZE); } - public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) { this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue; } - public int getFlushIntervalCommitLog() { return flushIntervalCommitLog; } - public void setFlushIntervalCommitLog(int flushIntervalCommitLog) { this.flushIntervalCommitLog = flushIntervalCommitLog; } - public int getFlushIntervalConsumeQueue() { return flushIntervalConsumeQueue; } - public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) { this.flushIntervalConsumeQueue = flushIntervalConsumeQueue; } - public int getPutMsgIndexHightWater() { return putMsgIndexHightWater; } - public void setPutMsgIndexHightWater(int putMsgIndexHightWater) { this.putMsgIndexHightWater = putMsgIndexHightWater; } - public int getCleanResourceInterval() { return cleanResourceInterval; } - public void setCleanResourceInterval(int cleanResourceInterval) { this.cleanResourceInterval = cleanResourceInterval; } - public int getMaxMessageSize() { return maxMessageSize; } - public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } - public boolean isCheckCRCOnRecover() { return checkCRCOnRecover; } - public boolean getCheckCRCOnRecover() { return checkCRCOnRecover; } - public void setCheckCRCOnRecover(boolean checkCRCOnRecover) { this.checkCRCOnRecover = checkCRCOnRecover; } - public String getStorePathCommitLog() { return storePathCommitLog; } - public void setStorePathCommitLog(String storePathCommitLog) { this.storePathCommitLog = storePathCommitLog; } - public String getDeleteWhen() { return deleteWhen; } - public void setDeleteWhen(String deleteWhen) { this.deleteWhen = deleteWhen; } - public int getDiskMaxUsedSpaceRatio() { if (this.diskMaxUsedSpaceRatio < 10) return 10; @@ -293,361 +268,294 @@ public class MessageStoreConfig { return diskMaxUsedSpaceRatio; } - public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) { this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio; } - public int getDeleteCommitLogFilesInterval() { return deleteCommitLogFilesInterval; } - public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) { this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval; } - public int getDeleteConsumeQueueFilesInterval() { return deleteConsumeQueueFilesInterval; } - public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) { this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval; } - public int getMaxTransferBytesOnMessageInMemory() { return maxTransferBytesOnMessageInMemory; } - public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) { this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory; } - public int getMaxTransferCountOnMessageInMemory() { return maxTransferCountOnMessageInMemory; } - public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) { this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory; } - public int getMaxTransferBytesOnMessageInDisk() { return maxTransferBytesOnMessageInDisk; } - public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) { this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk; } - public int getMaxTransferCountOnMessageInDisk() { return maxTransferCountOnMessageInDisk; } - public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) { this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk; } - public int getFlushCommitLogLeastPages() { return flushCommitLogLeastPages; } - public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) { this.flushCommitLogLeastPages = flushCommitLogLeastPages; } - public int getFlushConsumeQueueLeastPages() { return flushConsumeQueueLeastPages; } - public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) { this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages; } - public int getFlushCommitLogThoroughInterval() { return flushCommitLogThoroughInterval; } - public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) { this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval; } - public int getFlushConsumeQueueThoroughInterval() { return flushConsumeQueueThoroughInterval; } - public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) { this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval; } - public int getDestroyMapedFileIntervalForcibly() { return destroyMapedFileIntervalForcibly; } - public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) { this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly; } - public int getFileReservedTime() { return fileReservedTime; } - public void setFileReservedTime(int fileReservedTime) { this.fileReservedTime = fileReservedTime; } - public int getRedeleteHangedFileInterval() { return redeleteHangedFileInterval; } - public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) { this.redeleteHangedFileInterval = redeleteHangedFileInterval; } - public int getAccessMessageInMemoryMaxRatio() { return accessMessageInMemoryMaxRatio; } - public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) { this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio; } - public boolean isMessageIndexEnable() { return messageIndexEnable; } - public void setMessageIndexEnable(boolean messageIndexEnable) { this.messageIndexEnable = messageIndexEnable; } - public int getMaxHashSlotNum() { return maxHashSlotNum; } - public void setMaxHashSlotNum(int maxHashSlotNum) { this.maxHashSlotNum = maxHashSlotNum; } - public int getMaxIndexNum() { return maxIndexNum; } - public void setMaxIndexNum(int maxIndexNum) { this.maxIndexNum = maxIndexNum; } - public int getMaxMsgsNumBatch() { return maxMsgsNumBatch; } - public void setMaxMsgsNumBatch(int maxMsgsNumBatch) { this.maxMsgsNumBatch = maxMsgsNumBatch; } - public int getHaListenPort() { return haListenPort; } - public void setHaListenPort(int haListenPort) { this.haListenPort = haListenPort; } - public int getHaSendHeartbeatInterval() { return haSendHeartbeatInterval; } - public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) { this.haSendHeartbeatInterval = haSendHeartbeatInterval; } - public int getHaHousekeepingInterval() { return haHousekeepingInterval; } - public void setHaHousekeepingInterval(int haHousekeepingInterval) { this.haHousekeepingInterval = haHousekeepingInterval; } - public BrokerRole getBrokerRole() { return brokerRole; } - public void setBrokerRole(BrokerRole brokerRole) { - this.brokerRole = brokerRole; - } - public void setBrokerRole(String brokerRole) { this.brokerRole = BrokerRole.valueOf(brokerRole); } + public void setBrokerRole(BrokerRole brokerRole) { + this.brokerRole = brokerRole; + } + public int getHaTransferBatchSize() { return haTransferBatchSize; } - public void setHaTransferBatchSize(int haTransferBatchSize) { this.haTransferBatchSize = haTransferBatchSize; } - public int getHaSlaveFallbehindMax() { return haSlaveFallbehindMax; } - public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) { this.haSlaveFallbehindMax = haSlaveFallbehindMax; } - public FlushDiskType getFlushDiskType() { return flushDiskType; } - public void setFlushDiskType(FlushDiskType flushDiskType) { - this.flushDiskType = flushDiskType; - } - public void setFlushDiskType(String type) { this.flushDiskType = FlushDiskType.valueOf(type); } + public void setFlushDiskType(FlushDiskType flushDiskType) { + this.flushDiskType = flushDiskType; + } + public int getSyncFlushTimeout() { return syncFlushTimeout; } - public void setSyncFlushTimeout(int syncFlushTimeout) { this.syncFlushTimeout = syncFlushTimeout; } - public String getHaMasterAddress() { return haMasterAddress; } - public void setHaMasterAddress(String haMasterAddress) { this.haMasterAddress = haMasterAddress; } - public String getMessageDelayLevel() { return messageDelayLevel; } - public void setMessageDelayLevel(String messageDelayLevel) { this.messageDelayLevel = messageDelayLevel; } - public long getFlushDelayOffsetInterval() { return flushDelayOffsetInterval; } - public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) { this.flushDelayOffsetInterval = flushDelayOffsetInterval; } - public boolean isCleanFileForciblyEnable() { return cleanFileForciblyEnable; } - public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) { this.cleanFileForciblyEnable = cleanFileForciblyEnable; } - public boolean isMessageIndexSafe() { return messageIndexSafe; } - public void setMessageIndexSafe(boolean messageIndexSafe) { this.messageIndexSafe = messageIndexSafe; } - public boolean isFlushCommitLogTimed() { return flushCommitLogTimed; } - public void setFlushCommitLogTimed(boolean flushCommitLogTimed) { this.flushCommitLogTimed = flushCommitLogTimed; } - public String getStorePathRootDir() { return storePathRootDir; } - public void setStorePathRootDir(String storePathRootDir) { this.storePathRootDir = storePathRootDir; } - public int getFlushLeastPagesWhenWarmMapedFile() { return flushLeastPagesWhenWarmMapedFile; } - public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) { this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile; } - public boolean isOffsetCheckInSlave() { return offsetCheckInSlave; } - public void setOffsetCheckInSlave(boolean offsetCheckInSlave) { this.offsetCheckInSlave = offsetCheckInSlave; } @@ -666,7 +574,7 @@ public class MessageStoreConfig { */ public boolean isTransientStorePoolEnable() { return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() - && BrokerRole.SLAVE != getBrokerRole(); + && BrokerRole.SLAVE != getBrokerRole(); } public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index 8796436..d1cd7de 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -6,51 +6,44 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.config; import java.io.File; - public class StorePathConfigHelper { public static String getStorePathConsumeQueue(final String rootDir) { return rootDir + File.separator + "consumequeue"; } - public static String getStorePathIndex(final String rootDir) { return rootDir + File.separator + "index"; } - public static String getStoreCheckpoint(final String rootDir) { return rootDir + File.separator + "checkpoint"; } - public static String getAbortFile(final String rootDir) { return rootDir + File.separator + "abort"; } - public static String getDelayOffsetStorePath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "delayOffset.json"; } - public static String getTranStateTableStorePath(final String rootDir) { return rootDir + File.separator + "transaction" + File.separator + "statetable"; } - public static String getTranRedoLogStorePath(final String rootDir) { return rootDir + File.separator + "transaction" + File.separator + "redolog"; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index e8965d3..a601e81 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -6,16 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.ha; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.remoting.common.RemotingUtil; @@ -23,13 +28,6 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; - - public class HAConnection { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final HAService haService; @@ -41,7 +39,6 @@ public class HAConnection { private volatile long slaveRequestOffset = -1; private volatile long slaveAckOffset = -1; - public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException { this.haService = haService; this.socketChannel = socketChannel; @@ -56,20 +53,17 @@ public class HAConnection { this.haService.getConnectionCount().incrementAndGet(); } - public void start() { this.readSocketService.start(); this.writeSocketService.start(); } - public void shutdown() { this.writeSocketService.shutdown(true); this.readSocketService.shutdown(true); this.close(); } - public void close() { if (this.socketChannel != null) { try { @@ -80,7 +74,6 @@ public class HAConnection { } } - public SocketChannel getSocketChannel() { return socketChannel; } @@ -97,7 +90,6 @@ public class HAConnection { private int processPostion = 0; private volatile long lastReadTimestamp = System.currentTimeMillis(); - public ReadSocketService(final SocketChannel socketChannel) throws IOException { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; @@ -105,7 +97,6 @@ public class HAConnection { this.thread.setDaemon(true); } - @Override public void run() { HAConnection.log.info(this.getServiceName() + " service started"); @@ -119,7 +110,6 @@ public class HAConnection { break; } - long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval); @@ -135,10 +125,8 @@ public class HAConnection { writeSocketService.makeStop(); - haService.removeConnection(HAConnection.this); - HAConnection.this.haService.getConnectionCount().decrementAndGet(); SelectionKey sk = this.socketChannel.keyFor(this.selector); @@ -180,14 +168,12 @@ public class HAConnection { long readOffset = this.byteBufferRead.getLong(pos - 8); this.processPostion = pos; - HAConnection.this.slaveAckOffset = readOffset; if (HAConnection.this.slaveRequestOffset < 0) { HAConnection.this.slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); } - HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); } } else if (readSize == 0) { @@ -223,7 +209,6 @@ public class HAConnection { private boolean lastWriteOver = true; private long lastWriteTimestamp = System.currentTimeMillis(); - public WriteSocketService(final SocketChannel socketChannel) throws IOException { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; @@ -231,7 +216,6 @@ public class HAConnection { this.thread.setDaemon(true); } - @Override public void run() { HAConnection.log.info(this.getServiceName() + " service started"); @@ -245,15 +229,13 @@ public class HAConnection { continue; } - - if (-1 == this.nextTransferFromWhere) { if (0 == HAConnection.this.slaveRequestOffset) { long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); masterOffset = - masterOffset - - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() - .getMapedFileSizeCommitLog()); + masterOffset + - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() + .getMapedFileSizeCommitLog()); if (masterOffset < 0) { masterOffset = 0; @@ -265,16 +247,16 @@ public class HAConnection { } log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr - + "], and slave request " + HAConnection.this.slaveRequestOffset); + + "], and slave request " + HAConnection.this.slaveRequestOffset); } if (this.lastWriteOver) { long interval = - HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; + HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() - .getHaSendHeartbeatInterval()) { + .getHaSendHeartbeatInterval()) { // Build Header this.byteBufferHeader.position(0); @@ -287,16 +269,14 @@ public class HAConnection { if (!this.lastWriteOver) continue; } - } - - else { + } else { this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } SelectMappedBufferResult selectResult = - HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); + HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); if (selectResult != null) { int size = selectResult.getSize(); if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { @@ -328,7 +308,6 @@ public class HAConnection { } } - if (this.selectMappedBufferResult != null) { this.selectMappedBufferResult.release(); } @@ -337,7 +316,6 @@ public class HAConnection { readSocketService.makeStop(); - haService.removeConnection(HAConnection.this); SelectionKey sk = this.socketChannel.keyFor(this.selector); @@ -355,7 +333,6 @@ public class HAConnection { HAConnection.log.info(this.getServiceName() + " service end"); } - /** */ @@ -409,13 +386,11 @@ public class HAConnection { return result; } - @Override public String getServiceName() { return WriteSocketService.class.getSimpleName(); } - @Override public void shutdown() { super.shutdown();