http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java new file mode 100644 index 0000000..00f9833 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java @@ -0,0 +1,654 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store.ha; + +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.common.RemotingUtil; +import com.alibaba.rocketmq.store.CommitLog.GroupCommitRequest; +import com.alibaba.rocketmq.store.DefaultMessageStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + + +/** + * @author shijia.wxr + */ +public class HAService { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + + private final AtomicInteger connectionCount = new AtomicInteger(0); + + private final List<HAConnection> connectionList = new LinkedList<HAConnection>(); + + private final AcceptSocketService acceptSocketService; + + private final DefaultMessageStore defaultMessageStore; + + private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject(); + private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0); + + private final GroupTransferService groupTransferService; + + private final HAClient haClient; + + + public HAService(final DefaultMessageStore defaultMessageStore) throws IOException { + this.defaultMessageStore = defaultMessageStore; + this.acceptSocketService = + new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort()); + this.groupTransferService = new GroupTransferService(); + this.haClient = new HAClient(); + } + + + public void updateMasterAddress(final String newAddr) { + if (this.haClient != null) { + this.haClient.updateMasterAddress(newAddr); + } + } + + + public 
void putRequest(final GroupCommitRequest request) { + this.groupTransferService.putRequest(request); + } + + + public boolean isSlaveOK(final long masterPutWhere) { + boolean result = this.connectionCount.get() > 0; + result = + result + && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore + .getMessageStoreConfig().getHaSlaveFallbehindMax()); + return result; + } + + + /** + + */ + public void notifyTransferSome(final long offset) { + for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { + boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); + if (ok) { + this.groupTransferService.notifyTransferSome(); + break; + } else { + value = this.push2SlaveMaxOffset.get(); + } + } + } + + + public AtomicInteger getConnectionCount() { + return connectionCount; + } + + + // public void notifyTransferSome() { + // this.groupTransferService.notifyTransferSome(); + // } + + public void start() { + this.acceptSocketService.beginAccept(); + this.acceptSocketService.start(); + this.groupTransferService.start(); + this.haClient.start(); + } + + + public void addConnection(final HAConnection conn) { + synchronized (this.connectionList) { + this.connectionList.add(conn); + } + } + + + public void removeConnection(final HAConnection conn) { + synchronized (this.connectionList) { + this.connectionList.remove(conn); + } + } + + + public void shutdown() { + this.haClient.shutdown(); + this.acceptSocketService.shutdown(true); + this.destroyConnections(); + this.groupTransferService.shutdown(); + } + + + public void destroyConnections() { + synchronized (this.connectionList) { + for (HAConnection c : this.connectionList) { + c.shutdown(); + } + + this.connectionList.clear(); + } + } + + + public DefaultMessageStore getDefaultMessageStore() { + return defaultMessageStore; + } + + + public WaitNotifyObject getWaitNotifyObject() { + return waitNotifyObject; + } + + public AtomicLong getPush2SlaveMaxOffset() { + return 
push2SlaveMaxOffset; + } + + class AcceptSocketService extends ServiceThread { + private ServerSocketChannel serverSocketChannel; + private Selector selector; + private SocketAddress socketAddressListen; + + + public AcceptSocketService(final int port) { + this.socketAddressListen = new InetSocketAddress(port); + } + + + public void beginAccept() { + try { + this.serverSocketChannel = ServerSocketChannel.open(); + this.selector = RemotingUtil.openSelector(); + this.serverSocketChannel.socket().setReuseAddress(true); + this.serverSocketChannel.socket().bind(this.socketAddressListen); + this.serverSocketChannel.configureBlocking(false); + this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); + } catch (Exception e) { + log.error("beginAccept exception", e); + } + } + + + @Override + public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.selector.select(1000); + Set<SelectionKey> selected = this.selector.selectedKeys(); + if (selected != null) { + for (SelectionKey k : selected) { + if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { + SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); + if (sc != null) { + HAService.log.info("HAService receive new connection, " + + sc.socket().getRemoteSocketAddress()); + + try { + HAConnection conn = new HAConnection(HAService.this, sc); + conn.start(); + HAService.this.addConnection(conn); + } catch (Exception e) { + log.error("new HAConnection exception", e); + sc.close(); + } + } + } else { + log.warn("Unexpected ops in select " + k.readyOps()); + } + } + + selected.clear(); + } + + } catch (Exception e) { + log.error(this.getServiceName() + " service has exception.", e); + } + } + + log.error(this.getServiceName() + " service end"); + } + + + @Override + public String getServiceName() { + return AcceptSocketService.class.getSimpleName(); + } + } + + /** + * GroupTransferService Service + */ + class GroupTransferService extends 
ServiceThread { + + private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); + private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); + private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); + + + public void putRequest(final GroupCommitRequest request) { + synchronized (this) { + this.requestsWrite.add(request); + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify + } + } + } + + + public void notifyTransferSome() { + this.notifyTransferObject.wakeup(); + } + + + private void swapRequests() { + List<GroupCommitRequest> tmp = this.requestsWrite; + this.requestsWrite = this.requestsRead; + this.requestsRead = tmp; + } + + + private void doWaitTransfer() { + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { + boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + for (int i = 0; !transferOK && i < 5; i++) { + this.notifyTransferObject.waitForRunning(1000); + transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + } + + if (!transferOK) { + log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + } + + req.wakeupCustomer(transferOK); + } + + this.requestsRead.clear(); + } + } + + + public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + this.waitForRunning(0); + this.doWaitTransfer(); + } catch (Exception e) { + log.warn(this.getServiceName() + " service has exception. 
", e); + } + } + + log.info(this.getServiceName() + " service end"); + } + + + @Override + protected void onWaitEnd() { + this.swapRequests(); + } + + + @Override + public String getServiceName() { + return GroupTransferService.class.getSimpleName(); + } + } + + class HAClient extends ServiceThread { + private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; + private final AtomicReference<String> masterAddress = new AtomicReference<String>(); + private final ByteBuffer reportOffset = ByteBuffer.allocate(8); + private SocketChannel socketChannel; + private Selector selector; + private long lastWriteTimestamp = System.currentTimeMillis(); + + private long currentReportedOffset = 0; + private int dispatchPostion = 0; + private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); + private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); + + + public HAClient() throws IOException { + this.selector = RemotingUtil.openSelector(); + } + + + public void updateMasterAddress(final String newAddr) { + String currentAddr = this.masterAddress.get(); + if (currentAddr == null || !currentAddr.equals(newAddr)) { + this.masterAddress.set(newAddr); + log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr); + } + } + + + private boolean isTimeToReportOffset() { + long interval = + HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp; + boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig() + .getHaSendHeartbeatInterval(); + + return needHeart; + } + + + private boolean reportSlaveMaxOffset(final long maxOffset) { + this.reportOffset.position(0); + this.reportOffset.limit(8); + this.reportOffset.putLong(maxOffset); + this.reportOffset.position(0); + this.reportOffset.limit(8); + + for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { + try { + this.socketChannel.write(this.reportOffset); + } catch (IOException e) { + 
log.error(this.getServiceName() + + "reportSlaveMaxOffset this.socketChannel.write exception", e); + return false; + } + } + + return !this.reportOffset.hasRemaining(); + } + + + // private void reallocateByteBuffer() { + // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); + // int remain = this.byteBufferRead.limit() - this.dispatchPostion; + // bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain); + // this.dispatchPostion = 0; + // this.byteBufferRead = bb; + // } + + /** + + */ + private void reallocateByteBuffer() { + int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion; + if (remain > 0) { + this.byteBufferRead.position(this.dispatchPostion); + + this.byteBufferBackup.position(0); + this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE); + this.byteBufferBackup.put(this.byteBufferRead); + } + + this.swapByteBuffer(); + + this.byteBufferRead.position(remain); + this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE); + this.dispatchPostion = 0; + } + + + private void swapByteBuffer() { + ByteBuffer tmp = this.byteBufferRead; + this.byteBufferRead = this.byteBufferBackup; + this.byteBufferBackup = tmp; + } + + + private boolean processReadEvent() { + int readSizeZeroTimes = 0; + while (this.byteBufferRead.hasRemaining()) { + try { + int readSize = this.socketChannel.read(this.byteBufferRead); + if (readSize > 0) { + lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now(); + readSizeZeroTimes = 0; + boolean result = this.dispatchReadRequest(); + if (!result) { + log.error("HAClient, dispatchReadRequest error"); + return false; + } + } else if (readSize == 0) { + if (++readSizeZeroTimes >= 3) { + break; + } + } else { + // TODO ERROR + log.info("HAClient, processReadEvent read socket < 0"); + return false; + } + } catch (IOException e) { + log.info("HAClient, processReadEvent read socket exception", e); + return false; + } + } + + return true; + } + + + private boolean dispatchReadRequest() { + final int msgHeaderSize = 8 + 4; 
// phyoffset + size + int readSocketPos = this.byteBufferRead.position(); + + while (true) { + int diff = this.byteBufferRead.position() - this.dispatchPostion; + if (diff >= msgHeaderSize) { + long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); + int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); + + long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); + + + if (slavePhyOffset != 0) { + if (slavePhyOffset != masterPhyOffset) { + log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + + slavePhyOffset + " MASTER: " + masterPhyOffset); + return false; + } + } + + + if (diff >= (msgHeaderSize + bodySize)) { + byte[] bodyData = new byte[bodySize]; + this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); + this.byteBufferRead.get(bodyData); + + + HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); + + this.byteBufferRead.position(readSocketPos); + this.dispatchPostion += msgHeaderSize + bodySize; + + if (!reportSlaveMaxOffsetPlus()) { + return false; + } + + continue; + } + } + + if (!this.byteBufferRead.hasRemaining()) { + this.reallocateByteBuffer(); + } + + break; + } + + return true; + } + + + private boolean reportSlaveMaxOffsetPlus() { + boolean result = true; + long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); + if (currentPhyOffset > this.currentReportedOffset) { + this.currentReportedOffset = currentPhyOffset; + result = this.reportSlaveMaxOffset(this.currentReportedOffset); + if (!result) { + this.closeMaster(); + log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset); + } + } + + return result; + } + + + private boolean connectMaster() throws ClosedChannelException { + if (null == socketChannel) { + String addr = this.masterAddress.get(); + if (addr != null) { + + SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); + if (socketAddress != null) { + 
this.socketChannel = RemotingUtil.connect(socketAddress); + if (this.socketChannel != null) { + this.socketChannel.register(this.selector, SelectionKey.OP_READ); + } + } + } + + this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); + + this.lastWriteTimestamp = System.currentTimeMillis(); + } + + return this.socketChannel != null; + } + + + private void closeMaster() { + if (null != this.socketChannel) { + try { + + SelectionKey sk = this.socketChannel.keyFor(this.selector); + if (sk != null) { + sk.cancel(); + } + + this.socketChannel.close(); + + this.socketChannel = null; + } catch (IOException e) { + log.warn("closeMaster exception. ", e); + } + + this.lastWriteTimestamp = 0; + this.dispatchPostion = 0; + + this.byteBufferBackup.position(0); + this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE); + + this.byteBufferRead.position(0); + this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE); + } + } + + + @Override + public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + if (this.connectMaster()) { + + if (this.isTimeToReportOffset()) { + boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); + if (!result) { + this.closeMaster(); + } + } + + + this.selector.select(1000); + + + boolean ok = this.processReadEvent(); + if (!ok) { + this.closeMaster(); + } + + if (!reportSlaveMaxOffsetPlus()) { + continue; + } + + + long interval = + HAService.this.getDefaultMessageStore().getSystemClock().now() + - this.lastWriteTimestamp; + if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() + .getHaHousekeepingInterval()) { + log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + + "] expired, " + interval); + this.closeMaster(); + log.warn("HAClient, master not response some time, so close connection"); + } + } else { + this.waitForRunning(1000 * 5); + } + } catch (Exception e) { + log.warn(this.getServiceName() + " service has 
exception. ", e); + this.waitForRunning(1000 * 5); + } + } + + log.info(this.getServiceName() + " service end"); + } + + + // + // private void disableWriteFlag() { + // if (this.socketChannel != null) { + // SelectionKey sk = this.socketChannel.keyFor(this.selector); + // if (sk != null) { + // int ops = sk.interestOps(); + // ops &= ~SelectionKey.OP_WRITE; + // sk.interestOps(ops); + // } + // } + // } + // + // + // private void enableWriteFlag() { + // if (this.socketChannel != null) { + // SelectionKey sk = this.socketChannel.keyFor(this.selector); + // if (sk != null) { + // int ops = sk.interestOps(); + // ops |= SelectionKey.OP_WRITE; + // sk.interestOps(ops); + // } + // } + // } + + @Override + public String getServiceName() { + return HAClient.class.getSimpleName(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java new file mode 100644 index 0000000..f540cdb --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store.ha; + +import java.util.HashMap; + + +/** + * @author shijia.wxr + */ +public class WaitNotifyObject { + + protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable = + new HashMap<Long, Boolean>(16); + + protected volatile boolean hasNotified = false; + + + public void wakeup() { + synchronized (this) { + if (!this.hasNotified) { + this.hasNotified = true; + this.notify(); + } + } + } + + + protected void waitForRunning(long interval) { + synchronized (this) { + if (this.hasNotified) { + this.hasNotified = false; + this.onWaitEnd(); + return; + } + + try { + this.wait(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + this.hasNotified = false; + this.onWaitEnd(); + } + } + } + + + protected void onWaitEnd() { + } + + public void wakeupAll() { + synchronized (this) { + boolean needNotify = false; + + for (Boolean value : this.waitingThreadTable.values()) { + needNotify = needNotify || !value; + value = true; + } + + if (needNotify) { + this.notifyAll(); + } + } + } + + public void allWaitForRunning(long interval) { + long currentThreadId = Thread.currentThread().getId(); + synchronized (this) { + Boolean notified = this.waitingThreadTable.get(currentThreadId); + if (notified != null && notified) { + this.waitingThreadTable.put(currentThreadId, false); + this.onWaitEnd(); + return; + } + + try { + this.wait(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + this.waitingThreadTable.put(currentThreadId, false); + this.onWaitEnd(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java 
new file mode 100644 index 0000000..f353320 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.index; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.store.MappedFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class IndexFile { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static int hashSlotSize = 4; + private static int indexSize = 20; + private static int invalidIndex = 0; + private final int hashSlotNum; + private final int indexNum; + private final MappedFile mappedFile; + private final FileChannel fileChannel; + private final MappedByteBuffer mappedByteBuffer; + private final IndexHeader indexHeader; + + + public IndexFile(final String fileName, final int hashSlotNum, final int indexNum, + final long endPhyOffset, final 
long endTimestamp) throws IOException { + int fileTotalSize = + IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize); + this.mappedFile = new MappedFile(fileName, fileTotalSize); + this.fileChannel = this.mappedFile.getFileChannel(); + this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer(); + this.hashSlotNum = hashSlotNum; + this.indexNum = indexNum; + + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); + this.indexHeader = new IndexHeader(byteBuffer); + + if (endPhyOffset > 0) { + this.indexHeader.setBeginPhyOffset(endPhyOffset); + this.indexHeader.setEndPhyOffset(endPhyOffset); + } + + if (endTimestamp > 0) { + this.indexHeader.setBeginTimestamp(endTimestamp); + this.indexHeader.setEndTimestamp(endTimestamp); + } + } + + + public String getFileName() { + return this.mappedFile.getFileName(); + } + + + public void load() { + this.indexHeader.load(); + } + + + public void flush() { + long beginTime = System.currentTimeMillis(); + if (this.mappedFile.hold()) { + this.indexHeader.updateByteBuffer(); + this.mappedByteBuffer.force(); + this.mappedFile.release(); + log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime)); + } + } + + public boolean isWriteFull() { + return this.indexHeader.getIndexCount() >= this.indexNum; + } + + + public boolean destroy(final long intervalForcibly) { + return this.mappedFile.destroy(intervalForcibly); + } + + public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { + if (this.indexHeader.getIndexCount() < this.indexNum) { + int keyHash = indexKeyHashMethod(key); + int slotPos = keyHash % this.hashSlotNum; + int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; + + FileLock fileLock = null; + + try { + + // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, + // false); + int slotValue = this.mappedByteBuffer.getInt(absSlotPos); + if (slotValue <= invalidIndex || slotValue > 
this.indexHeader.getIndexCount()) { + slotValue = invalidIndex; + } + + long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); + + + timeDiff = timeDiff / 1000; + + + if (this.indexHeader.getBeginTimestamp() <= 0) { + timeDiff = 0; + } else if (timeDiff > Integer.MAX_VALUE) { + timeDiff = Integer.MAX_VALUE; + } else if (timeDiff < 0) { + timeDiff = 0; + } + + int absIndexPos = + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + this.indexHeader.getIndexCount() * indexSize; + + + this.mappedByteBuffer.putInt(absIndexPos, keyHash); + this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); + this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); + this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); + + + this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); + + + if (this.indexHeader.getIndexCount() <= 1) { + this.indexHeader.setBeginPhyOffset(phyOffset); + this.indexHeader.setBeginTimestamp(storeTimestamp); + } + + this.indexHeader.incHashSlotCount(); + this.indexHeader.incIndexCount(); + this.indexHeader.setEndPhyOffset(phyOffset); + this.indexHeader.setEndTimestamp(storeTimestamp); + + return true; + } catch (Exception e) { + log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); + } finally { + if (fileLock != null) { + try { + fileLock.release(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } else { + log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num " + + this.indexNum); + } + + return false; + } + + public int indexKeyHashMethod(final String key) { + int keyHash = key.hashCode(); + int keyHashPositive = Math.abs(keyHash); + if (keyHashPositive < 0) + keyHashPositive = 0; + return keyHashPositive; + } + + public long getBeginTimestamp() { + return this.indexHeader.getBeginTimestamp(); + } + + public long getEndTimestamp() { + return this.indexHeader.getEndTimestamp(); + } + + public long 
getEndPhyOffset() { + return this.indexHeader.getEndPhyOffset(); + } + + public boolean isTimeMatched(final long begin, final long end) { + boolean result = + begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp(); + + result = + result + || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader + .getEndTimestamp()); + + result = + result + || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader + .getEndTimestamp()); + return result; + } + + public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, + final long begin, final long end, boolean lock) { + if (this.mappedFile.hold()) { + int keyHash = indexKeyHashMethod(key); + int slotPos = keyHash % this.hashSlotNum; + int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; + + FileLock fileLock = null; + try { + if (lock) { + // fileLock = this.fileChannel.lock(absSlotPos, + // hashSlotSize, true); + } + + int slotValue = this.mappedByteBuffer.getInt(absSlotPos); + // if (fileLock != null) { + // fileLock.release(); + // fileLock = null; + // } + + if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() + || this.indexHeader.getIndexCount() <= 1) { + // TODO NOTFOUND + } else { + for (int nextIndexToRead = slotValue;;) { + if (phyOffsets.size() >= maxNum) { + break; + } + + int absIndexPos = + IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + + nextIndexToRead * indexSize; + + int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); + long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); + + long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); + int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); + + + if (timeDiff < 0) { + break; + } + + timeDiff *= 1000L; + + long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; + boolean timeMatched = (timeRead >= begin) && (timeRead <= end); + 
+ if (keyHash == keyHashRead && timeMatched) { + phyOffsets.add(phyOffsetRead); + } + + if (prevIndexRead <= invalidIndex + || prevIndexRead > this.indexHeader.getIndexCount() + || prevIndexRead == nextIndexToRead || timeRead < begin) { + break; + } + + nextIndexToRead = prevIndexRead; + } + } + } catch (Exception e) { + log.error("selectPhyOffset exception ", e); + } finally { + if (fileLock != null) { + try { + fileLock.release(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + this.mappedFile.release(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java new file mode 100644 index 0000000..d6015e3 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * In-memory mirror of the fixed-size header kept in the first
 * {@link #INDEX_HEADER_SIZE} bytes of the backing buffer.
 *
 * <p>Layout, as absolute byte offsets into the buffer:
 * <pre>
 *  0  begin timestamp        (long)
 *  8  end timestamp          (long)
 * 16  begin physical offset  (long)
 * 24  end physical offset    (long)
 * 32  hash slot count        (int)
 * 36  index count            (int)
 * </pre>
 *
 * <p>All buffer accesses use absolute positions, so the buffer's own
 * position/limit are never moved by this class. Every setter and increment
 * writes the new value through to the buffer immediately.
 *
 * @author shijia.wxr
 */
public class IndexHeader {
    public static final int INDEX_HEADER_SIZE = 40;

    // Field offsets inside the header; these are constants and must never change at runtime.
    private static final int BEGIN_TIMESTAMP_INDEX = 0;
    private static final int END_TIMESTAMP_INDEX = 8;
    private static final int BEGIN_PHYOFFSET_INDEX = 16;
    private static final int END_PHYOFFSET_INDEX = 24;
    private static final int HASH_SLOT_COUNT_INDEX = 32;
    private static final int INDEX_COUNT_INDEX = 36;

    private final ByteBuffer byteBuffer;
    private final AtomicLong beginTimestamp = new AtomicLong(0);
    private final AtomicLong endTimestamp = new AtomicLong(0);
    private final AtomicLong beginPhyOffset = new AtomicLong(0);
    private final AtomicLong endPhyOffset = new AtomicLong(0);
    private final AtomicInteger hashSlotCount = new AtomicInteger(0);

    // Starts at 1 (not 0); load() also resets any non-positive stored value back to 1.
    private final AtomicInteger indexCount = new AtomicInteger(1);


    /**
     * @param byteBuffer buffer whose first {@link #INDEX_HEADER_SIZE} bytes hold the header
     */
    public IndexHeader(final ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
    }


    /**
     * Reads all six header fields from the buffer into memory. A stored index
     * count that is zero or negative is replaced by 1 in memory (the buffer
     * itself is not rewritten here).
     */
    public void load() {
        this.beginTimestamp.set(this.byteBuffer.getLong(BEGIN_TIMESTAMP_INDEX));
        this.endTimestamp.set(this.byteBuffer.getLong(END_TIMESTAMP_INDEX));
        this.beginPhyOffset.set(this.byteBuffer.getLong(BEGIN_PHYOFFSET_INDEX));
        this.endPhyOffset.set(this.byteBuffer.getLong(END_PHYOFFSET_INDEX));

        this.hashSlotCount.set(this.byteBuffer.getInt(HASH_SLOT_COUNT_INDEX));
        this.indexCount.set(this.byteBuffer.getInt(INDEX_COUNT_INDEX));

        if (this.indexCount.get() <= 0) {
            this.indexCount.set(1);
        }
    }


    /**
     * Writes the current in-memory value of all six header fields into the buffer.
     */
    public void updateByteBuffer() {
        this.byteBuffer.putLong(BEGIN_TIMESTAMP_INDEX, this.beginTimestamp.get());
        this.byteBuffer.putLong(END_TIMESTAMP_INDEX, this.endTimestamp.get());
        this.byteBuffer.putLong(BEGIN_PHYOFFSET_INDEX, this.beginPhyOffset.get());
        this.byteBuffer.putLong(END_PHYOFFSET_INDEX, this.endPhyOffset.get());
        this.byteBuffer.putInt(HASH_SLOT_COUNT_INDEX, this.hashSlotCount.get());
        this.byteBuffer.putInt(INDEX_COUNT_INDEX, this.indexCount.get());
    }


    public long getBeginTimestamp() {
        return this.beginTimestamp.get();
    }


    /** Updates the begin timestamp in memory and in the buffer. */
    public void setBeginTimestamp(long beginTimestamp) {
        this.beginTimestamp.set(beginTimestamp);
        this.byteBuffer.putLong(BEGIN_TIMESTAMP_INDEX, beginTimestamp);
    }


    public long getEndTimestamp() {
        return this.endTimestamp.get();
    }


    /** Updates the end timestamp in memory and in the buffer. */
    public void setEndTimestamp(long endTimestamp) {
        this.endTimestamp.set(endTimestamp);
        this.byteBuffer.putLong(END_TIMESTAMP_INDEX, endTimestamp);
    }


    public long getBeginPhyOffset() {
        return this.beginPhyOffset.get();
    }


    /** Updates the begin physical offset in memory and in the buffer. */
    public void setBeginPhyOffset(long beginPhyOffset) {
        this.beginPhyOffset.set(beginPhyOffset);
        this.byteBuffer.putLong(BEGIN_PHYOFFSET_INDEX, beginPhyOffset);
    }


    public long getEndPhyOffset() {
        return this.endPhyOffset.get();
    }


    /** Updates the end physical offset in memory and in the buffer. */
    public void setEndPhyOffset(long endPhyOffset) {
        this.endPhyOffset.set(endPhyOffset);
        this.byteBuffer.putLong(END_PHYOFFSET_INDEX, endPhyOffset);
    }


    /**
     * @return the live counter object (not a snapshot); changes made through it
     *         are NOT written to the buffer, use {@link #incHashSlotCount()} for that
     */
    public AtomicInteger getHashSlotCount() {
        return this.hashSlotCount;
    }


    /** Increments the hash slot count and writes the new value to the buffer. */
    public void incHashSlotCount() {
        int value = this.hashSlotCount.incrementAndGet();
        this.byteBuffer.putInt(HASH_SLOT_COUNT_INDEX, value);
    }


    public int getIndexCount() {
        return this.indexCount.get();
    }


    /** Increments the index count and writes the new value to the buffer. */
    public void incIndexCount() {
        int value = this.indexCount.incrementAndGet();
        this.byteBuffer.putInt(INDEX_COUNT_INDEX, value);
    }
}
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.index; + +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import com.alibaba.rocketmq.store.DefaultMessageStore; +import com.alibaba.rocketmq.store.DispatchRequest; +import com.alibaba.rocketmq.store.config.StorePathConfigHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * @author shijia.wxr + */ +public class IndexService { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private final DefaultMessageStore defaultMessageStore; + + private final int hashSlotNum; + private final int indexNum; + private final String storePath; + + private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>(); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + + 
public IndexService(final DefaultMessageStore store) { + this.defaultMessageStore = store; + this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum(); + this.indexNum = store.getMessageStoreConfig().getMaxIndexNum(); + this.storePath = + StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir()); + } + + + public boolean load(final boolean lastExitOK) { + File dir = new File(this.storePath); + File[] files = dir.listFiles(); + if (files != null) { + // ascending order + Arrays.sort(files); + for (File file : files) { + try { + IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); + f.load(); + + if (!lastExitOK) { + if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint() + .getIndexMsgTimestamp()) { + f.destroy(0); + continue; + } + } + + log.info("load index file OK, " + f.getFileName()); + this.indexFileList.add(f); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; + } catch (NumberFormatException e) { + continue; + } + } + } + + return true; + } + + public void deleteExpiredFile(long offset) { + Object[] files = null; + try { + this.readWriteLock.readLock().lock(); + if (this.indexFileList.isEmpty()) { + return; + } + + long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset(); + if (endPhyOffset < offset) { + files = this.indexFileList.toArray(); + } + } catch (Exception e) { + log.error("destroy exception", e); + } finally { + this.readWriteLock.readLock().unlock(); + } + + if (files != null) { + List<IndexFile> fileList = new ArrayList<IndexFile>(); + for (int i = 0; i < (files.length - 1); i++) { + IndexFile f = (IndexFile) files[i]; + if (f.getEndPhyOffset() < offset) { + fileList.add(f); + } else { + break; + } + } + + this.deleteExpiredFile(fileList); + } + } + + private void deleteExpiredFile(List<IndexFile> files) { + if (!files.isEmpty()) { + try { + this.readWriteLock.writeLock().lock(); + for (IndexFile file : 
files) { + boolean destroyed = file.destroy(3000); + destroyed = destroyed && this.indexFileList.remove(file); + if (!destroyed) { + log.error("deleteExpiredFile remove failed."); + break; + } + } + } catch (Exception e) { + log.error("deleteExpiredFile has exception.", e); + } finally { + this.readWriteLock.writeLock().unlock(); + } + } + } + + + public void destroy() { + try { + this.readWriteLock.readLock().lock(); + for (IndexFile f : this.indexFileList) { + f.destroy(1000 * 3); + } + this.indexFileList.clear(); + } catch (Exception e) { + log.error("destroy exception", e); + } finally { + this.readWriteLock.readLock().unlock(); + } + } + + + public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { + List<Long> phyOffsets = new ArrayList<Long>(maxNum); + + long indexLastUpdateTimestamp = 0; + long indexLastUpdatePhyoffset = 0; + maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); + try { + this.readWriteLock.readLock().lock(); + if (!this.indexFileList.isEmpty()) { + for (int i = this.indexFileList.size(); i > 0; i--) { + IndexFile f = this.indexFileList.get(i - 1); + boolean lastFile = i == this.indexFileList.size(); + if (lastFile) { + indexLastUpdateTimestamp = f.getEndTimestamp(); + indexLastUpdatePhyoffset = f.getEndPhyOffset(); + } + + if (f.isTimeMatched(begin, end)) { + + f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); + } + + + if (f.getBeginTimestamp() < begin) { + break; + } + + if (phyOffsets.size() >= maxNum) { + break; + } + } + } + } catch (Exception e) { + log.error("queryMsg exception", e); + } finally { + this.readWriteLock.readLock().unlock(); + } + + return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); + } + + + private String buildKey(final String topic, final String key) { + return topic + "#" + key; + } + + + public void buildIndex(DispatchRequest req) { + IndexFile indexFile = 
retryGetAndCreateIndexFile(); + if (indexFile != null) { + long endPhyOffset = indexFile.getEndPhyOffset(); + DispatchRequest msg = req; + String topic = msg.getTopic(); + String keys = msg.getKeys(); + if (msg.getCommitLogOffset() < endPhyOffset) { + return; + } + + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + switch (tranType) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + break; + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + return; + } + + if (req.getUniqKey() != null) { + indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); + if (indexFile == null) { + log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); + return; + } + } + + if (keys != null && keys.length() > 0) { + String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); + for (int i = 0; i < keyset.length; i++) { + String key = keyset[i]; + if (key.length() > 0) { + indexFile = putKey(indexFile, msg, buildKey(topic, key)); + if (indexFile == null) { + log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); + return; + } + } + } + } + } else { + log.error("build index error, stop building index"); + } + } + + + private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) { + for (boolean ok = + indexFile.putKey(idxKey, msg.getCommitLogOffset(), + msg.getStoreTimestamp()); !ok; ) { + log.warn("index file full, so create another one, " + indexFile.getFileName()); + indexFile = retryGetAndCreateIndexFile(); + if (null == indexFile) { + return null; + } + + ok = + indexFile.putKey(idxKey, msg.getCommitLogOffset(), + msg.getStoreTimestamp()); + } + return indexFile; + } + + + public IndexFile retryGetAndCreateIndexFile() { + IndexFile indexFile = null; + + + for (int times = 0; null == indexFile && times < 3; times++) { + indexFile = 
this.getAndCreateLastIndexFile(); + if (null != indexFile) + break; + + try { + log.error("try to create index file, " + times + " times"); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + if (null == indexFile) { + this.defaultMessageStore.getAccessRights().makeIndexFileError(); + log.error("mark index file can not build flag"); + } + + return indexFile; + } + + + public IndexFile getAndCreateLastIndexFile() { + IndexFile indexFile = null; + IndexFile prevIndexFile = null; + long lastUpdateEndPhyOffset = 0; + long lastUpdateIndexTimestamp = 0; + + { + this.readWriteLock.readLock().lock(); + if (!this.indexFileList.isEmpty()) { + IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1); + if (!tmp.isWriteFull()) { + indexFile = tmp; + } else { + lastUpdateEndPhyOffset = tmp.getEndPhyOffset(); + lastUpdateIndexTimestamp = tmp.getEndTimestamp(); + prevIndexFile = tmp; + } + } + + this.readWriteLock.readLock().unlock(); + } + + + if (indexFile == null) { + try { + String fileName = + this.storePath + File.separator + + UtilAll.timeMillisToHumanString(System.currentTimeMillis()); + indexFile = + new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, + lastUpdateIndexTimestamp); + this.readWriteLock.writeLock().lock(); + this.indexFileList.add(indexFile); + } catch (Exception e) { + log.error("getLastIndexFile exception ", e); + } finally { + this.readWriteLock.writeLock().unlock(); + } + + + if (indexFile != null) { + final IndexFile flushThisFile = prevIndexFile; + Thread flushThread = new Thread(new Runnable() { + @Override + public void run() { + IndexService.this.flush(flushThisFile); + } + }, "FlushIndexFileThread"); + + flushThread.setDaemon(true); + flushThread.start(); + } + } + + return indexFile; + } + + + public void flush(final IndexFile f) { + if (null == f) + return; + + long indexMsgTimestamp = 0; + + if (f.isWriteFull()) { + indexMsgTimestamp = f.getEndTimestamp(); + } 
+ + f.flush(); + + if (indexMsgTimestamp > 0) { + this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp); + this.defaultMessageStore.getStoreCheckpoint().flush(); + } + } + + + public void start() { + + } + + + public void shutdown() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java new file mode 100644 index 0000000..89d0755 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Immutable result of an index lookup: the physical offsets that matched,
 * together with the timestamp and physical offset the index was last updated to.
 * The offset list is stored and returned by reference (no copy is made).
 *
 * @author shijia.wxr
 */
public class QueryOffsetResult {
    private final List<Long> phyOffsets;
    private final long indexLastUpdateTimestamp;
    private final long indexLastUpdatePhyoffset;


    /**
     * @param phyOffsets               matched physical offsets
     * @param indexLastUpdateTimestamp timestamp the index was last updated to
     * @param indexLastUpdatePhyoffset physical offset the index was last updated to
     */
    public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp,
                             long indexLastUpdatePhyoffset) {
        this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
        this.phyOffsets = phyOffsets;
    }


    /** @return the same list instance that was passed to the constructor */
    public List<Long> getPhyOffsets() {
        return this.phyOffsets;
    }


    /** @return timestamp the index was last updated to */
    public long getIndexLastUpdateTimestamp() {
        return this.indexLastUpdateTimestamp;
    }


    /** @return physical offset the index was last updated to */
    public long getIndexLastUpdatePhyoffset() {
        return this.indexLastUpdatePhyoffset;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.schedule; + +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + * + */ +public class DelayOffsetSerializeWrapper extends RemotingSerializable { + private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = + new ConcurrentHashMap<Integer, Long>(32); + + + public ConcurrentHashMap<Integer, Long> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java new file mode 100644 index 0000000..e243a7e --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.schedule; + +import com.alibaba.rocketmq.common.ConfigManager; +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.running.RunningStats; +import com.alibaba.rocketmq.store.*; +import com.alibaba.rocketmq.store.config.StorePathConfigHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + * + */ +public class ScheduleMessageService extends ConfigManager { + public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static final long FIRST_DELAY_TIME = 1000L; + private static final long DELAY_FOR_A_WHILE = 100L; + private static final long DELAY_FOR_A_PERIOD = 10000L; + + private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = + new ConcurrentHashMap<Integer, Long>(32); + + private final ConcurrentHashMap<Integer /* level */, Long/* 
offset */> offsetTable = + new ConcurrentHashMap<Integer, Long>(32); + + private final Timer timer = new Timer("ScheduleMessageTimerThread", true); + + private final DefaultMessageStore defaultMessageStore; + + private int maxDelayLevel; + + + public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { + this.defaultMessageStore = defaultMessageStore; + } + + public static int queueId2DelayLevel(final int queueId) { + return queueId + 1; + } + + public void buildRunningStats(HashMap<String, String> stats) { + Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Integer, Long> next = it.next(); + int queueId = delayLevel2QueueId(next.getKey()); + long delayOffset = next.getValue(); + long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId); + String value = String.format("%d,%d", delayOffset, maxOffset); + String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey()); + stats.put(key, value); + } + } + + public static int delayLevel2QueueId(final int delayLevel) { + return delayLevel - 1; + } + + + private void updateOffset(int delayLevel, long offset) { + this.offsetTable.put(delayLevel, offset); + } + + + public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { + Long time = this.delayLevelTable.get(delayLevel); + if (time != null) { + return time + storeTimestamp; + } + + return storeTimestamp + 1000; + } + + + public void start() { + + for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { + Integer level = entry.getKey(); + Long timeDelay = entry.getValue(); + Long offset = this.offsetTable.get(level); + if (null == offset) { + offset = 0L; + } + + if (timeDelay != null) { + this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); + } + } + + this.timer.scheduleAtFixedRate(new TimerTask() { + + @Override + public void run() { + try { + 
ScheduleMessageService.this.persist(); + } catch (Exception e) { + log.error("scheduleAtFixedRate flush exception", e); + } + } + }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); + } + + + public void shutdown() { + this.timer.cancel(); + } + + + public int getMaxDelayLevel() { + return maxDelayLevel; + } + + + public String encode() { + return this.encode(false); + } + + public boolean load() { + boolean result = super.load(); + result = result && this.parseDelayLevel(); + return result; + } + + @Override + public String configFilePath() { + return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig() + .getStorePathRootDir()); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = + DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class); + if (delayOffsetSerializeWrapper != null) { + this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable()); + } + } + } + + public String encode(final boolean prettyFormat) { + DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper(); + delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable); + return delayOffsetSerializeWrapper.toJson(prettyFormat); + } + + public boolean parseDelayLevel() { + HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); + timeUnitTable.put("s", 1000L); + timeUnitTable.put("m", 1000L * 60); + timeUnitTable.put("h", 1000L * 60 * 60); + timeUnitTable.put("d", 1000L * 60 * 60 * 24); + + String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); + try { + String[] levelArray = levelString.split(" "); + for (int i = 0; i < levelArray.length; i++) { + String value = levelArray[i]; + String ch = value.substring(value.length() - 1); + Long tu = timeUnitTable.get(ch); + + int level = i + 1; + if (level > 
this.maxDelayLevel) { + this.maxDelayLevel = level; + } + long num = Long.parseLong(value.substring(0, value.length() - 1)); + long delayTimeMillis = tu * num; + this.delayLevelTable.put(level, delayTimeMillis); + } + } catch (Exception e) { + log.error("parseDelayLevel exception", e); + log.info("levelString String = {}", levelString); + return false; + } + + return true; + } + + class DeliverDelayedMessageTimerTask extends TimerTask { + private final int delayLevel; + private final long offset; + + + public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { + this.delayLevel = delayLevel; + this.offset = offset; + } + + + @Override + public void run() { + try { + this.executeOnTimeup(); + } catch (Exception e) { + // XXX: warn and notify me + log.error("ScheduleMessageService, executeOnTimeup exception", e); + ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); + } + } + + + /** + + * + * @return + */ + private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { + + long result = deliverTimestamp; + + long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); + if (deliverTimestamp > maxTimestamp) { + result = now; + } + + return result; + } + + + public void executeOnTimeup() { + ConsumeQueue cq = + ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, + delayLevel2QueueId(delayLevel)); + + long failScheduleOffset = offset; + + if (cq != null) { + SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); + if (bufferCQ != null) { + try { + long nextOffset = offset; + int i = 0; + for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long offsetPy = bufferCQ.getByteBuffer().getLong(); + int sizePy = bufferCQ.getByteBuffer().getInt(); + long tagsCode = bufferCQ.getByteBuffer().getLong(); + + + long now = System.currentTimeMillis(); + long deliverTimestamp = 
this.correctDeliverTimestamp(now, tagsCode); + + nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); + + long countdown = deliverTimestamp - now; + + if (countdown <= 0) { + MessageExt msgExt = + ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( + offsetPy, sizePy); + + if (msgExt != null) { + try { + MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); + PutMessageResult putMessageResult = + ScheduleMessageService.this.defaultMessageStore + .putMessage(msgInner); + + if (putMessageResult != null + && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + continue; + } + + else { + // XXX: warn and notify me + log.error( + "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", + msgExt.getTopic(), msgExt.getMsgId()); + ScheduleMessageService.this.timer.schedule( + new DeliverDelayedMessageTimerTask(this.delayLevel, + nextOffset), DELAY_FOR_A_PERIOD); + ScheduleMessageService.this.updateOffset(this.delayLevel, + nextOffset); + return; + } + } catch (Exception e) { + /* + * XXX: warn and notify me + + + + */ + log.error( + "ScheduleMessageService, messageTimeup execute error, drop it. 
msgExt=" + + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + + offsetPy + ",sizePy=" + sizePy, e); + } + } + } + + else { + ScheduleMessageService.this.timer.schedule( + new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), + countdown); + ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); + return; + } + } // end of for + + nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); + ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); + ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); + return; + } finally { + + bufferCQ.release(); + } + } // end of if (bufferCQ != null) + else { + /* + + + */ + long cqMinOffset = cq.getMinOffsetInQuque(); + if (offset < cqMinOffset) { + failScheduleOffset = cqMinOffset; + log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + + cqMinOffset + ", queueId=" + cq.getQueueId()); + } + } + } // end of if (cq != null) + + ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, + failScheduleOffset), DELAY_FOR_A_WHILE); + } + + + private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { + MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); + msgInner.setBody(msgExt.getBody()); + msgInner.setFlag(msgExt.getFlag()); + MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + + TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); + long tagsCodeValue = + MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); + msgInner.setTagsCode(tagsCodeValue); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); + + msgInner.setSysFlag(msgExt.getSysFlag()); + msgInner.setBornTimestamp(msgExt.getBornTimestamp()); + msgInner.setBornHost(msgExt.getBornHost()); + msgInner.setStoreHost(msgExt.getStoreHost()); + 
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); + + msgInner.setWaitStoreMsgOK(false); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + + msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); + + String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); + int queueId = Integer.parseInt(queueIdStr); + msgInner.setQueueId(queueId); + + return msgInner; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java new file mode 100644 index 0000000..539d4be --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store.stats; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.store.DefaultMessageStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @author shijia.wxr + */ +public class BrokerStats { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final DefaultMessageStore defaultMessageStore; + + private volatile long msgPutTotalYesterdayMorning; + + private volatile long msgPutTotalTodayMorning; + + private volatile long msgGetTotalYesterdayMorning; + + private volatile long msgGetTotalTodayMorning; + + + public BrokerStats(DefaultMessageStore defaultMessageStore) { + this.defaultMessageStore = defaultMessageStore; + } + + + /** + + */ + public void record() { + this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning; + this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning; + + this.msgPutTotalTodayMorning = + this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); + this.msgGetTotalTodayMorning = + this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); + + log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning); + log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning); + } + + + public long getMsgPutTotalYesterdayMorning() { + return msgPutTotalYesterdayMorning; + } + + + public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) { + this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning; + } + + + public long getMsgPutTotalTodayMorning() { + return msgPutTotalTodayMorning; + } + + + public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) { + this.msgPutTotalTodayMorning = msgPutTotalTodayMorning; + } + + + public long getMsgGetTotalYesterdayMorning() { + return msgGetTotalYesterdayMorning; + } + + + public void 
setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) { + this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning; + } + + + public long getMsgGetTotalTodayMorning() { + return msgGetTotalTodayMorning; + } + + + public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) { + this.msgGetTotalTodayMorning = msgGetTotalTodayMorning; + } + + + public long getMsgPutTotalTodayNow() { + return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); + } + + + public long getMsgGetTotalTodayNow() { + return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); + } +}
