http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-store/pom.xml b/rocketmq-store/pom.xml new file mode 100644 index 0000000..440e521 --- /dev/null +++ b/rocketmq-store/pom.xml @@ -0,0 +1,46 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-store</artifactId> + <name>rocketmq-store ${project.version}</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-common</artifactId> + </dependency> + <dependency> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + </dependency> + </dependencies> +</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java new file mode 100644 index 0000000..40eee7a --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.store.config.BrokerRole; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + + +/** + * Create MappedFile in advance + * + * @author shijia.wxr + */ +public class AllocateMappedFileService extends ServiceThread { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private static int waitTimeOut = 1000 * 5; + private ConcurrentHashMap<String, AllocateRequest> requestTable = + new ConcurrentHashMap<String, AllocateRequest>(); + private PriorityBlockingQueue<AllocateRequest> requestQueue = + new PriorityBlockingQueue<AllocateRequest>(); + private volatile boolean hasException = false; + private DefaultMessageStore messageStore; + + + public AllocateMappedFileService(DefaultMessageStore messageStore) { + this.messageStore = messageStore; + } + + + public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { + int canSubmitRequests = 2; + if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() + && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool + canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size(); + } + } + + AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize); + boolean nextPutOK = 
this.requestTable.putIfAbsent(nextFilePath, nextReq) == null; + + if (nextPutOK) { + if (canSubmitRequests <= 0) { + log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); + this.requestTable.remove(nextFilePath); + return null; + } + boolean offerOK = this.requestQueue.offer(nextReq); + if (!offerOK) { + log.warn("never expected here, add a request to preallocate queue failed"); + } + canSubmitRequests--; + } + + AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize); + boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null; + if (nextNextPutOK) { + if (canSubmitRequests <= 0) { + log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); + this.requestTable.remove(nextNextFilePath); + } else { + boolean offerOK = this.requestQueue.offer(nextNextReq); + if (!offerOK) { + log.warn("never expected here, add a request to preallocate queue failed"); + } + } + } + + if (hasException) { + log.warn(this.getServiceName() + " service has exception. so return null"); + return null; + } + + AllocateRequest result = this.requestTable.get(nextFilePath); + try { + if (result != null) { + boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS); + if (!waitOK) { + log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize()); + return null; + } else { + this.requestTable.remove(nextFilePath); + return result.getMappedFile(); + } + } else { + log.error("find preallocate mmap failed, this never happen"); + } + } catch (InterruptedException e) { + log.warn(this.getServiceName() + " service has exception. 
", e); + } + + return null; + } + + + @Override + public String getServiceName() { + return AllocateMappedFileService.class.getSimpleName(); + } + + + public void shutdown() { + this.stopped = true; + this.thread.interrupt(); + + try { + this.thread.join(this.getJointime()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + for (AllocateRequest req : this.requestTable.values()) { + if (req.mappedFile != null) { + log.info("delete pre allocated maped file, {}", req.mappedFile.getFileName()); + req.mappedFile.destroy(1000); + } + } + } + + + public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped() && this.mmapOperation()) { + + } + log.info(this.getServiceName() + " service end"); + } + + + /** + * Only interrupted by the external thread, will return false + */ + private boolean mmapOperation() { + boolean isSuccess = false; + AllocateRequest req = null; + try { + req = this.requestQueue.take(); + AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); + if (null == expectedRequest) { + log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " + + req.getFileSize()); + return true; + } + if (expectedRequest != req) { + log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " " + + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest); + return true; + } + + if (req.getMappedFile() == null) { + long beginTime = System.currentTimeMillis(); + + MappedFile mappedFile; + if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + try { + mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); + mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); + } catch (RuntimeException e) { + log.warn("Use default implementation."); + mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); + } + } else { + 
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize()); + } + + long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime); + if (eclipseTime > 10) { + int queueSize = this.requestQueue.size(); + log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize + + " " + req.getFilePath() + " " + req.getFileSize()); + } + + // pre write mappedFile + if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig() + .getMapedFileSizeCommitLog() + && + this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(), + this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); + } + + req.setMappedFile(mappedFile); + this.hasException = false; + isSuccess = true; + } + } catch (InterruptedException e) { + log.warn(this.getServiceName() + " service has exception, maybe by shutdown"); + this.hasException = true; + return false; + } catch (IOException e) { + log.warn(this.getServiceName() + " service has exception. 
", e); + this.hasException = true; + if (null != req) { + requestQueue.offer(req); + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + } + } + } finally { + if (req != null && isSuccess) + req.getCountDownLatch().countDown(); + } + return true; + } + + static class AllocateRequest implements Comparable<AllocateRequest> { + // Full file path + private String filePath; + private int fileSize; + private CountDownLatch countDownLatch = new CountDownLatch(1); + private volatile MappedFile mappedFile = null; + + + public AllocateRequest(String filePath, int fileSize) { + this.filePath = filePath; + this.fileSize = fileSize; + } + + + public String getFilePath() { + return filePath; + } + + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + + public int getFileSize() { + return fileSize; + } + + + public void setFileSize(int fileSize) { + this.fileSize = fileSize; + } + + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + + public MappedFile getMappedFile() { + return mappedFile; + } + + + public void setMappedFile(MappedFile mappedFile) { + this.mappedFile = mappedFile; + } + + + public int compareTo(AllocateRequest other) { + if (this.fileSize < other.fileSize) + return 1; + else if (this.fileSize > other.fileSize) { + return -1; + } else { + int mIndex = this.filePath.lastIndexOf(File.separator); + long mName = Long.parseLong(this.filePath.substring(mIndex + 1)); + int oIndex = other.filePath.lastIndexOf(File.separator); + long oName = Long.parseLong(other.filePath.substring(oIndex + 1)); + if (mName < oName) { + return -1; + } else if (mName > oName) { + return 1; + } else { + return 0; + } + } + // return this.fileSize < other.fileSize ? 1 : this.fileSize > + // other.fileSize ? 
-1 : 0; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((filePath == null) ? 0 : filePath.hashCode()); + result = prime * result + fileSize; + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + AllocateRequest other = (AllocateRequest) obj; + if (filePath == null) { + if (other.filePath != null) + return false; + } else if (!filePath.equals(other.filePath)) + return false; + if (fileSize != other.fileSize) + return false; + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java new file mode 100644 index 0000000..778bd88 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store; + +import java.nio.ByteBuffer; + + +/** + * Write messages callback interface + * + * @author shijia.wxr + * + */ +public interface AppendMessageCallback { + + /** + * After message serialization, write MapedByteBuffer + * + * @param byteBuffer + * @param maxBlank + * @param msg + * + * @return How many bytes to write + */ + AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, + final int maxBlank, final MessageExtBrokerInner msg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java new file mode 100644 index 0000000..a87a917 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store; + +/** + * When write a message to the commit log, returns results + * + * @author shijia.wxr + */ +public class AppendMessageResult { + // Return code + private AppendMessageStatus status; + // Where to start writing + private long wroteOffset; + // Write Bytes + private int wroteBytes; + // Message ID + private String msgId; + // Message storage timestamp + private long storeTimestamp; + // Consume queue's offset(step by one) + private long logicsOffset; + private long pagecacheRT = 0; + + public AppendMessageResult(AppendMessageStatus status) { + this(status, 0, 0, "", 0, 0, 0); + } + + public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId, + long storeTimestamp, long logicsOffset, long pagecacheRT) { + this.status = status; + this.wroteOffset = wroteOffset; + this.wroteBytes = wroteBytes; + this.msgId = msgId; + this.storeTimestamp = storeTimestamp; + this.logicsOffset = logicsOffset; + this.pagecacheRT = pagecacheRT; + } + + public long getPagecacheRT() { + return pagecacheRT; + } + + public void setPagecacheRT(final long pagecacheRT) { + this.pagecacheRT = pagecacheRT; + } + + public boolean isOk() { + return this.status == AppendMessageStatus.PUT_OK; + } + + + public AppendMessageStatus getStatus() { + return status; + } + + + public void setStatus(AppendMessageStatus status) { + this.status = status; + } + + + public long getWroteOffset() { + return wroteOffset; + } + + + public void setWroteOffset(long wroteOffset) { + this.wroteOffset = wroteOffset; + } + + + public int getWroteBytes() { + return wroteBytes; + } + + + public void setWroteBytes(int wroteBytes) { + this.wroteBytes = wroteBytes; + } + + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public long getStoreTimestamp() { + return storeTimestamp; + } + + + public void setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = 
storeTimestamp; + } + + + public long getLogicsOffset() { + return logicsOffset; + } + + + public void setLogicsOffset(long logicsOffset) { + this.logicsOffset = logicsOffset; + } + + @Override + public String toString() { + return "AppendMessageResult{" + + "status=" + status + + ", wroteOffset=" + wroteOffset + + ", wroteBytes=" + wroteBytes + + ", msgId='" + msgId + '\'' + + ", storeTimestamp=" + storeTimestamp + + ", logicsOffset=" + logicsOffset + + ", pagecacheRT=" + pagecacheRT + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java new file mode 100644 index 0000000..97234c0 --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Status codes returned when a message is written to the commit log.
 *
 * @author shijia.wxr
 */
public enum AppendMessageStatus {
    /** The append succeeded. */
    PUT_OK,

    /** Presumably: the current file has no room left for the message (TODO confirm against callers). */
    END_OF_FILE,

    /** Presumably: the message is larger than the permitted size (TODO confirm against callers). */
    MESSAGE_SIZE_EXCEEDED,

    /** Presumably: the message properties are larger than the permitted size (TODO confirm against callers). */
    PROPERTIES_SIZE_EXCEEDED,

    /** Any other failure. */
    UNKNOWN_ERROR;
}
+ */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import com.alibaba.rocketmq.store.config.BrokerRole; +import com.alibaba.rocketmq.store.config.FlushDiskType; +import com.alibaba.rocketmq.store.ha.HAService; +import com.alibaba.rocketmq.store.schedule.ScheduleMessageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * Store all metadata downtime for recovery, data protection reliability + * + * @author shijia.wxr + */ +public class CommitLog { + // Message's MAGIC CODE daa320a7 + public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8; + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + // End of file empty MAGIC CODE cbd43194 + private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8; + private final MappedFileQueue mappedFileQueue; + private final DefaultMessageStore defaultMessageStore; + private final FlushCommitLogService flushCommitLogService; + + //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods + private final FlushCommitLogService commitLogService; + + private final AppendMessageCallback appendMessageCallback; + private HashMap<String/* topic-queueid */, Long/* offset */> 
topicQueueTable = new HashMap<String, Long>(1024); + private volatile long confirmOffset = -1L; + + private volatile long beginTimeInLock = 0; + + //true: Can lock, false : in lock. + private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); + + private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync + + public CommitLog(final DefaultMessageStore defaultMessageStore) { + this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), + defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + this.defaultMessageStore = defaultMessageStore; + + if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { + this.flushCommitLogService = new GroupCommitService(); + } else { + this.flushCommitLogService = new FlushRealTimeService(); + } + + this.commitLogService = new CommitRealTimeService(); + + this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + } + + public boolean load() { + boolean result = this.mappedFileQueue.load(); + log.info("load commit log " + (result ? 
"OK" : "Failed")); + return result; + } + + public void start() { + this.flushCommitLogService.start(); + + if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + this.commitLogService.start(); + } + } + + public void shutdown() { + if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { + this.commitLogService.shutdown(); + } + + this.flushCommitLogService.shutdown(); + } + + public long flush() { + this.mappedFileQueue.commit(0); + this.mappedFileQueue.flush(0); + return this.mappedFileQueue.getFlushedWhere(); + } + + public long getMaxOffset() { + return this.mappedFileQueue.getMaxOffset(); + } + + public long remainHowManyDataToCommit() { + return this.mappedFileQueue.remainHowManyDataToCommit(); + } + + public long remainHowManyDataToFlush() { + return this.mappedFileQueue.remainHowManyDataToFlush(); + } + + + public int deleteExpiredFile(// + final long expiredTime, // + final int deleteFilesInterval, // + final long intervalForcibly, // + final boolean cleanImmediately// + ) { + return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); + } + + + /** + * Read CommitLog data, use data replication + */ + public SelectMappedBufferResult getData(final long offset) { + return this.getData(offset, offset == 0); + } + + + public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); + MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); + if (mappedFile != null) { + int pos = (int) (offset % mappedFileSize); + SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); + return result; + } + + return null; + } + + + /** + * When the normal exit, data recovery, all memory data have been flush + */ + public void recoverNormally() { + boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); + final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); + if (!mappedFiles.isEmpty()) { + // Began to recover from the last third file + int index = mappedFiles.size() - 3; + if (index < 0) + index = 0; + + MappedFile mappedFile = mappedFiles.get(index); + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + long processOffset = mappedFile.getFileFromOffset(); + long mappedFileOffset = 0; + while (true) { + DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); + int size = dispatchRequest.getMsgSize(); + // Normal data + if (dispatchRequest.isSuccess() && size > 0) { + mappedFileOffset += size; + } + // Come the end of the file, switch to the next file Since the + // return 0 representatives met last hole, + // this can not be included in truncate offset + else if (dispatchRequest.isSuccess() && size == 0) { + index++; + if (index >= mappedFiles.size()) { + // Current branch can not happen + log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName()); + break; + } else { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mappedFileOffset = 0; + log.info("recover next physics file, " + mappedFile.getFileName()); + } + } + // Intermediate file read error + else if (!dispatchRequest.isSuccess()) { + log.info("recover physics file end, " + mappedFile.getFileName()); + break; + } + } + + processOffset += mappedFileOffset; + this.mappedFileQueue.setFlushedWhere(processOffset); + this.mappedFileQueue.setCommittedWhere(processOffset); + this.mappedFileQueue.truncateDirtyFiles(processOffset); + } + } + + public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) { + return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true); + } + + private void doNothingForDeadCode(final 
Object obj) { + if (obj != null) { + if (log.isDebugEnabled()) { + log.debug(String.valueOf(obj.hashCode())); + } + } + } + + /** + * check the message and returns the message size + * + * @return 0 Come the end of the file // >0 Normal messages // -1 Message + * checksum failure + */ + public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { + try { + // 1 TOTAL SIZE + int totalSize = byteBuffer.getInt(); + + // 2 MAGIC CODE + int magicCode = byteBuffer.getInt(); + switch (magicCode) { + case MESSAGE_MAGIC_CODE: + break; + case BLANK_MAGIC_CODE: + return new DispatchRequest(0, true /* success */); + default: + log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode)); + return new DispatchRequest(-1, false /* success */); + } + + byte[] bytesContent = new byte[totalSize]; + + // 3 BODYCRC + int bodyCRC = byteBuffer.getInt(); + + // 4 QUEUEID + int queueId = byteBuffer.getInt(); + + // 5 FLAG + int flag = byteBuffer.getInt(); + + // 6 QUEUEOFFSET + long queueOffset = byteBuffer.getLong(); + + // 7 PHYSICALOFFSET + long physicOffset = byteBuffer.getLong(); + + // 8 SYSFLAG + int sysFlag = byteBuffer.getInt(); + + // 9 BORNTIMESTAMP + long bornTimeStamp = byteBuffer.getLong(); + + // 10 + ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8); + + // 11 STORETIMESTAMP + long storeTimestamp = byteBuffer.getLong(); + + // 12 + ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8); + + // 13 RECONSUMETIMES + int reconsumeTimes = byteBuffer.getInt(); + + // 14 Prepared Transaction Offset + long preparedTransactionOffset = byteBuffer.getLong(); + + // 15 BODY + int bodyLen = byteBuffer.getInt(); + if (bodyLen > 0) { + if (readBody) { + byteBuffer.get(bytesContent, 0, bodyLen); + + if (checkCRC) { + int crc = UtilAll.crc32(bytesContent, 0, bodyLen); + if (crc != bodyCRC) { + log.warn("CRC check failed. 
bodyCRC={}, currentCRC={}", crc, bodyCRC); + return new DispatchRequest(-1, false/* success */); + } + } + } else { + byteBuffer.position(byteBuffer.position() + bodyLen); + } + } + + // 16 TOPIC + byte topicLen = byteBuffer.get(); + byteBuffer.get(bytesContent, 0, topicLen); + String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8); + + long tagsCode = 0; + String keys = ""; + String uniqKey = null; + + // 17 properties + short propertiesLength = byteBuffer.getShort(); + if (propertiesLength > 0) { + byteBuffer.get(bytesContent, 0, propertiesLength); + String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8); + Map<String, String> propertiesMap = MessageDecoder.string2messageProperties(properties); + + keys = propertiesMap.get(MessageConst.PROPERTY_KEYS); + + uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + + String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS); + if (tags != null && tags.length() > 0) { + tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); + } + + // Timing message processing + { + String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); + if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { + int delayLevel = Integer.parseInt(t); + + if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { + delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); + } + + if (delayLevel > 0) { + tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, + storeTimestamp); + } + } + } + } + + int readLength = calMsgLength(bodyLen, topicLen, propertiesLength); + if (totalSize != readLength) { + doNothingForDeadCode(reconsumeTimes); + doNothingForDeadCode(flag); + doNothingForDeadCode(bornTimeStamp); + doNothingForDeadCode(byteBuffer1); + doNothingForDeadCode(byteBuffer2); + 
log.error( + "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", + totalSize, readLength, bodyLen, topicLen, propertiesLength); + return new DispatchRequest(totalSize, false/* success */); + } + + return new DispatchRequest(// + topic, // 1 + queueId, // 2 + physicOffset, // 3 + totalSize, // 4 + tagsCode, // 5 + storeTimestamp, // 6 + queueOffset, // 7 + keys, // 8 + uniqKey, //9 + sysFlag, // 9 + preparedTransactionOffset// 10 + ); + } catch (Exception e) { + } + + return new DispatchRequest(-1, false /* success */); + } + + private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { + final int msgLen = 4 // 1 TOTALSIZE + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY + + 1 + topicLength // 15 TOPIC + + 2 + (propertiesLength > 0 ? 
propertiesLength : 0) // 16 + // propertiesLength + + 0; + return msgLen; + } + + public long getConfirmOffset() { + return this.confirmOffset; + } + + public void setConfirmOffset(long phyOffset) { + this.confirmOffset = phyOffset; + } + + public void recoverAbnormally() { + // recover by the minimum time stamp + boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); + final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); + if (!mappedFiles.isEmpty()) { + // Looking beginning to recover from which file + int index = mappedFiles.size() - 1; + MappedFile mappedFile = null; + for (; index >= 0; index--) { + mappedFile = mappedFiles.get(index); + if (this.isMappedFileMatchedRecover(mappedFile)) { + log.info("recover from this maped file " + mappedFile.getFileName()); + break; + } + } + + if (index < 0) { + index = 0; + mappedFile = mappedFiles.get(index); + } + + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + long processOffset = mappedFile.getFileFromOffset(); + long mappedFileOffset = 0; + while (true) { + DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); + int size = dispatchRequest.getMsgSize(); + + // Normal data + if (size > 0) { + mappedFileOffset += size; + + + if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { + this.defaultMessageStore.doDispatch(dispatchRequest); + } + } else { + this.defaultMessageStore.doDispatch(dispatchRequest); + } + } + // Intermediate file read error + else if (size == -1) { + log.info("recover physics file end, " + mappedFile.getFileName()); + break; + } + // Come the end of the file, switch to the next file + // Since the return 0 representatives met last hole, this can + // not be included in truncate offset + else if (size == 0) { + index++; + if (index >= mappedFiles.size()) { + // The 
current branch under normal circumstances should + // not happen + log.info("recover physics file over, last maped file " + mappedFile.getFileName()); + break; + } else { + mappedFile = mappedFiles.get(index); + byteBuffer = mappedFile.sliceByteBuffer(); + processOffset = mappedFile.getFileFromOffset(); + mappedFileOffset = 0; + log.info("recover next physics file, " + mappedFile.getFileName()); + } + } + } + + processOffset += mappedFileOffset; + this.mappedFileQueue.setFlushedWhere(processOffset); + this.mappedFileQueue.setCommittedWhere(processOffset); + this.mappedFileQueue.truncateDirtyFiles(processOffset); + + // Clear ConsumeQueue redundant data + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); + } + // Commitlog case files are deleted + else { + this.mappedFileQueue.setFlushedWhere(0); + this.mappedFileQueue.setCommittedWhere(0); + this.defaultMessageStore.destroyLogics(); + } + } + + private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) { + ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); + + int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION); + if (magicCode != MESSAGE_MAGIC_CODE) { + return false; + } + + long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); + if (0 == storeTimestamp) { + return false; + } + + if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()// + && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { + if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { + log.info("find check timestamp, {} {}", // + storeTimestamp, // + UtilAll.timeMillisToHumanString(storeTimestamp)); + return true; + } + } else { + if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { + log.info("find check timestamp, {} {}", // + storeTimestamp, // + UtilAll.timeMillisToHumanString(storeTimestamp)); + return true; + } + } + + return false; + } + 
/** Placeholder; intentionally does nothing. */
private void notifyMessageArriving() {

}

/**
 * Delegates to the mapped file queue to reset its offset.
 *
 * @param offset the offset to reset to
 * @return whatever {@code mappedFileQueue.resetOffset(offset)} reports
 */
public boolean resetOffset(long offset) {
    return this.mappedFileQueue.resetOffset(offset);
}

/**
 * @return the timestamp recorded when the put-message lock was last taken, or 0 when no append
 *         is in progress
 */
public long getBeginTimeInLock() {
    return beginTimeInLock;
}

/**
 * Appends a message to the commit log and then applies the configured flush and replication
 * policy.
 *
 * <p>Steps: stamp the message (store time, body CRC); for non-transactional or committed
 * messages with a delay level, redirect them to the schedule topic while backing up the real
 * topic/queue id as properties; append under the put-message lock (rolling to a new mapped file
 * on END_OF_FILE); update statistics; then wait for or trigger the disk flush and, on a
 * SYNC_MASTER broker, wait for the slave.
 *
 * @param msg the message to store; it is mutated (timestamps, CRC, possibly topic/queue id and
 *            properties)
 * @return the outcome; the status is PUT_OK unless file creation failed, the message was
 *         rejected, or a flush/replication wait failed
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body CRC (consider whether this is better done on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Result of the append
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    lockForPutMessage(); //spin...
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // The store timestamp is set again under the lock so that it is globally ordered
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        // Reset on every exit path, including exceptions, so lockTimeMills() never keeps
        // measuring against a stale start time. Done before the lock is released.
        beginTimeInLock = 0;
        releasePutMessageLock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock,
            msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (msg.isWaitStoreMsgOK()) {
            GroupCommitRequest flushRequest = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(flushRequest);
            boolean flushOK = flushRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                    + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }

    // Synchronous double write (master waits for the slave)
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (msg.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                // Always use a fresh request: GroupCommitRequest wraps a one-shot latch, so a
                // request already completed by the disk flush above would make this wait
                // return immediately with the disk-flush result.
                GroupCommitRequest transferRequest = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(transferRequest);

                service.getWaitNotifyObject().wakeupAll();

                boolean flushOK =
                    // TODO
                    transferRequest.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                        + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

    return putMessageResult;
}

/**
 * Reads the store timestamp of the message located at {@code offset}.
 *
 * @param offset physical offset of the message
 * @param size   number of bytes to select at that offset
 * @return the stored timestamp, or {@code -1} when the offset is below the minimum offset or no
 *         data could be selected
 */
public long pickupStoreTimestamp(final long offset, final int size) {
    if (offset >= this.getMinOffset()) {
        SelectMappedBufferResult result = this.getMessage(offset, size);
        if (null != result) {
            try {
                return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
            } finally {
                result.release();
            }
        }
    }

    return -1;
}

/**
 * @return the start offset of the first mapped file if it is available, the start of the
 *         following file if it is not, or {@code -1} when there is no mapped file
 */
public long getMinOffset() {
    MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
    if (mappedFile != null) {
        if (mappedFile.isAvailable()) {
            return mappedFile.getFileFromOffset();
        } else {
            return this.rollNextFile(mappedFile.getFileFromOffset());
        }
    }

    return -1;
}

/**
 * Selects {@code size} bytes starting at physical {@code offset}.
 *
 * @param offset physical offset in the commit log
 * @param size   number of bytes to select
 * @return the selected buffer, or {@code null} when no mapped file covers the offset
 */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        // Position of the offset inside its file.
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

/**
 * @param offset any physical offset
 * @return the first offset of the file following the one that contains {@code offset}
 */
public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    return offset + mappedFileSize - offset % mappedFileSize;
}

/**
 * @return the live table mapping "topic-queueId" keys to the next queue offset
 */
public HashMap<String, Long> getTopicQueueTable() {
    return topicQueueTable;
}

/**
 * @param topicQueueTable replacement for the "topic-queueId" to queue-offset table
 */
public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
    this.topicQueueTable = topicQueueTable;
}

/** Destroys the underlying mapped file queue. */
public void destroy() {
    this.mappedFileQueue.destroy();
}

/**
 * Appends raw bytes to the last mapped file while holding the put-message lock.
 *
 * @param startOffset offset passed to {@code getLastMappedFile} to obtain (or create) the file
 * @param data        bytes to append
 * @return {@code false} when no mapped file could be obtained, otherwise the result of the append
 */
public boolean appendData(long startOffset, byte[] data) {
    lockForPutMessage(); //spin...
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
        if (null == mappedFile) {
            log.error("appendData getLastMappedFile error " + startOffset);
            return false;
        }

        return mappedFile.appendMessage(data);
    } finally {
        releasePutMessageLock();
    }
}

/**
 * Delegates to the mapped file queue to retry deleting its first file.
 *
 * @param intervalForcibly passed through unchanged
 * @return whatever the mapped file queue reports
 */
public boolean retryDeleteFirstFile(final long intervalForcibly) {
    return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
}

/**
 * Removes the "topic-queueId" entry from the topic queue table.
 *
 * @param topic   topic name
 * @param queueId queue id
 */
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
    String key = topic + "-" + queueId;
    synchronized (this) {
        this.topicQueueTable.remove(key);
    }

    log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
}

/** Runs the mapped file queue's self check. */
public void checkSelf() {
    mappedFileQueue.checkSelf();
}

/** Common base of the commit/flush service threads. */
abstract class FlushCommitLogService extends ServiceThread {
    /** Maximum number of final commit/flush attempts made while shutting down. */
    protected static final int RETRY_TIMES_OVER = 10;
}

/**
 * Service thread that periodically calls {@code mappedFileQueue.commit} and wakes the flush
 * service whenever a commit reports that data was committed.
 */
class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            // Once the thorough interval has elapsed, commit regardless of the page threshold.
            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Shutdown: retry the commit a bounded number of times until it reports true.
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}

/**
 * Service thread that periodically flushes the mapped file queue and records the resulting
 * store timestamp in the store checkpoint.
 */
class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Once the thorough interval has elapsed, flush regardless of the page threshold
            // and print progress on every tenth such occasion.
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                // Timed mode sleeps the full interval; otherwise the wait can be cut short by wakeup().
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown: retry the flush a bounded number of times until it reports true.
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    /** Currently a no-op; the progress log statement is disabled. */
    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}

/**
 * A one-shot request that lets a caller block until the log has been flushed (or transferred)
 * up to {@link #getNextOffset()}.
 */
public static class GroupCommitRequest {
    private final long nextOffset;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    /**
     * @param nextOffset the offset that must be reached for this request to succeed
     */
    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    /**
     * Completes the request and releases the waiting thread.
     *
     * @param flushOK outcome to report to the waiter
     */
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    /**
     * Blocks until the request is completed or the timeout elapses.
     *
     * @param timeout maximum wait in milliseconds
     * @return the outcome passed to {@link #wakeupCustomer(boolean)}; {@code false} on timeout
     *         or when the waiting thread is interrupted (the interrupt flag is restored)
     */
    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitRequest interrupted while waiting for flush, ", e);
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

/**
 * GroupCommit Service: collects {@link GroupCommitRequest}s from producers in a write list,
 * swaps it with the read list after each wait, and flushes until every request in the read
 * list is satisfied.
 */
class GroupCommitService extends FlushCommitLogService {
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    /**
     * Queues a request and notifies the service thread.
     *
     * @param request the request to queue
     */
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            this.requestsWrite.add(request);
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }
    }

    /**
     * Exchanges the write and read lists. Takes the same monitor as {@link #putRequest} so a
     * producer can never add to a list while it is being handed over to the service thread.
     */
    private void swapRequests() {
        synchronized (this) {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }
    }

    /** Flushes on behalf of every request in the read list, then clears the list. */
    private void doCommit() {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // The message may continue in the next file, so flush at most twice
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }

                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Messages that do not wait for the store result queue no request but still
            // wake this service, so flush here as well
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // On normal shutdown, wait briefly for late requests to arrive, then flush them
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    protected void onWaitEnd() {
        this.swapRequests();
    }

    @Override
    public String getServiceName() {
        return GroupCommitService.class.getSimpleName();
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}

/**
 * Serializes a message into the commit log record layout and writes it into a mapped file's
 * buffer.
 */
class DefaultAppendMessageCallback implements AppendMessageCallback {
    // Minimum fixed length of the blank record at the end of a file (TOTALSIZE + MAGICCODE)
    private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
    private final ByteBuffer msgIdMemory;
    // Store the message content
    private final ByteBuffer msgStoreItemMemory;
    // The maximum length of the message
    private final int maxMessageSize;
    // Build Message Key
    private final StringBuilder keyBuilder = new StringBuilder();

    private final ByteBuffer hostHolder = ByteBuffer.allocate(8);

    /**
     * @param size maximum message size; the staging buffer is allocated with room for it plus
     *             the end-of-file blank record
     */
    DefaultAppendMessageCallback(final int size) {
        this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
        this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
        this.maxMessageSize = size;
    }

    public ByteBuffer getMsgStoreItemMemory() {
        return msgStoreItemMemory;
    }

    /**
     * Serializes {@code msgInner} and writes it to {@code byteBuffer}.
     *
     * @param fileFromOffset start offset of the target file within the commit log
     * @param byteBuffer     target buffer, positioned at the write position
     * @param maxBlank       bytes still free in the target file
     * @param msgInner       message to write
     * @return PROPERTIES_SIZE_EXCEEDED when the encoded properties do not fit a {@code short}
     *         length prefix, MESSAGE_SIZE_EXCEEDED when the record exceeds the maximum message
     *         size, END_OF_FILE (after writing a blank record of {@code maxBlank} bytes) when
     *         the file cannot hold the record plus the minimum blank, otherwise PUT_OK
     */
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

        // PHY OFFSET
        long wroteOffset = fileFromOffset + byteBuffer.position();

        this.resetByteBuffer(hostHolder, 8);
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

        // Record ConsumeQueue information
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback messages are not consumed and do not enter the
            // consume queue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        /**
         * Serialize message
         */
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

        // Keep the length as an int until it has been range-checked; narrowing to short first
        // would make the check below unreachable and silently truncate the stored length.
        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }

        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData == null ? 0 : topicData.length;

        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }

        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            //

            // The length written here is deliberately maxBlank, filling the rest of the file
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // Initialization of storage space
        this.resetByteBuffer(msgStoreItemMemory, msgLen);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        this.msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        this.msgStoreItemMemory.putLong(queueOffset);
        // 7 PHYSICALOFFSET
        this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
        // 8 SYSFLAG
        this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
        // 11 STORETIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
        //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
        // 13 RECONSUMETIMES
        this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0) {
            this.msgStoreItemMemory.put(msgInner.getBody());
        }
        // 16 TOPIC
        this.msgStoreItemMemory.put((byte) topicLength);
        this.msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES (safe to narrow: checked against Short.MAX_VALUE above)
        this.msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0) {
            this.msgStoreItemMemory.put(propertiesData);
        }

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // Advance the queue offset for the next message of this topic/queue
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }

    /**
     * Prepares a buffer for writing from position 0 with the given limit.
     *
     * @param byteBuffer buffer to reset
     * @param limit      new limit
     */
    private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
        byteBuffer.flip();
        byteBuffer.limit(limit);
    }
}

/**
 * @return how long, in milliseconds, the current append has been holding the put-message lock;
 *         0 when no append is in progress or the difference would be negative
 */
public long lockTimeMills() {
    long diff = 0;
    long begin = this.beginTimeInLock;
    if (begin > 0) {
        diff = this.defaultMessageStore.now() - begin;
    }

    if (diff < 0) {
        diff = 0;
    }

    return diff;
}

/**
 * Acquires the put-message lock: the reentrant lock when so configured, otherwise spins on the
 * atomic flag until it flips from {@code true} (free) to {@code false} (held).
 */
private void lockForPutMessage() {
    if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
        putMessageNormalLock.lock();
    } else {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        } while (!flag);
    }
}

/**
 * Releases the lock taken by {@link #lockForPutMessage()}, using the same configuration switch
 * to pick the lock flavour.
 */
private void releasePutMessageLock() {
    if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
        putMessageNormalLock.unlock();
    } else {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}
}
