http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/pom.xml b/rocketmq-store/pom.xml
new file mode 100644
index 0000000..440e521
--- /dev/null
+++ b/rocketmq-store/pom.xml
@@ -0,0 +1,46 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain producerGroup copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-store</artifactId>
+    <name>rocketmq-store ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
new file mode 100644
index 0000000..40eee7a
--- /dev/null
+++ 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Create MappedFile in advance
+ *
+ * @author shijia.wxr
+ */
+public class AllocateMappedFileService extends ServiceThread {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static int waitTimeOut = 1000 * 5;
+    private ConcurrentHashMap<String, AllocateRequest> requestTable =
+            new ConcurrentHashMap<String, AllocateRequest>();
+    private PriorityBlockingQueue<AllocateRequest> requestQueue =
+            new PriorityBlockingQueue<AllocateRequest>();
+    private volatile boolean hasException = false;
+    private DefaultMessageStore messageStore;
+
+
+    public AllocateMappedFileService(DefaultMessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+
+    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, 
String nextNextFilePath, int fileSize) {
+        int canSubmitRequests = 2;
+        if 
(this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+            if 
(this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
+                    && BrokerRole.SLAVE != 
this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is 
slave, don't fast fail even no buffer in pool
+                canSubmitRequests = 
this.messageStore.getTransientStorePool().remainBufferNumbs() - 
this.requestQueue.size();
+            }
+        }
+
+        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
+        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, 
nextReq) == null;
+
+        if (nextPutOK) {
+            if (canSubmitRequests <= 0) {
+                log.warn("[NOTIFYME]TransientStorePool is not enough, so 
create mapped file error, " +
+                        "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), 
this.messageStore.getTransientStorePool().remainBufferNumbs());
+                this.requestTable.remove(nextFilePath);
+                return null;
+            }
+            boolean offerOK = this.requestQueue.offer(nextReq);
+            if (!offerOK) {
+                log.warn("never expected here, add a request to preallocate 
queue failed");
+            }
+            canSubmitRequests--;
+        }
+
+        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, 
fileSize);
+        boolean nextNextPutOK = 
this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
+        if (nextNextPutOK) {
+            if (canSubmitRequests <= 0) {
+                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip 
preallocate mapped file, " +
+                        "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), 
this.messageStore.getTransientStorePool().remainBufferNumbs());
+                this.requestTable.remove(nextNextFilePath);
+            } else {
+                boolean offerOK = this.requestQueue.offer(nextNextReq);
+                if (!offerOK) {
+                    log.warn("never expected here, add a request to 
preallocate queue failed");
+                }
+            }
+        }
+
+        if (hasException) {
+            log.warn(this.getServiceName() + " service has exception. so 
return null");
+            return null;
+        }
+
+        AllocateRequest result = this.requestTable.get(nextFilePath);
+        try {
+            if (result != null) {
+                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, 
TimeUnit.MILLISECONDS);
+                if (!waitOK) {
+                    log.warn("create mmap timeout " + result.getFilePath() + " 
" + result.getFileSize());
+                    return null;
+                } else {
+                    this.requestTable.remove(nextFilePath);
+                    return result.getMappedFile();
+                }
+            } else {
+                log.error("find preallocate mmap failed, this never happen");
+            }
+        } catch (InterruptedException e) {
+            log.warn(this.getServiceName() + " service has exception. ", e);
+        }
+
+        return null;
+    }
+
+
+    @Override
+    public String getServiceName() {
+        return AllocateMappedFileService.class.getSimpleName();
+    }
+
+
+    public void shutdown() {
+        this.stopped = true;
+        this.thread.interrupt();
+
+        try {
+            this.thread.join(this.getJointime());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        for (AllocateRequest req : this.requestTable.values()) {
+            if (req.mappedFile != null) {
+                log.info("delete pre allocated maped file, {}", 
req.mappedFile.getFileName());
+                req.mappedFile.destroy(1000);
+            }
+        }
+    }
+
+
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped() && this.mmapOperation()) {
+
+        }
+        log.info(this.getServiceName() + " service end");
+    }
+
+
+    /**
+     * Only interrupted by the external thread, will return false
+     */
+    private boolean mmapOperation() {
+        boolean isSuccess = false;
+        AllocateRequest req = null;
+        try {
+            req = this.requestQueue.take();
+            AllocateRequest expectedRequest = 
this.requestTable.get(req.getFilePath());
+            if (null == expectedRequest) {
+                log.warn("this mmap request expired, maybe cause timeout " + 
req.getFilePath() + " "
+                        + req.getFileSize());
+                return true;
+            }
+            if (expectedRequest != req) {
+                log.warn("never expected here,  maybe cause timeout " + 
req.getFilePath() + " "
+                        + req.getFileSize() + ", req:" + req + ", 
expectedRequest:" + expectedRequest);
+                return true;
+            }
+
+            if (req.getMappedFile() == null) {
+                long beginTime = System.currentTimeMillis();
+
+                MappedFile mappedFile;
+                if 
(messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+                    try {
+                        mappedFile = 
ServiceLoader.load(MappedFile.class).iterator().next();
+                        mappedFile.init(req.getFilePath(), req.getFileSize(), 
messageStore.getTransientStorePool());
+                    } catch (RuntimeException e) {
+                        log.warn("Use default implementation.");
+                        mappedFile = new MappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getTransientStorePool());
+                    }
+                } else {
+                    mappedFile = new MappedFile(req.getFilePath(), 
req.getFileSize());
+                }
+
+                long eclipseTime = 
UtilAll.computeEclipseTimeMilliseconds(beginTime);
+                if (eclipseTime > 10) {
+                    int queueSize = this.requestQueue.size();
+                    log.warn("create mappedFile spent time(ms) " + eclipseTime 
+ " queue size " + queueSize
+                            + " " + req.getFilePath() + " " + 
req.getFileSize());
+                }
+
+                // pre write mappedFile
+                if (mappedFile.getFileSize() >= 
this.messageStore.getMessageStoreConfig()
+                        .getMapedFileSizeCommitLog()
+                        &&
+                        
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+                    
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
+                            
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
+                }
+
+                req.setMappedFile(mappedFile);
+                this.hasException = false;
+                isSuccess = true;
+            }
+        } catch (InterruptedException e) {
+            log.warn(this.getServiceName() + " service has exception, maybe by 
shutdown");
+            this.hasException = true;
+            return false;
+        } catch (IOException e) {
+            log.warn(this.getServiceName() + " service has exception. ", e);
+            this.hasException = true;
+            if (null != req) {
+                requestQueue.offer(req);
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e1) {
+                }
+            }
+        } finally {
+            if (req != null && isSuccess)
+                req.getCountDownLatch().countDown();
+        }
+        return true;
+    }
+
+    static class AllocateRequest implements Comparable<AllocateRequest> {
+        // Full file path
+        private String filePath;
+        private int fileSize;
+        private CountDownLatch countDownLatch = new CountDownLatch(1);
+        private volatile MappedFile mappedFile = null;
+
+
+        public AllocateRequest(String filePath, int fileSize) {
+            this.filePath = filePath;
+            this.fileSize = fileSize;
+        }
+
+
+        public String getFilePath() {
+            return filePath;
+        }
+
+
+        public void setFilePath(String filePath) {
+            this.filePath = filePath;
+        }
+
+
+        public int getFileSize() {
+            return fileSize;
+        }
+
+
+        public void setFileSize(int fileSize) {
+            this.fileSize = fileSize;
+        }
+
+
+        public CountDownLatch getCountDownLatch() {
+            return countDownLatch;
+        }
+
+
+        public void setCountDownLatch(CountDownLatch countDownLatch) {
+            this.countDownLatch = countDownLatch;
+        }
+
+
+        public MappedFile getMappedFile() {
+            return mappedFile;
+        }
+
+
+        public void setMappedFile(MappedFile mappedFile) {
+            this.mappedFile = mappedFile;
+        }
+
+
+        public int compareTo(AllocateRequest other) {
+            if (this.fileSize < other.fileSize)
+                return 1;
+            else if (this.fileSize > other.fileSize) {
+                return -1;
+            } else {
+                int mIndex = this.filePath.lastIndexOf(File.separator);
+                long mName = Long.parseLong(this.filePath.substring(mIndex + 
1));
+                int oIndex = other.filePath.lastIndexOf(File.separator);
+                long oName = Long.parseLong(other.filePath.substring(oIndex + 
1));
+                if (mName < oName) {
+                    return -1;
+                } else if (mName > oName) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+            // return this.fileSize < other.fileSize ? 1 : this.fileSize >
+            // other.fileSize ? -1 : 0;
+        }
+
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((filePath == null) ? 0 : 
filePath.hashCode());
+            result = prime * result + fileSize;
+            return result;
+        }
+
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            AllocateRequest other = (AllocateRequest) obj;
+            if (filePath == null) {
+                if (other.filePath != null)
+                    return false;
+            } else if (!filePath.equals(other.filePath))
+                return false;
+            if (fileSize != other.fileSize)
+                return false;
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
new file mode 100644
index 0000000..778bd88
--- /dev/null
+++ 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageCallback.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Write messages callback interface
+ *
+ * @author shijia.wxr
+ *
+ */
+public interface AppendMessageCallback {
+
+    /**
+     * After message serialization, write MapedByteBuffer
+     *
+     * @param byteBuffer
+     * @param maxBlank
+     * @param msg
+     *
+     * @return How many bytes to write
+     */
+    AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer 
byteBuffer,
+                                 final int maxBlank, final 
MessageExtBrokerInner msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
new file mode 100644
index 0000000..a87a917
--- /dev/null
+++ 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageResult.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * When write a message to the commit log, returns results
+ *
+ * @author shijia.wxr
+ */
+public class AppendMessageResult {
+    // Return code
+    private AppendMessageStatus status;
+    // Where to start writing
+    private long wroteOffset;
+    // Write Bytes
+    private int wroteBytes;
+    // Message ID
+    private String msgId;
+    // Message storage timestamp
+    private long storeTimestamp;
+    // Consume queue's offset(step by one)
+    private long logicsOffset;
+    private long pagecacheRT = 0;
+
+    public AppendMessageResult(AppendMessageStatus status) {
+        this(status, 0, 0, "", 0, 0, 0);
+    }
+
+    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, 
int wroteBytes, String msgId,
+                               long storeTimestamp, long logicsOffset, long 
pagecacheRT) {
+        this.status = status;
+        this.wroteOffset = wroteOffset;
+        this.wroteBytes = wroteBytes;
+        this.msgId = msgId;
+        this.storeTimestamp = storeTimestamp;
+        this.logicsOffset = logicsOffset;
+        this.pagecacheRT = pagecacheRT;
+    }
+
+    public long getPagecacheRT() {
+        return pagecacheRT;
+    }
+
+    public void setPagecacheRT(final long pagecacheRT) {
+        this.pagecacheRT = pagecacheRT;
+    }
+
+    public boolean isOk() {
+        return this.status == AppendMessageStatus.PUT_OK;
+    }
+
+
+    public AppendMessageStatus getStatus() {
+        return status;
+    }
+
+
+    public void setStatus(AppendMessageStatus status) {
+        this.status = status;
+    }
+
+
+    public long getWroteOffset() {
+        return wroteOffset;
+    }
+
+
+    public void setWroteOffset(long wroteOffset) {
+        this.wroteOffset = wroteOffset;
+    }
+
+
+    public int getWroteBytes() {
+        return wroteBytes;
+    }
+
+
+    public void setWroteBytes(int wroteBytes) {
+        this.wroteBytes = wroteBytes;
+    }
+
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+
+
+    public long getLogicsOffset() {
+        return logicsOffset;
+    }
+
+
+    public void setLogicsOffset(long logicsOffset) {
+        this.logicsOffset = logicsOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "AppendMessageResult{" +
+                "status=" + status +
+                ", wroteOffset=" + wroteOffset +
+                ", wroteBytes=" + wroteBytes +
+                ", msgId='" + msgId + '\'' +
+                ", storeTimestamp=" + storeTimestamp +
+                ", logicsOffset=" + logicsOffset +
+                ", pagecacheRT=" + pagecacheRT +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
new file mode 100644
index 0000000..97234c0
--- /dev/null
+++ 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AppendMessageStatus.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * When write a message to the commit log, returns code
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum AppendMessageStatus {
+    PUT_OK,
+    END_OF_FILE,
+    MESSAGE_SIZE_EXCEEDED,
+    PROPERTIES_SIZE_EXCEEDED,
+    UNKNOWN_ERROR,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java 
b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
new file mode 100644
index 0000000..23f305d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/CommitLog.java
@@ -0,0 +1,1296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.ha.HAService;
+import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Store all metadata downtime for recovery, data protection reliability
+ *
+ * @author shijia.wxr
+ */
+public class CommitLog {
+    // Message's MAGIC CODE daa320a7
+    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    // End of file empty MAGIC CODE cbd43194
+    private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
+    private final MappedFileQueue mappedFileQueue;
+    private final DefaultMessageStore defaultMessageStore;
+    private final FlushCommitLogService flushCommitLogService;
+
+    //If TransientStorePool enabled, we must flush message to FileChannel at 
fixed periods
+    private final FlushCommitLogService commitLogService;
+
+    private final AppendMessageCallback appendMessageCallback;
+    private HashMap<String/* topic-queueid */, Long/* offset */> 
topicQueueTable = new HashMap<String, Long>(1024);
+    private volatile long confirmOffset = -1L;
+
+    private volatile long beginTimeInLock = 0;
+
+    //true: Can lock, false : in lock.
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // 
NonfairSync
+
+    public CommitLog(final DefaultMessageStore defaultMessageStore) {
+        this.mappedFileQueue = new 
MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
+                
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), 
defaultMessageStore.getAllocateMappedFileService());
+        this.defaultMessageStore = defaultMessageStore;
+
+        if (FlushDiskType.SYNC_FLUSH == 
defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+            this.flushCommitLogService = new GroupCommitService();
+        } else {
+            this.flushCommitLogService = new FlushRealTimeService();
+        }
+
+        this.commitLogService = new CommitRealTimeService();
+
+        this.appendMessageCallback = new 
DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+    }
+
+    public boolean load() {
+        boolean result = this.mappedFileQueue.load();
+        log.info("load commit log " + (result ? "OK" : "Failed"));
+        return result;
+    }
+
+    public void start() {
+        this.flushCommitLogService.start();
+
+        if 
(defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+            this.commitLogService.start();
+        }
+    }
+
+    public void shutdown() {
+        if 
(defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+            this.commitLogService.shutdown();
+        }
+
+        this.flushCommitLogService.shutdown();
+    }
+
+    public long flush() {
+        this.mappedFileQueue.commit(0);
+        this.mappedFileQueue.flush(0);
+        return this.mappedFileQueue.getFlushedWhere();
+    }
+
+    public long getMaxOffset() {
+        return this.mappedFileQueue.getMaxOffset();
+    }
+
+    public long remainHowManyDataToCommit() {
+        return this.mappedFileQueue.remainHowManyDataToCommit();
+    }
+
+    public long remainHowManyDataToFlush() {
+        return this.mappedFileQueue.remainHowManyDataToFlush();
+    }
+
+
+    public int deleteExpiredFile(//
+                                 final long expiredTime, //
+                                 final int deleteFilesInterval, //
+                                 final long intervalForcibly, //
+                                 final boolean cleanImmediately//
+    ) {
+        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, 
deleteFilesInterval, intervalForcibly, cleanImmediately);
+    }
+
+
+    /**
+     * Read CommitLog data, use data replication
+     */
+    public SelectMappedBufferResult getData(final long offset) {
+        return this.getData(offset, offset == 0);
+    }
+
+
+    public SelectMappedBufferResult getData(final long offset, final boolean 
returnFirstOnNotFound) {
+        int mappedFileSize = 
this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        MappedFile mappedFile = 
this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
+        if (mappedFile != null) {
+            int pos = (int) (offset % mappedFileSize);
+            SelectMappedBufferResult result = 
mappedFile.selectMappedBuffer(pos);
+            return result;
+        }
+
+        return null;
+    }
+
+
+    /**
+     * When the normal exit, data recovery, all memory data have been flush
+     */
+    public void recoverNormally() {
+        boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        final List<MappedFile> mappedFiles = 
this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+            // Began to recover from the last third file
+            int index = mappedFiles.size() - 3;
+            if (index < 0)
+                index = 0;
+
+            MappedFile mappedFile = mappedFiles.get(index);
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            long processOffset = mappedFile.getFileFromOffset();
+            long mappedFileOffset = 0;
+            while (true) {
+                DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+                int size = dispatchRequest.getMsgSize();
+                // Normal data
+                if (dispatchRequest.isSuccess() && size > 0) {
+                    mappedFileOffset += size;
+                }
+                // Come the end of the file, switch to the next file Since the
+                // return 0 representatives met last hole,
+                // this can not be included in truncate offset
+                else if (dispatchRequest.isSuccess() && size == 0) {
+                    index++;
+                    if (index >= mappedFiles.size()) {
+                        // Current branch can not happen
+                        log.info("recover last 3 physics file over, last maped 
file " + mappedFile.getFileName());
+                        break;
+                    } else {
+                        mappedFile = mappedFiles.get(index);
+                        byteBuffer = mappedFile.sliceByteBuffer();
+                        processOffset = mappedFile.getFileFromOffset();
+                        mappedFileOffset = 0;
+                        log.info("recover next physics file, " + 
mappedFile.getFileName());
+                    }
+                }
+                // Intermediate file read error
+                else if (!dispatchRequest.isSuccess()) {
+                    log.info("recover physics file end, " + 
mappedFile.getFileName());
+                    break;
+                }
+            }
+
+            processOffset += mappedFileOffset;
+            this.mappedFileQueue.setFlushedWhere(processOffset);
+            this.mappedFileQueue.setCommittedWhere(processOffset);
+            this.mappedFileQueue.truncateDirtyFiles(processOffset);
+        }
+    }
+
+    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer 
byteBuffer, final boolean checkCRC) {
+        return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
+    }
+
+    private void doNothingForDeadCode(final Object obj) {
+        if (obj != null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.valueOf(obj.hashCode()));
+            }
+        }
+    }
+
+    /**
+     * check the message and returns the message size
+     *
+     * @return 0 Come the end of the file // >0 Normal messages // -1 Message
+     * checksum failure
+     */
+    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer 
byteBuffer, final boolean checkCRC, final boolean readBody) {
+        try {
+            // 1 TOTAL SIZE
+            int totalSize = byteBuffer.getInt();
+
+            // 2 MAGIC CODE
+            int magicCode = byteBuffer.getInt();
+            switch (magicCode) {
+                case MESSAGE_MAGIC_CODE:
+                    break;
+                case BLANK_MAGIC_CODE:
+                    return new DispatchRequest(0, true /* success */);
+                default:
+                    log.warn("found a illegal magic code 0x" + 
Integer.toHexString(magicCode));
+                    return new DispatchRequest(-1, false /* success */);
+            }
+
+            byte[] bytesContent = new byte[totalSize];
+
+            // 3 BODYCRC
+            int bodyCRC = byteBuffer.getInt();
+
+            // 4 QUEUEID
+            int queueId = byteBuffer.getInt();
+
+            // 5 FLAG
+            int flag = byteBuffer.getInt();
+
+            // 6 QUEUEOFFSET
+            long queueOffset = byteBuffer.getLong();
+
+            // 7 PHYSICALOFFSET
+            long physicOffset = byteBuffer.getLong();
+
+            // 8 SYSFLAG
+            int sysFlag = byteBuffer.getInt();
+
+            // 9 BORNTIMESTAMP
+            long bornTimeStamp = byteBuffer.getLong();
+
+            // 10
+            ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);
+
+            // 11 STORETIMESTAMP
+            long storeTimestamp = byteBuffer.getLong();
+
+            // 12
+            ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);
+
+            // 13 RECONSUMETIMES
+            int reconsumeTimes = byteBuffer.getInt();
+
+            // 14 Prepared Transaction Offset
+            long preparedTransactionOffset = byteBuffer.getLong();
+
+            // 15 BODY
+            int bodyLen = byteBuffer.getInt();
+            if (bodyLen > 0) {
+                if (readBody) {
+                    byteBuffer.get(bytesContent, 0, bodyLen);
+
+                    if (checkCRC) {
+                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+                        if (crc != bodyCRC) {
+                            log.warn("CRC check failed. bodyCRC={}, 
currentCRC={}", crc, bodyCRC);
+                            return new DispatchRequest(-1, false/* success */);
+                        }
+                    }
+                } else {
+                    byteBuffer.position(byteBuffer.position() + bodyLen);
+                }
+            }
+
+            // 16 TOPIC
+            byte topicLen = byteBuffer.get();
+            byteBuffer.get(bytesContent, 0, topicLen);
+            String topic = new String(bytesContent, 0, topicLen, 
MessageDecoder.CHARSET_UTF8);
+
+            long tagsCode = 0;
+            String keys = "";
+            String uniqKey = null;
+
+            // 17 properties
+            short propertiesLength = byteBuffer.getShort();
+            if (propertiesLength > 0) {
+                byteBuffer.get(bytesContent, 0, propertiesLength);
+                String properties = new String(bytesContent, 0, 
propertiesLength, MessageDecoder.CHARSET_UTF8);
+                Map<String, String> propertiesMap = 
MessageDecoder.string2messageProperties(properties);
+
+                keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
+
+                uniqKey = 
propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+
+                String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
+                if (tags != null && tags.length() > 0) {
+                    tagsCode = 
MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag),
 tags);
+                }
+
+                // Timing message processing
+                {
+                    String t = 
propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+                    if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && 
t != null) {
+                        int delayLevel = Integer.parseInt(t);
+
+                        if (delayLevel > 
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+                            delayLevel = 
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
+                        }
+
+                        if (delayLevel > 0) {
+                            tagsCode = 
this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
+                                    storeTimestamp);
+                        }
+                    }
+                }
+            }
+
+            int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
+            if (totalSize != readLength) {
+                doNothingForDeadCode(reconsumeTimes);
+                doNothingForDeadCode(flag);
+                doNothingForDeadCode(bornTimeStamp);
+                doNothingForDeadCode(byteBuffer1);
+                doNothingForDeadCode(byteBuffer2);
+                log.error(
+                        "[BUG]read total count not equals msg total size. 
totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
+                        totalSize, readLength, bodyLen, topicLen, 
propertiesLength);
+                return new DispatchRequest(totalSize, false/* success */);
+            }
+
+            return new DispatchRequest(//
+                    topic, // 1
+                    queueId, // 2
+                    physicOffset, // 3
+                    totalSize, // 4
+                    tagsCode, // 5
+                    storeTimestamp, // 6
+                    queueOffset, // 7
+                    keys, // 8
+                    uniqKey, //9
+                    sysFlag, // 9
+                    preparedTransactionOffset// 10
+            );
+        } catch (Exception e) {
+        }
+
+        return new DispatchRequest(-1, false /* success */);
+    }
+
+    private int calMsgLength(int bodyLength, int topicLength, int 
propertiesLength) {
+        final int msgLen = 4 // 1 TOTALSIZE
+                + 4 // 2 MAGICCODE
+                + 4 // 3 BODYCRC
+                + 4 // 4 QUEUEID
+                + 4 // 5 FLAG
+                + 8 // 6 QUEUEOFFSET
+                + 8 // 7 PHYSICALOFFSET
+                + 4 // 8 SYSFLAG
+                + 8 // 9 BORNTIMESTAMP
+                + 8 // 10 BORNHOST
+                + 8 // 11 STORETIMESTAMP
+                + 8 // 12 STOREHOSTADDRESS
+                + 4 // 13 RECONSUMETIMES
+                + 8 // 14 Prepared Transaction Offset
+                + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY
+                + 1 + topicLength // 15 TOPIC
+                + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16
+                // propertiesLength
+                + 0;
+        return msgLen;
+    }
+
+    public long getConfirmOffset() {
+        return this.confirmOffset;
+    }
+
+    public void setConfirmOffset(long phyOffset) {
+        this.confirmOffset = phyOffset;
+    }
+
+    public void recoverAbnormally() {
+        // recover by the minimum time stamp
+        boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        final List<MappedFile> mappedFiles = 
this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+            // Looking beginning to recover from which file
+            int index = mappedFiles.size() - 1;
+            MappedFile mappedFile = null;
+            for (; index >= 0; index--) {
+                mappedFile = mappedFiles.get(index);
+                if (this.isMappedFileMatchedRecover(mappedFile)) {
+                    log.info("recover from this maped file " + 
mappedFile.getFileName());
+                    break;
+                }
+            }
+
+            if (index < 0) {
+                index = 0;
+                mappedFile = mappedFiles.get(index);
+            }
+
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            long processOffset = mappedFile.getFileFromOffset();
+            long mappedFileOffset = 0;
+            while (true) {
+                DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
+                int size = dispatchRequest.getMsgSize();
+
+                // Normal data
+                if (size > 0) {
+                    mappedFileOffset += size;
+
+
+                    if 
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+                        if (dispatchRequest.getCommitLogOffset() < 
this.defaultMessageStore.getConfirmOffset()) {
+                            
this.defaultMessageStore.doDispatch(dispatchRequest);
+                        }
+                    } else {
+                        this.defaultMessageStore.doDispatch(dispatchRequest);
+                    }
+                }
+                // Intermediate file read error
+                else if (size == -1) {
+                    log.info("recover physics file end, " + 
mappedFile.getFileName());
+                    break;
+                }
+                // Come the end of the file, switch to the next file
+                // Since the return 0 representatives met last hole, this can
+                // not be included in truncate offset
+                else if (size == 0) {
+                    index++;
+                    if (index >= mappedFiles.size()) {
+                        // The current branch under normal circumstances should
+                        // not happen
+                        log.info("recover physics file over, last maped file " 
+ mappedFile.getFileName());
+                        break;
+                    } else {
+                        mappedFile = mappedFiles.get(index);
+                        byteBuffer = mappedFile.sliceByteBuffer();
+                        processOffset = mappedFile.getFileFromOffset();
+                        mappedFileOffset = 0;
+                        log.info("recover next physics file, " + 
mappedFile.getFileName());
+                    }
+                }
+            }
+
+            processOffset += mappedFileOffset;
+            this.mappedFileQueue.setFlushedWhere(processOffset);
+            this.mappedFileQueue.setCommittedWhere(processOffset);
+            this.mappedFileQueue.truncateDirtyFiles(processOffset);
+
+            // Clear ConsumeQueue redundant data
+            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
+        }
+        // Commitlog case files are deleted
+        else {
+            this.mappedFileQueue.setFlushedWhere(0);
+            this.mappedFileQueue.setCommittedWhere(0);
+            this.defaultMessageStore.destroyLogics();
+        }
+    }
+
+    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
+        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+
+        int magicCode = 
byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
+        if (magicCode != MESSAGE_MAGIC_CODE) {
+            return false;
+        }
+
+        long storeTimestamp = 
byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+        if (0 == storeTimestamp) {
+            return false;
+        }
+
+        if 
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
+                && 
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
+            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+                log.info("find check timestamp, {} {}", //
+                        storeTimestamp, //
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        } else {
+            if (storeTimestamp <= 
this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+                log.info("find check timestamp, {} {}", //
+                        storeTimestamp, //
+                        UtilAll.timeMillisToHumanString(storeTimestamp));
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void notifyMessageArriving() {
+
+    }
+
+    public boolean resetOffset(long offset) {
+        return this.mappedFileQueue.resetOffset(offset);
+    }
+
+    public long getBeginTimeInLock() {
+        return beginTimeInLock;
+    }
+
+    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+        // Set the storage time
+        msg.setStoreTimestamp(System.currentTimeMillis());
+        // Set the message body BODY CRC (consider the most appropriate setting
+        // on the client)
+        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+        // Back to Results
+        AppendMessageResult result = null;
+
+        StoreStatsService storeStatsService = 
this.defaultMessageStore.getStoreStatsService();
+
+        String topic = msg.getTopic();
+        int queueId = msg.getQueueId();
+
+        final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
+        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
+                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+            // Delay Delivery
+            if (msg.getDelayTimeLevel() > 0) {
+                if (msg.getDelayTimeLevel() > 
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
+                    
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
+                }
+
+                topic = ScheduleMessageService.SCHEDULE_TOPIC;
+                queueId = 
ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+                // Backup real topic, queueId
+                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
+                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
+                
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+                msg.setTopic(topic);
+                msg.setQueueId(queueId);
+            }
+        }
+
+        long eclipseTimeInLock = 0;
+        MappedFile unlockMappedFile = null;
+        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+
+        lockForPutMessage(); //spin...
+        try {
+            long beginLockTimestamp = 
this.defaultMessageStore.getSystemClock().now();
+            this.beginTimeInLock = beginLockTimestamp;
+
+            // Here settings are stored timestamp, in order to ensure an 
orderly
+            // global
+            msg.setStoreTimestamp(beginLockTimestamp);
+
+            if (null == mappedFile || mappedFile.isFull()) {
+                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
+            }
+            if (null == mappedFile) {
+                log.error("create maped file1 error, topic: " + msg.getTopic() 
+ " clientAddr: " + msg.getBornHostString());
+                beginTimeInLock = 0;
+                return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+            }
+
+            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
+            switch (result.getStatus()) {
+                case PUT_OK:
+                    break;
+                case END_OF_FILE:
+                    unlockMappedFile = mappedFile;
+                    // Create a new file, re-write the message
+                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+                    if (null == mappedFile) {
+                        // XXX: warn and notify me
+                        log.error("create maped file2 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+                        beginTimeInLock = 0;
+                        return new 
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+                    }
+                    result = mappedFile.appendMessage(msg, 
this.appendMessageCallback);
+                    break;
+                case MESSAGE_SIZE_EXCEEDED:
+                case PROPERTIES_SIZE_EXCEEDED:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+                case UNKNOWN_ERROR:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+                default:
+                    beginTimeInLock = 0;
+                    return new 
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+            }
+
+            eclipseTimeInLock = 
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+            beginTimeInLock = 0;
+        } finally {
+            releasePutMessageLock();
+        }
+
+        if (eclipseTimeInLock > 500) {
+            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, 
result);
+        }
+
+        if (null != unlockMappedFile && 
this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
+        }
+
+
+        PutMessageResult putMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, result);
+
+        // Statistics
+        
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
+        
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
+
+        GroupCommitRequest request = null;
+
+        // Synchronization flush
+        if (FlushDiskType.SYNC_FLUSH == 
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+            final GroupCommitService service = (GroupCommitService) 
this.flushCommitLogService;
+            if (msg.isWaitStoreMsgOK()) {
+                request = new GroupCommitRequest(result.getWroteOffset() + 
result.getWroteBytes());
+                service.putRequest(request);
+                boolean flushOK = 
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                if (!flushOK) {
+                    log.error("do groupcommit, wait for flush failed, topic: " 
+ msg.getTopic() + " tags: " + msg.getTags()
+                            + " client address: " + msg.getBornHostString());
+                    
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                }
+            } else {
+                service.wakeup();
+            }
+        }
+        // Asynchronous flush
+        else {
+            if 
(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())
 {
+                flushCommitLogService.wakeup();
+            } else {
+                commitLogService.wakeup();
+            }
+        }
+
+        // Synchronous write double
+        if (BrokerRole.SYNC_MASTER == 
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
+            HAService service = this.defaultMessageStore.getHaService();
+            if (msg.isWaitStoreMsgOK()) {
+                // Determine whether to wait
+                if (service.isSlaveOK(result.getWroteOffset() + 
result.getWroteBytes())) {
+                    if (null == request) {
+                        request = new 
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+                    }
+                    service.putRequest(request);
+
+                    service.getWaitNotifyObject().wakeupAll();
+
+                    boolean flushOK =
+                            // TODO
+                            
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                    if (!flushOK) {
+                        log.error("do sync transfer other node, wait return, 
but failed, topic: " + msg.getTopic() + " tags: "
+                                + msg.getTags() + " client address: " + 
msg.getBornHostString());
+                        
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+                    }
+                }
+                // Slave problem
+                else {
+                    // Tell the producer, slave not available
+                    
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
+                }
+            }
+        }
+
+        return putMessageResult;
+    }
+
+    /**
+     * According to receive certain message or offset storage time if an error
+     * occurs, it returns -1
+     */
+    public long pickupStoreTimestamp(final long offset, final int size) {
+        if (offset >= this.getMinOffset()) {
+            SelectMappedBufferResult result = this.getMessage(offset, size);
+            if (null != result) {
+                try {
+                    return 
result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+                } finally {
+                    result.release();
+                }
+            }
+        }
+
+        return -1;
+    }
+
+    public long getMinOffset() {
+        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
+        if (mappedFile != null) {
+            if (mappedFile.isAvailable()) {
+                return mappedFile.getFileFromOffset();
+            } else {
+                return this.rollNextFile(mappedFile.getFileFromOffset());
+            }
+        }
+
+        return -1;
+    }
+
+    public SelectMappedBufferResult getMessage(final long offset, final int 
size) {
+        int mappedFileSize = 
this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        MappedFile mappedFile = 
this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+        if (mappedFile != null) {
+            int pos = (int) (offset % mappedFileSize);
+            return mappedFile.selectMappedBuffer(pos, size);
+        }
+        return null;
+    }
+
+    public long rollNextFile(final long offset) {
+        int mappedFileSize = 
this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
+        return offset + mappedFileSize - offset % mappedFileSize;
+    }
+
+    public HashMap<String, Long> getTopicQueueTable() {
+        return topicQueueTable;
+    }
+
+
+    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+        this.topicQueueTable = topicQueueTable;
+    }
+
+
+    public void destroy() {
+        this.mappedFileQueue.destroy();
+    }
+
+
+    public boolean appendData(long startOffset, byte[] data) {
+        lockForPutMessage(); //spin...
+        try {
+            MappedFile mappedFile = 
this.mappedFileQueue.getLastMappedFile(startOffset);
+            if (null == mappedFile) {
+                log.error("appendData getLastMappedFile error  " + 
startOffset);
+                return false;
+            }
+
+            return mappedFile.appendMessage(data);
+        } finally {
+            releasePutMessageLock();
+        }
+    }
+
+
+    public boolean retryDeleteFirstFile(final long intervalForcibly) {
+        return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
+    }
+
+    public void removeQueueFromTopicQueueTable(final String topic, final int 
queueId) {
+        String key = topic + "-" + queueId;
+        synchronized (this) {
+            this.topicQueueTable.remove(key);
+        }
+
+        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", 
topic, queueId);
+    }
+
+    public void checkSelf() {
+        mappedFileQueue.checkSelf();
+    }
+
+    abstract class FlushCommitLogService extends ServiceThread {
+        protected static final int RETRY_TIMES_OVER = 10;
+    }
+
+    class CommitRealTimeService extends FlushCommitLogService {
+
+        private long lastCommitTimestamp = 0;
+
+        @Override
+        public String getServiceName() {
+            return CommitRealTimeService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+            while (!this.isStopped()) {
+                int interval = 
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
+
+                int commitDataLeastPages = 
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
+
+                int commitDataThoroughInterval =
+                        
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
+
+                long begin = System.currentTimeMillis();
+                if (begin >= (this.lastCommitTimestamp + 
commitDataThoroughInterval)) {
+                    this.lastCommitTimestamp = begin;
+                    commitDataLeastPages = 0;
+                }
+
+                try {
+                    boolean result = 
CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
+                    long end = System.currentTimeMillis();
+                    if (!result) {
+                        this.lastCommitTimestamp = end; // result = false 
means some data committed.
+                        //now wake up flush thread.
+                        flushCommitLogService.wakeup();
+                    }
+
+                    if (end - begin > 500) {
+                        log.info("Commit data to file costs {} ms", end - 
begin);
+                    }
+                    this.waitForRunning(interval);
+                } catch (Throwable e) {
+                    CommitLog.log.error(this.getServiceName() + " service has 
exception. ", e);
+                }
+            }
+
+            boolean result = false;
+            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
+                result = CommitLog.this.mappedFileQueue.commit(0);
+                CommitLog.log.info(this.getServiceName() + " service shutdown, 
retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
+            }
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+    }
+
+    class FlushRealTimeService extends FlushCommitLogService {
+        private long lastFlushTimestamp = 0;
+        private long printTimes = 0;
+
+
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                boolean flushCommitLogTimed = 
CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
+
+                int interval = 
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
+                int flushPhysicQueueLeastPages = 
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
+
+                int flushPhysicQueueThoroughInterval =
+                        
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
+
+                boolean printFlushProgress = false;
+
+                // Print flush progress
+                long currentTimeMillis = System.currentTimeMillis();
+                if (currentTimeMillis >= (this.lastFlushTimestamp + 
flushPhysicQueueThoroughInterval)) {
+                    this.lastFlushTimestamp = currentTimeMillis;
+                    flushPhysicQueueLeastPages = 0;
+                    printFlushProgress = (printTimes++ % 10) == 0;
+                }
+
+                try {
+                    if (flushCommitLogTimed) {
+                        Thread.sleep(interval);
+                    } else {
+                        this.waitForRunning(interval);
+                    }
+
+                    if (printFlushProgress) {
+                        this.printFlushProgress();
+                    }
+
+                    long begin = System.currentTimeMillis();
+                    
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
+                    long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                    if (storeTimestamp > 0) {
+                        
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                    }
+                    long past = System.currentTimeMillis() - begin;
+                    if (past > 500) {
+                        log.info("Flush data to disk costs {} ms", past);
+                    }
+                } catch (Throwable e) {
+                    CommitLog.log.warn(this.getServiceName() + " service has 
exception. ", e);
+                    this.printFlushProgress();
+                }
+            }
+
+            // Normal shutdown, to ensure that all the flush before exit
+            boolean result = false;
+            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
+                result = CommitLog.this.mappedFileQueue.flush(0);
+                CommitLog.log.info(this.getServiceName() + " service shutdown, 
retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
+            }
+
+            this.printFlushProgress();
+
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+
+
+        @Override
+        public String getServiceName() {
+            return FlushRealTimeService.class.getSimpleName();
+        }
+
+
+        private void printFlushProgress() {
+            // CommitLog.log.info("how much disk fall behind memory, "
+            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
+        }
+
+
+        @Override
+        public long getJointime() {
+            return 1000 * 60 * 5;
+        }
+    }
+
+    public static class GroupCommitRequest {
+        private final long nextOffset;
+        private final CountDownLatch countDownLatch = new CountDownLatch(1);
+        private volatile boolean flushOK = false;
+
+
+        public GroupCommitRequest(long nextOffset) {
+            this.nextOffset = nextOffset;
+        }
+
+
+        public long getNextOffset() {
+            return nextOffset;
+        }
+
+
+        public void wakeupCustomer(final boolean flushOK) {
+            this.flushOK = flushOK;
+            this.countDownLatch.countDown();
+        }
+
+
+        public boolean waitForFlush(long timeout) {
+            try {
+                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+                return this.flushOK;
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                return false;
+            }
+        }
+    }
+
+    /**
+     * GroupCommit Service
+     */
+    class GroupCommitService extends FlushCommitLogService {
+        private volatile List<GroupCommitRequest> requestsWrite = new 
ArrayList<GroupCommitRequest>();
+        private volatile List<GroupCommitRequest> requestsRead = new 
ArrayList<GroupCommitRequest>();
+
+
+        public void putRequest(final GroupCommitRequest request) {
+            synchronized (this) {
+                this.requestsWrite.add(request);
+                if (hasNotified.compareAndSet(false, true)) {
+                    waitPoint.countDown(); // notify
+                }
+            }
+        }
+
+
+        private void swapRequests() {
+            List<GroupCommitRequest> tmp = this.requestsWrite;
+            this.requestsWrite = this.requestsRead;
+            this.requestsRead = tmp;
+        }
+
+
+        private void doCommit() {
+            if (!this.requestsRead.isEmpty()) {
+                for (GroupCommitRequest req : this.requestsRead) {
+                    // There may be a message in the next file, so a maximum of
+                    // two times the flush
+                    boolean flushOK = false;
+                    for (int i = 0; i < 2 && !flushOK; i++) {
+                        flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+
+                        if (!flushOK) {
+                            CommitLog.this.mappedFileQueue.flush(0);
+                        }
+                    }
+
+                    req.wakeupCustomer(flushOK);
+                }
+
+                long storeTimestamp = 
CommitLog.this.mappedFileQueue.getStoreTimestamp();
+                if (storeTimestamp > 0) {
+                    
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+                }
+
+                this.requestsRead.clear();
+            } else {
+                // Because of individual messages is set to not sync flush, it
+                // will come to this process
+                CommitLog.this.mappedFileQueue.flush(0);
+            }
+        }
+
+
+        public void run() {
+            CommitLog.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.waitForRunning(0);
+                    this.doCommit();
+                } catch (Exception e) {
+                    CommitLog.log.warn(this.getServiceName() + " service has 
exception. ", e);
+                }
+            }
+
+            // Under normal circumstances shutdown, wait for the arrival of the
+            // request, and then flush
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                CommitLog.log.warn("GroupCommitService Exception, ", e);
+            }
+
+            synchronized (this) {
+                this.swapRequests();
+            }
+
+            this.doCommit();
+
+            CommitLog.log.info(this.getServiceName() + " service end");
+        }
+
+
+        @Override
+        protected void onWaitEnd() {
+            this.swapRequests();
+        }
+
+
+        @Override
+        public String getServiceName() {
+            return GroupCommitService.class.getSimpleName();
+        }
+
+
+        @Override
+        public long getJointime() {
+            return 1000 * 60 * 5;
+        }
+    }
+
+    class DefaultAppendMessageCallback implements AppendMessageCallback {
+        // File at the end of the minimum fixed length empty
+        private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
+        private final ByteBuffer msgIdMemory;
+        // Store the message content
+        private final ByteBuffer msgStoreItemMemory;
+        // The maximum length of the message
+        private final int maxMessageSize;
+        // Build Message Key
+        private final StringBuilder keyBuilder = new StringBuilder();
+
+        private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
+
+
+        DefaultAppendMessageCallback(final int size) {
+            this.msgIdMemory = 
ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+            this.msgStoreItemMemory = ByteBuffer.allocate(size + 
END_FILE_MIN_BLANK_LENGTH);
+            this.maxMessageSize = size;
+        }
+
+
+        public ByteBuffer getMsgStoreItemMemory() {
+            return msgStoreItemMemory;
+        }
+
+
+        public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner 
msgInner) {
+            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
+
+            // PHY OFFSET
+            long wroteOffset = fileFromOffset + byteBuffer.position();
+
+            this.resetByteBuffer(hostHolder, 8);
+            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
msgInner.getStoreHostBytes(hostHolder), wroteOffset);
+
+            // Record ConsumeQueue information
+            keyBuilder.setLength(0);
+            keyBuilder.append(msgInner.getTopic());
+            keyBuilder.append('-');
+            keyBuilder.append(msgInner.getQueueId());
+            String key = keyBuilder.toString();
+            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+                CommitLog.this.topicQueueTable.put(key, queueOffset);
+            }
+
+            // Transaction messages that require special handling
+            final int tranType = 
MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
+            switch (tranType) {
+                // Prepared and Rollback message is not consumed, will not 
enter the
+                // consumer queuec
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    queueOffset = 0L;
+                    break;
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                default:
+                    break;
+            }
+
+            /**
+             * Serialize message
+             */
+            final byte[] propertiesData =
+                    msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+            final short propertiesLength = propertiesData == null ? 0 : 
(short) propertiesData.length;
+
+            if (propertiesLength > Short.MAX_VALUE) {
+                log.warn("putMessage message properties length too long. 
length={}", propertiesData.length);
+                return new 
AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+            }
+
+            final byte[] topicData = 
msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+            final int topicLength = topicData == null ? 0 : topicData.length;
+
+            final int bodyLength = msgInner.getBody() == null ? 0 : 
msgInner.getBody().length;
+
+            final int msgLen = calMsgLength(bodyLength, topicLength, 
propertiesLength);
+
+            // Exceeds the maximum message
+            if (msgLen > this.maxMessageSize) {
+                CommitLog.log.warn("message size exceeded, msg total size: " + 
msgLen + ", msg body size: " + bodyLength
+                        + ", maxMessageSize: " + this.maxMessageSize);
+                return new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+            }
+
+            // Determines whether there is sufficient free space
+            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
+                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
+                // 1 TOTALSIZE
+                this.msgStoreItemMemory.putInt(maxBlank);
+                // 2 MAGICCODE
+                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+                // 3 The remaining space may be any value
+                //
+
+                // Here the length of the specially set maxBlank
+                final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
+                return new 
AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, 
msgId, msgInner.getStoreTimestamp(),
+                        queueOffset, CommitLog.this.defaultMessageStore.now() 
- beginTimeMills);
+            }
+
+            // Initialization of storage space
+            this.resetByteBuffer(msgStoreItemMemory, msgLen);
+            // 1 TOTALSIZE
+            this.msgStoreItemMemory.putInt(msgLen);
+            // 2 MAGICCODE
+            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+            // 3 BODYCRC
+            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+            // 4 QUEUEID
+            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+            // 5 FLAG
+            this.msgStoreItemMemory.putInt(msgInner.getFlag());
+            // 6 QUEUEOFFSET
+            this.msgStoreItemMemory.putLong(queueOffset);
+            // 7 PHYSICALOFFSET
+            this.msgStoreItemMemory.putLong(fileFromOffset + 
byteBuffer.position());
+            // 8 SYSFLAG
+            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+            // 9 BORNTIMESTAMP
+            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+            // 10 BORNHOST
+            this.resetByteBuffer(hostHolder, 8);
+            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
+            // 11 STORETIMESTAMP
+            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+            // 12 STOREHOSTADDRESS
+            this.resetByteBuffer(hostHolder, 8);
+            
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
+            //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
+            // 13 RECONSUMETIMES
+            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+            // 14 Prepared Transaction Offset
+            
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+            // 15 BODY
+            this.msgStoreItemMemory.putInt(bodyLength);
+            if (bodyLength > 0)
+                this.msgStoreItemMemory.put(msgInner.getBody());
+            // 16 TOPIC
+            this.msgStoreItemMemory.put((byte) topicLength);
+            this.msgStoreItemMemory.put(topicData);
+            // 17 PROPERTIES
+            this.msgStoreItemMemory.putShort(propertiesLength);
+            if (propertiesLength > 0)
+                this.msgStoreItemMemory.put(propertiesData);
+
+            final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
+            // Write messages to the queue buffer
+            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
+
+            AppendMessageResult result = new 
AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
+                    msgInner.getStoreTimestamp(), queueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+
+            switch (tranType) {
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    break;
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                    // The next update ConsumeQueue information
+                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
+                    break;
+                default:
+                    break;
+            }
+            return result;
+        }
+
+
+        private void resetByteBuffer(final ByteBuffer byteBuffer, final int 
limit) {
+            byteBuffer.flip();
+            byteBuffer.limit(limit);
+        }
+    }
+
+    public long lockTimeMills() {
+        long diff = 0;
+        long begin = this.beginTimeInLock;
+        if (begin > 0) {
+            diff = this.defaultMessageStore.now() - begin;
+        }
+
+        if (diff < 0) {
+            diff = 0;
+        }
+
+        return diff;
+    }
+
+    /**
+     * Spin util acquired the lock.
+     */
+    private void lockForPutMessage() {
+        if 
(this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage())
 {
+            putMessageNormalLock.lock();
+        } else {
+            boolean flag;
+            do {
+                flag = this.putMessageSpinLock.compareAndSet(true, false);
+            } while (!flag);
+        }
+    }
+
+    private void releasePutMessageLock() {
+        if 
(this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage())
 {
+            putMessageNormalLock.unlock();
+        } else {
+            this.putMessageSpinLock.compareAndSet(false, true);
+        }
+    }
+}

Reply via email to