http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 84a3af6..82fe8f4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -16,19 +16,15 @@ */ package org.apache.rocketmq.store.ha; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.CommitLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.*; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -36,7 +32,13 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.store.CommitLog; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HAService { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -56,38 +58,33 @@ public class HAService { private final HAClient haClient; - public HAService(final DefaultMessageStore defaultMessageStore) throws IOException { this.defaultMessageStore = defaultMessageStore; this.acceptSocketService = - new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort()); + new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort()); this.groupTransferService = new GroupTransferService(); this.haClient = new HAClient(); } - public void updateMasterAddress(final String newAddr) { if (this.haClient != null) { this.haClient.updateMasterAddress(newAddr); } } - public void putRequest(final CommitLog.GroupCommitRequest request) { this.groupTransferService.putRequest(request); } - public boolean isSlaveOK(final long masterPutWhere) { boolean result = this.connectionCount.get() > 0; result = - result - && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore - .getMessageStoreConfig().getHaSlaveFallbehindMax()); + result + && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore + .getMessageStoreConfig().getHaSlaveFallbehindMax()); return result; } - /** */ @@ -103,12 +100,10 @@ public class HAService { } } - public AtomicInteger getConnectionCount() { return connectionCount; } - // public void notifyTransferSome() { // this.groupTransferService.notifyTransferSome(); // } @@ -120,21 +115,18 @@ public class HAService { this.haClient.start(); } - public void addConnection(final HAConnection conn) { synchronized (this.connectionList) { this.connectionList.add(conn); } } - public void removeConnection(final HAConnection conn) { synchronized (this.connectionList) { this.connectionList.remove(conn); } } - public void shutdown() { this.haClient.shutdown(); this.acceptSocketService.shutdown(true); @@ -142,7 +134,6 @@ public class HAService { this.groupTransferService.shutdown(); } - public void destroyConnections() { synchronized (this.connectionList) { for (HAConnection c : this.connectionList) { @@ -153,12 +144,10 @@ public class HAService { } } - public DefaultMessageStore getDefaultMessageStore() { return defaultMessageStore; } - public WaitNotifyObject getWaitNotifyObject() { return waitNotifyObject; } @@ -171,9 +160,9 @@ public class HAService { * Listens to slave connections to create {@link HAConnection}. */ class AcceptSocketService extends ServiceThread { + private final SocketAddress socketAddressListen; private ServerSocketChannel serverSocketChannel; private Selector selector; - private final SocketAddress socketAddressListen; public AcceptSocketService(final int port) { this.socketAddressListen = new InetSocketAddress(port); @@ -181,6 +170,7 @@ public class HAService { /** * Starts listening to slave connections. + * * @throws Exception If fails. */ public void beginAccept() throws Exception { @@ -199,8 +189,7 @@ public class HAService { try { this.serverSocketChannel.close(); this.selector.close(); - } - catch (IOException e) { + } catch (IOException e) { log.error("AcceptSocketService shutdown exception", e); } } @@ -218,11 +207,11 @@ public class HAService { if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { - SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); + SocketChannel sc = ((ServerSocketChannel)k.channel()).accept(); if (sc != null) { HAService.log.info("HAService receive new connection, " - + sc.socket().getRemoteSocketAddress()); + + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = new HAConnection(HAService.this, sc); @@ -264,7 +253,6 @@ public class HAService { private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); - public void putRequest(final CommitLog.GroupCommitRequest request) { synchronized (this) { this.requestsWrite.add(request); @@ -274,19 +262,16 @@ public class HAService { } } - public void notifyTransferSome() { this.notifyTransferObject.wakeup(); } - private void swapRequests() { List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } - private void doWaitTransfer() { if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) { @@ -307,7 +292,6 @@ public class HAService { } } - public void run() { log.info(this.getServiceName() + " service started"); @@ -323,13 +307,11 @@ public class HAService { log.info(this.getServiceName() + " service end"); } - @Override protected void onWaitEnd() { this.swapRequests(); } - @Override public String getServiceName() { return GroupTransferService.class.getSimpleName(); @@ -349,12 +331,10 @@ public class HAService { private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); - public HAClient() throws IOException { this.selector = RemotingUtil.openSelector(); } - public void updateMasterAddress(final String newAddr) { String currentAddr = this.masterAddress.get(); if (currentAddr == null || !currentAddr.equals(newAddr)) { @@ -363,17 +343,15 @@ public class HAService { } } - private boolean isTimeToReportOffset() { long interval = - HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp; + HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp; boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig() - .getHaSendHeartbeatInterval(); + .getHaSendHeartbeatInterval(); return needHeart; } - private boolean reportSlaveMaxOffset(final long maxOffset) { this.reportOffset.position(0); this.reportOffset.limit(8); @@ -386,7 +364,7 @@ public class HAService { this.socketChannel.write(this.reportOffset); } catch (IOException e) { log.error(this.getServiceName() - + "reportSlaveMaxOffset this.socketChannel.write exception", e); + + "reportSlaveMaxOffset this.socketChannel.write exception", e); return false; } } @@ -394,7 +372,6 @@ public class HAService { return !this.reportOffset.hasRemaining(); } - // private void reallocateByteBuffer() { // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); // int remain = this.byteBufferRead.limit() - this.dispatchPostion; @@ -423,14 +400,12 @@ public class HAService { this.dispatchPostion = 0; } - private void swapByteBuffer() { ByteBuffer tmp = this.byteBufferRead; this.byteBufferRead = this.byteBufferBackup; this.byteBufferBackup = tmp; } - private boolean processReadEvent() { int readSizeZeroTimes = 0; while (this.byteBufferRead.hasRemaining()) { @@ -462,7 +437,6 @@ public class HAService { return true; } - private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size int readSocketPos = this.byteBufferRead.position(); @@ -475,22 +449,19 @@ public class HAService { long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); - if (slavePhyOffset != 0) { if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " - + slavePhyOffset + " MASTER: " + masterPhyOffset); + + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } - if (diff >= (msgHeaderSize + bodySize)) { byte[] bodyData = new byte[bodySize]; this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); this.byteBufferRead.get(bodyData); - HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); this.byteBufferRead.position(readSocketPos); @@ -514,7 +485,6 @@ public class HAService { return true; } - private boolean reportSlaveMaxOffsetPlus() { boolean result = true; long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); @@ -530,7 +500,6 @@ public class HAService { return result; } - private boolean connectMaster() throws ClosedChannelException { if (null == socketChannel) { String addr = this.masterAddress.get(); @@ -553,7 +522,6 @@ public class HAService { return this.socketChannel != null; } - private void closeMaster() { if (null != this.socketChannel) { try { @@ -581,7 +549,6 @@ public class HAService { } } - @Override public void run() { log.info(this.getServiceName() + " service started"); @@ -597,10 +564,8 @@ public class HAService { } } - this.selector.select(1000); - boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); @@ -610,14 +575,13 @@ public class HAService { continue; } - long interval = - HAService.this.getDefaultMessageStore().getSystemClock().now() - - this.lastWriteTimestamp; + HAService.this.getDefaultMessageStore().getSystemClock().now() + - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() - .getHaHousekeepingInterval()) { + .getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress - + "] expired, " + interval); + + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } @@ -633,7 +597,6 @@ public class HAService { log.info(this.getServiceName() + " service end"); } - // // private void disableWriteFlag() { // if (this.socketChannel != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java index c059e10..a96af5e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java @@ -6,27 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.ha; import java.util.HashMap; - public class WaitNotifyObject { protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable = - new HashMap<Long, Boolean>(16); + new HashMap<Long, Boolean>(16); protected volatile boolean hasNotified = false; - public void wakeup() { synchronized (this) { if (!this.hasNotified) { @@ -36,7 +34,6 @@ public class WaitNotifyObject { } } - protected void waitForRunning(long interval) { synchronized (this) { if (this.hasNotified) { @@ -56,7 +53,6 @@ public class WaitNotifyObject { } } - protected void onWaitEnd() { } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index d6a223d..de1a31d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.index; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.store.MappedFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.util.List; - +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.store.MappedFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexFile { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -41,11 +39,10 @@ public class IndexFile { private final MappedByteBuffer mappedByteBuffer; private final IndexHeader indexHeader; - public IndexFile(final String fileName, final int hashSlotNum, final int indexNum, - final long endPhyOffset, final long endTimestamp) throws IOException { + final long endPhyOffset, final long endTimestamp) throws IOException { int fileTotalSize = - IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); + IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); this.mappedFile = new MappedFile(fileName, fileTotalSize); this.fileChannel = this.mappedFile.getFileChannel(); this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer(); @@ -66,17 +63,14 @@ public class IndexFile { } } - public String getFileName() { return this.mappedFile.getFileName(); } - public void load() { this.indexHeader.load(); } - public void flush() { long beginTime = System.currentTimeMillis(); if (this.mappedFile.hold()) { @@ -114,10 +108,8 @@ public class IndexFile { long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); - timeDiff = timeDiff / 1000; - if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { @@ -127,19 +119,16 @@ public class IndexFile { } int absIndexPos = - IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize - + this.indexHeader.getIndexCount() * indexSize; - + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + this.indexHeader.getIndexCount() * indexSize; this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); - this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); + this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); - this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); - if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); @@ -192,22 +181,22 @@ public class IndexFile { public boolean isTimeMatched(final long begin, final long end) { boolean result = - begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp(); + begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp(); result = - result - || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader - .getEndTimestamp()); + result + || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader + .getEndTimestamp()); result = - result - || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader - .getEndTimestamp()); + result + || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader + .getEndTimestamp()); return result; } public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, - final long begin, final long end, boolean lock) { + final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; @@ -227,25 +216,24 @@ public class IndexFile { // } if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() - || this.indexHeader.getIndexCount() <= 1) { + || this.indexHeader.getIndexCount() <= 1) { // TODO NOTFOUND } else { - for (int nextIndexToRead = slotValue;;) { + for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } int absIndexPos = - IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize - + nextIndexToRead * indexSize; + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); - long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); + long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); - if (timeDiff < 0) { break; } @@ -260,8 +248,8 @@ public class IndexFile { } if (prevIndexRead <= invalidIndex - || prevIndexRead > this.indexHeader.getIndexCount() - || prevIndexRead == nextIndexToRead || timeRead < begin) { + || prevIndexRead > this.indexHeader.getIndexCount() + || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 130f08e..0c00abd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - /** * @@ -43,12 +42,10 @@ public class IndexHeader { private AtomicInteger indexCount = new AtomicInteger(1); - public IndexHeader(final ByteBuffer byteBuffer) { this.byteBuffer = byteBuffer; } - public void load() { this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex)); this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex)); @@ -72,67 +69,55 @@ public class IndexHeader { this.byteBuffer.putInt(indexCountIndex, this.indexCount.get()); } - public long getBeginTimestamp() { return beginTimestamp.get(); } - public void setBeginTimestamp(long beginTimestamp) { this.beginTimestamp.set(beginTimestamp); this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp); } - public long getEndTimestamp() { return endTimestamp.get(); } - public void setEndTimestamp(long endTimestamp) { this.endTimestamp.set(endTimestamp); this.byteBuffer.putLong(endTimestampIndex, endTimestamp); } - public long getBeginPhyOffset() { return beginPhyOffset.get(); } - public void setBeginPhyOffset(long beginPhyOffset) { this.beginPhyOffset.set(beginPhyOffset); this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset); } - public long getEndPhyOffset() { return endPhyOffset.get(); } - public void setEndPhyOffset(long endPhyOffset) { this.endPhyOffset.set(endPhyOffset); this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset); } - public AtomicInteger getHashSlotCount() { return hashSlotCount; } - public void incHashSlotCount() { int value = this.hashSlotCount.incrementAndGet(); this.byteBuffer.putInt(hashSlotcountIndex, value); } - public int getIndexCount() { return indexCount.get(); } - public void incIndexCount() { int value = this.indexCount.incrementAndGet(); this.byteBuffer.putInt(indexCountIndex, value); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index 939ba2d..b72ffe9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.store.index; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; @@ -26,39 +33,25 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - - public class IndexService { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + /** Maximum times to attempt index file creation. */ + private static final int MAX_TRY_IDX_CREATE = 3; private final DefaultMessageStore defaultMessageStore; - private final int hashSlotNum; private final int indexNum; private final String storePath; - private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - /** Maximum times to attempt index file creation. */ - private static final int MAX_TRY_IDX_CREATE = 3; - - public IndexService(final DefaultMessageStore store) { this.defaultMessageStore = store; this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum(); this.indexNum = store.getMessageStoreConfig().getMaxIndexNum(); this.storePath = - StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir()); + StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir()); } - public boolean load(final boolean lastExitOK) { File dir = new File(this.storePath); File[] files = dir.listFiles(); @@ -72,7 +65,7 @@ public class IndexService { if (!lastExitOK) { if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint() - .getIndexMsgTimestamp()) { + .getIndexMsgTimestamp()) { f.destroy(0); continue; } @@ -113,7 +106,7 @@ public class IndexService { if (files != null) { List<IndexFile> fileList = new ArrayList<IndexFile>(); for (int i = 0; i < (files.length - 1); i++) { - IndexFile f = (IndexFile) files[i]; + IndexFile f = (IndexFile)files[i]; if (f.getEndPhyOffset() < offset) { fileList.add(f); } else { @@ -145,7 +138,6 @@ public class IndexService { } } - public void destroy() { try { this.readWriteLock.readLock().lock(); @@ -160,7 +152,6 @@ public class IndexService { } } - public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { List<Long> phyOffsets = new ArrayList<Long>(maxNum); @@ -183,7 +174,6 @@ public class IndexService { f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); } - if (f.getBeginTimestamp() < begin) { break; } @@ -202,12 +192,10 @@ public class IndexService { return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); } - private String buildKey(final String topic, final String key) { return topic + "#" + key; } - public void buildIndex(DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { @@ -255,7 +243,6 @@ public class IndexService { } } - private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) { for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) { log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one"); @@ -300,7 +287,6 @@ public class IndexService { return indexFile; } - public IndexFile getAndCreateLastIndexFile() { IndexFile indexFile = null; IndexFile prevIndexFile = null; @@ -323,15 +309,14 @@ public class IndexService { this.readWriteLock.readLock().unlock(); } - if (indexFile == null) { try { String fileName = - this.storePath + File.separator - + UtilAll.timeMillisToHumanString(System.currentTimeMillis()); + this.storePath + File.separator + + UtilAll.timeMillisToHumanString(System.currentTimeMillis()); indexFile = - new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, - lastUpdateIndexTimestamp); + new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, + lastUpdateIndexTimestamp); this.readWriteLock.writeLock().lock(); this.indexFileList.add(indexFile); } catch (Exception e) { @@ -340,7 +325,6 @@ public class IndexService { this.readWriteLock.writeLock().unlock(); } - if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; Thread flushThread = new Thread(new Runnable() { @@ -358,7 +342,6 @@ public class IndexService { return indexFile; } - public void flush(final IndexFile f) { if (null == f) return; @@ -377,12 +360,10 @@ public class IndexService { } } - public void start() { } - public void shutdown() { } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java index e126aee..a864c89 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/QueryOffsetResult.java @@ -6,43 +6,38 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.index; import java.util.List; - public class QueryOffsetResult { private final List<Long> phyOffsets; private final long indexLastUpdateTimestamp; private final long indexLastUpdatePhyoffset; - public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp, - long indexLastUpdatePhyoffset) { + long indexLastUpdatePhyoffset) { this.phyOffsets = phyOffsets; this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset; } - public List<Long> getPhyOffsets() { return phyOffsets; } - public long getIndexLastUpdateTimestamp() { return indexLastUpdateTimestamp; } - public long getIndexLastUpdatePhyoffset() { return indexLastUpdatePhyoffset; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java index b1520e1..ebe3ffe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java @@ -6,34 +6,30 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.schedule; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - import java.util.concurrent.ConcurrentHashMap; - +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; /** * */ public class DelayOffsetSerializeWrapper extends RemotingSerializable { private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = - new ConcurrentHashMap<Integer, Long>(32); - + new ConcurrentHashMap<Integer, Long>(32); public ConcurrentHashMap<Integer, Long> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) { this.offsetTable = offsetTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index c03c181..3df4806 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -6,16 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.schedule; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -24,16 +31,16 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.running.RunningStats; -import org.apache.rocketmq.store.*; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - /** * */ @@ -45,10 +52,10 @@ public class ScheduleMessageService extends ConfigManager { private static final long DELAY_FOR_A_PERIOD = 10000L; private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = - new ConcurrentHashMap<Integer, Long>(32); + new ConcurrentHashMap<Integer, Long>(32); private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = - new ConcurrentHashMap<Integer, Long>(32); + new ConcurrentHashMap<Integer, Long>(32); private final Timer timer = new Timer("ScheduleMessageTimerThread", true); @@ -56,7 +63,6 @@ public class ScheduleMessageService extends ConfigManager { private int maxDelayLevel; - public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; } @@ -65,6 +71,10 @@ public class ScheduleMessageService extends ConfigManager { return queueId + 1; } + public static int delayLevel2QueueId(final int delayLevel) { + return delayLevel - 1; + } + public void buildRunningStats(HashMap<String, String> stats) { Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { @@ -78,16 +88,10 @@ public class ScheduleMessageService extends ConfigManager { } } - public static int delayLevel2QueueId(final int delayLevel) { - return delayLevel - 1; - } - - private void updateOffset(int delayLevel, long offset) { this.offsetTable.put(delayLevel, offset); } - public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { Long time = this.delayLevelTable.get(delayLevel); if (time != null) { @@ -97,7 +101,6 @@ public class ScheduleMessageService extends ConfigManager { return storeTimestamp + 1000; } - public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { @@ -126,17 +129,14 @@ public class ScheduleMessageService extends ConfigManager { }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } - public void shutdown() { this.timer.cancel(); } - public int getMaxDelayLevel() { return maxDelayLevel; } - public String encode() { return this.encode(false); } @@ -150,14 +150,14 @@ public class ScheduleMessageService extends ConfigManager { @Override public String configFilePath() { return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig() - .getStorePathRootDir()); + .getStorePathRootDir()); } @Override public void decode(String jsonString) { if (jsonString != null) { DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = - DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class); + DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class); if (delayOffsetSerializeWrapper != null) { this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable()); } @@ -206,13 +206,11 @@ public class ScheduleMessageService extends ConfigManager { private final int delayLevel; private final long offset; - public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { this.delayLevel = delayLevel; this.offset = offset; } - @Override public void run() { try { @@ -221,11 +219,10 @@ public class ScheduleMessageService extends ConfigManager { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); + this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); } } - /** * @@ -243,11 +240,10 @@ public class ScheduleMessageService extends ConfigManager { return result; } - public void executeOnTimeup() { ConsumeQueue cq = - ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, - delayLevel2QueueId(delayLevel)); + ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, + delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; @@ -262,7 +258,6 @@ public class ScheduleMessageService extends ConfigManager { int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); - long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); @@ -272,31 +267,29 @@ public class ScheduleMessageService extends ConfigManager { if (countdown <= 0) { MessageExt msgExt = - ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( - offsetPy, sizePy); + ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( + offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = - ScheduleMessageService.this.defaultMessageStore - .putMessage(msgInner); + ScheduleMessageService.this.defaultMessageStore + .putMessage(msgInner); if (putMessageResult != null - && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { continue; - } - - else { + } else { // XXX: warn and notify me log.error( - "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", - msgExt.getTopic(), msgExt.getMsgId()); + "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", + msgExt.getTopic(), msgExt.getMsgId()); ScheduleMessageService.this.timer.schedule( - new DeliverDelayedMessageTimerTask(this.delayLevel, - nextOffset), DELAY_FOR_A_PERIOD); + new DeliverDelayedMessageTimerTask(this.delayLevel, + nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, - nextOffset); + nextOffset); return; } } catch (Exception e) { @@ -307,17 +300,15 @@ public class ScheduleMessageService extends ConfigManager { */ log.error( - "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" - + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" - + offsetPy + ",sizePy=" + sizePy, e); + "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + + offsetPy + ",sizePy=" + sizePy, e); } } - } - - else { + } else { ScheduleMessageService.this.timer.schedule( - new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), - countdown); + new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), + countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } @@ -325,7 +316,7 @@ public class ScheduleMessageService extends ConfigManager { nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); + this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { @@ -342,16 +333,15 @@ public class ScheduleMessageService extends ConfigManager { if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" - + cqMinOffset + ", queueId=" + cq.getQueueId()); + + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, - failScheduleOffset), DELAY_FOR_A_WHILE); + failScheduleOffset), DELAY_FOR_A_WHILE); } - private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setBody(msgExt.getBody()); @@ -360,7 +350,7 @@ public class ScheduleMessageService extends ConfigManager { TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); long tagsCodeValue = - MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); + MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); msgInner.setTagsCode(tagsCodeValue); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index dd4f6df..cd87b0d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.stats; @@ -21,7 +21,6 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class BrokerStats { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final DefaultMessageStore defaultMessageStore; @@ -34,12 +33,10 @@ public class BrokerStats { private volatile long msgGetTotalTodayMorning; - public BrokerStats(DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; } - /** */ @@ -48,60 +45,50 @@ public class BrokerStats { this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning; this.msgPutTotalTodayMorning = - this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); + this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); this.msgGetTotalTodayMorning = - this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); + this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning); log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning); } - public long getMsgPutTotalYesterdayMorning() { return msgPutTotalYesterdayMorning; } - public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) { this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning; } - public long getMsgPutTotalTodayMorning() { return msgPutTotalTodayMorning; } - public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) { this.msgPutTotalTodayMorning = msgPutTotalTodayMorning; } - public long getMsgGetTotalYesterdayMorning() { return msgGetTotalYesterdayMorning; } - public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) { this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning; } - public long getMsgGetTotalTodayMorning() { return msgGetTotalTodayMorning; } - public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) { this.msgGetTotalTodayMorning = msgGetTotalTodayMorning; } - public long getMsgPutTotalTodayNow() { return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); } - public long getMsgGetTotalTodayNow() { return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index b14780b..f128b09 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -6,16 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.stats; +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.stats.MomentStatsItemSet; @@ -24,11 +27,6 @@ import org.apache.rocketmq.common.stats.StatsItemSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - - public class BrokerStatsManager { public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS"; @@ -65,9 +63,9 @@ public class BrokerStatsManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME); private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerStatsThread")); + "BrokerStatsThread")); private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "CommercialStatsThread")); + "CommercialStatsThread")); private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>(); private final String clusterName; private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log); @@ -89,7 +87,6 @@ public class BrokerStatsManager { this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log)); - this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG)); this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG)); this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG)); @@ -154,42 +151,36 @@ public class BrokerStatsManager { this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1); } - public void incBrokerPutNums() { this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); } - public void incBrokerGetNums(final int incValue) { this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); } - public void incSendBackNums(final String group, final String topic) { final String statsKey = buildStatsKey(topic, group); this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1); } - public double tpsGroupGetNums(final String group, final String topic) { final String statsKey = buildStatsKey(topic, group); return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); } - public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) { final String statsKey = String.format("%d@%s@%s", queueId, topic, group); this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); } - public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) { final String statsKey = String.format("%d@%s@%s", queueId, topic, group); this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); } public void incCommercialValue(final String key, final String owner, final String group, - final String topic, final String type, final int incValue) { + final String topic, final String type, final int incValue) { final String statsKey = buildCommercialStatsKey(owner, topic, group, type); this.statsTable.get(key).addValue(statsKey, incValue, 1); } @@ -206,7 +197,6 @@ public class BrokerStatsManager { return strBuilder.toString(); } - public enum StatsType { SEND_SUCCESS, SEND_FAILURE, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/util/LibC.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java index ee9e68b..dc5d6a9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java +++ b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store.util; @@ -22,9 +22,8 @@ import com.sun.jna.NativeLong; import com.sun.jna.Platform; import com.sun.jna.Pointer; - public interface LibC extends Library { - LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class); + LibC INSTANCE = (LibC)Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class); int MADV_WILLNEED = 3; int MADV_DONTNEED = 4; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index bfcb33e..5c6bde2 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -6,17 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -26,17 +30,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; - import static org.junit.Assert.assertTrue; - public class DefaultMessageStoreTest { private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class); - + private static final String StoreMessage = "Once, there was a chance for me!"; private static int QUEUE_TOTAL = 100; @@ -82,7 +80,7 @@ public class DefaultMessageStoreTest { PutMessageResult result = master.putMessage(buildMessage()); logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId()); } - + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); if (result == null) { @@ -133,7 +131,7 @@ public class DefaultMessageStoreTest { PutMessageResult result = master.putMessage(buildMessage()); logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId()); } - + for (long i = 0; i < totalMsgs; i++) { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); if (result == null) { @@ -142,7 +140,7 @@ public class DefaultMessageStoreTest { assertTrue(result != null); result.release(); logger.debug("read " + i + " OK"); - + } } finally { master.shutdown(); @@ -150,9 +148,9 @@ public class DefaultMessageStoreTest { } logger.debug("================================================================"); } - + private class MyMessageArrivingListener implements MessageArrivingListener { - + @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { // Do nothing here http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index ac1e016..018cf99 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -20,12 +22,17 @@ */ package org.apache.rocketmq.store; -import org.junit.*; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class MappedFileQueueTest { private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class); @@ -55,7 +62,7 @@ public class MappedFileQueueTest { logger.debug("================================================================"); MappedFileQueue mappedFileQueue = - new MappedFileQueue("target/unit_test_store/a/", 1024, null); + new MappedFileQueue("target/unit_test_store/a/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); @@ -73,7 +80,6 @@ public class MappedFileQueueTest { logger.debug("MappedFileQueue.getLastMappedFile() OK"); } - @Test public void test_findMapedFileByOffset() { // four-byte string. @@ -81,7 +87,7 @@ public class MappedFileQueueTest { logger.debug("================================================================"); MappedFileQueue mappedFileQueue = - new MappedFileQueue("target/unit_test_store/b/", 1024, null); + new MappedFileQueue("target/unit_test_store/b/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); @@ -96,23 +102,23 @@ public class MappedFileQueueTest { MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 0); - + mappedFile = mappedFileQueue.findMappedFileByOffset(100); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 0); - + mappedFile = mappedFileQueue.findMappedFileByOffset(1024); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 1024); - + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 1024); - + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); - + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); @@ -135,7 +141,7 @@ public class MappedFileQueueTest { logger.debug("================================================================"); MappedFileQueue mappedFileQueue = - new MappedFileQueue("target/unit_test_store/c/", 1024, null); + new MappedFileQueue("target/unit_test_store/c/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); @@ -148,27 +154,27 @@ public class MappedFileQueueTest { boolean result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere()); - + result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere()); - + result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere()); - + result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere()); - + result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere()); - + result = mappedFileQueue.flush(0); assertFalse(result); assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere()); - + mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); logger.debug("MappedFileQueue.flush() OK"); @@ -180,7 +186,7 @@ public class MappedFileQueueTest { logger.debug("================================================================"); MappedFileQueue mappedFileQueue = - new MappedFileQueue("target/unit_test_store/d/", 1024, null); + new MappedFileQueue("target/unit_test_store/d/", 1024, null); for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java index bfa09e9..a10f4eb 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -20,6 +22,7 @@ */ package org.apache.rocketmq.store; +import java.io.IOException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -27,16 +30,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - import static org.junit.Assert.assertTrue; - public class MappedFileTest { - + private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class); - - + private static final String StoreMessage = "Once, there was a chance for me!"; @BeforeClass @@ -54,15 +53,15 @@ public class MappedFileTest { boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); assertTrue(result); logger.debug("write OK"); - + SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); byte[] data = new byte[StoreMessage.length()]; selectMappedBufferResult.getByteBuffer().get(data); String readString = new String(data); - + logger.debug("Read: " + readString); assertTrue(readString.equals(StoreMessage)); - + mappedFile.shutdown(1000); assertTrue(!mappedFile.isAvailable()); selectMappedBufferResult.release(); @@ -76,11 +75,11 @@ public class MappedFileTest { boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); assertTrue(result); logger.debug("write OK"); - + SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); selectMappedBufferResult.release(); mappedFile.shutdown(1000); - + byte[] data = new byte[StoreMessage.length()]; selectMappedBufferResult.getByteBuffer().get(data); String readString = new String(data); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java index f0245a9..2fe4d46 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -20,15 +22,13 @@ */ package org.apache.rocketmq.store; +import java.io.IOException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; - import static org.junit.Assert.assertTrue; - public class StoreCheckpointTest { @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -47,7 +47,7 @@ public class StoreCheckpointTest { storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp); storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp); storeCheckpoint.flush(); - + long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp(); assertTrue(diff == 3000); storeCheckpoint.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index 95cf0c8..cdecb20 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -20,16 +22,14 @@ */ package org.apache.rocketmq.store.index; -import org.junit.Test; - import java.util.ArrayList; import java.util.List; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - public class IndexFileTest { private static final int HASH_SLOT_NUM = 100; private static final int INDEX_NUM = 400; @@ -45,15 +45,14 @@ public class IndexFileTest { // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertFalse(putResult); - + indexFile.destroy(0); } - @Test public void test_put_get_index() throws Exception { IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0); - + for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); assertTrue(putResult); @@ -62,7 +61,7 @@ public class IndexFileTest { // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertFalse(putResult); - + final List<Long> phyOffsets = new ArrayList<Long>(); indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true); assertFalse(phyOffsets.isEmpty()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java index 7ac8233..c5d9756 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -20,19 +22,21 @@ */ package org.apache.rocketmq.store.schedule; -import org.apache.rocketmq.store.*; -import org.apache.rocketmq.store.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; - import static org.junit.Assert.assertTrue; @Ignore @@ -65,7 +69,6 @@ public class ScheduleMessageTest { long totalMsgs = 10000; QUEUE_TOTAL = 32; - MessageBody = StoreMessage.getBytes(); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); @@ -79,7 +82,6 @@ public class ScheduleMessageTest { boolean load = master.load(); assertTrue(load); - master.start(); for (int i = 0; i < totalMsgs; i++) { MessageExtBrokerInner msg = buildMessage(); @@ -92,7 +94,6 @@ public class ScheduleMessageTest { System.out.println("write message over, wait time up"); Thread.sleep(1000 * 20); - for (long i = 0; i < totalMsgs; i++) { try { GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); @@ -110,10 +111,8 @@ public class ScheduleMessageTest { Thread.sleep(1000 * 15); - master.shutdown(); - master.destroy(); System.out.println("================================================================"); }
