http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
index cb7304e..4122857 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
-
 public interface MessageFilter {
     boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 328a700..9ed74e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -6,114 +6,87 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-
 import java.util.HashMap;
 import java.util.Set;
-
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public interface MessageStore {
 
     boolean load();
 
-
     void start() throws Exception;
 
-
     void shutdown();
 
-
     void destroy();
 
     PutMessageResult putMessage(final MessageExtBrokerInner msg);
 
-
     GetMessageResult getMessage(final String group, final String topic, final int queueId,
-                                final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
-
+        final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
 
     long getMaxOffsetInQuque(final String topic, final int queueId);
 
-
     long getMinOffsetInQuque(final String topic, final int queueId);
 
-
     long getCommitLogOffsetInQueue(final String topic, final int queueId, final long cqOffset);
 
-
     long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
 
-
     MessageExt lookMessageByOffset(final long commitLogOffset);
 
-
     SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
 
-
     SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
 
     String getRunningDataInfo();
 
-
     HashMap<String, String> getRuntimeInfo();
 
-
     long getMaxPhyOffset();
 
-
     long getMinPhyOffset();
 
-
     long getEarliestMessageTime(final String topic, final int queueId);
-    long getEarliestMessageTime();
 
+    long getEarliestMessageTime();
 
     long getMessageStoreTimeStamp(final String topic, final int queueId, final long offset);
 
-
     long getMessageTotalInQueue(final String topic, final int queueId);
 
     SelectMappedBufferResult getCommitLogData(final long offset);
 
-
     boolean appendToCommitLog(final long startOffset, final byte[] data);
 
     void excuteDeleteFilesManualy();
 
-
     QueryMessageResult queryMessage(final String topic, final String key, final int maxNum,
-                                    final long begin, final long end);
-
+        final long begin, final long end);
 
     void updateHaMasterAddress(final String newAddr);
 
-
     long slaveFallBehindMuch();
 
-
     long now();
 
-
     int cleanUnusedTopic(final Set<String> topics);
 
-
     void cleanExpiredConsumerQueue();
 
-
     boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
 
-
     long dispatchBehindBytes();
 
     long flush();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java
index d36e7ee..b6ede7e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageResult.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
@@ -20,42 +20,35 @@ public class PutMessageResult {
     private PutMessageStatus putMessageStatus;
     private AppendMessageResult appendMessageResult;
 
-
     public PutMessageResult(PutMessageStatus putMessageStatus, AppendMessageResult appendMessageResult) {
         this.putMessageStatus = putMessageStatus;
         this.appendMessageResult = appendMessageResult;
     }
 
-
     public boolean isOk() {
         return this.appendMessageResult != null && this.appendMessageResult.isOk();
     }
 
-
     public AppendMessageResult getAppendMessageResult() {
         return appendMessageResult;
     }
 
-
     public void setAppendMessageResult(AppendMessageResult appendMessageResult) {
         this.appendMessageResult = appendMessageResult;
     }
 
-
     public PutMessageStatus getPutMessageStatus() {
         return putMessageStatus;
     }
 
-
     public void setPutMessageStatus(PutMessageStatus putMessageStatus) {
         this.putMessageStatus = putMessageStatus;
     }
 
-
     @Override
     public String toString() {
         return "PutMessageResult [putMessageStatus=" + putMessageStatus + ", appendMessageResult="
-                + appendMessageResult + "]";
+            + appendMessageResult + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
index 1b6ec9a..abab63f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java
index 1f300a2..8b45b7c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/QueryMessageResult.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
@@ -20,11 +20,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class QueryMessageResult {
 
     private final List<SelectMappedBufferResult> messageMapedList =
-            new ArrayList<SelectMappedBufferResult>(100);
+        new ArrayList<SelectMappedBufferResult>(100);
 
     private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
     private long indexLastUpdateTimestamp;
@@ -32,46 +31,38 @@ public class QueryMessageResult {
 
     private int bufferTotalSize = 0;
 
-
     public void addMessage(final SelectMappedBufferResult mapedBuffer) {
         this.messageMapedList.add(mapedBuffer);
         this.messageBufferList.add(mapedBuffer.getByteBuffer());
         this.bufferTotalSize += mapedBuffer.getSize();
     }
 
-
     public void release() {
         for (SelectMappedBufferResult select : this.messageMapedList) {
             select.release();
         }
     }
 
-
     public long getIndexLastUpdateTimestamp() {
         return indexLastUpdateTimestamp;
     }
 
-
     public void setIndexLastUpdateTimestamp(long indexLastUpdateTimestamp) {
         this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
     }
 
-
     public long getIndexLastUpdatePhyoffset() {
         return indexLastUpdatePhyoffset;
     }
 
-
     public void setIndexLastUpdatePhyoffset(long indexLastUpdatePhyoffset) {
         this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
     }
 
-
     public List<ByteBuffer> getMessageBufferList() {
         return messageBufferList;
     }
 
-
     public int getBufferTotalSize() {
         return bufferTotalSize;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java b/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java
index 6563232..b8b75da 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ReferenceResource.java
@@ -6,26 +6,24 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-
 public abstract class ReferenceResource {
     protected final AtomicLong refCount = new AtomicLong(1);
     protected volatile boolean available = true;
     protected volatile boolean cleanupOver = false;
     private volatile long firstShutdownTimestamp = 0;
 
-
     public synchronized boolean hold() {
         if (this.isAvailable()) {
             if (this.refCount.getAndIncrement() > 0) {
@@ -38,21 +36,16 @@ public abstract class ReferenceResource {
         return false;
     }
 
-
     public boolean isAvailable() {
         return this.available;
     }
 
-
-
     public void shutdown(final long intervalForcibly) {
         if (this.available) {
             this.available = false;
             this.firstShutdownTimestamp = System.currentTimeMillis();
             this.release();
-        }
-
-        else if (this.getRefCount() > 0) {
+        } else if (this.getRefCount() > 0) {
             if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                 this.refCount.set(-1000 - this.getRefCount());
                 this.release();
@@ -77,7 +70,6 @@ public abstract class ReferenceResource {
 
     public abstract boolean cleanup(final long currentRef);
 
-
     public boolean isCleanupOver() {
         return this.refCount.get() <= 0 && this.cleanupOver;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 8e7f29f..52c269c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
@@ -29,16 +29,13 @@ public class RunningFlags {
     private static final int DISK_FULL_BIT = 1 << 4;
     private volatile int flagBits = 0;
 
-
     public RunningFlags() {
     }
 
-
     public int getFlagBits() {
         return flagBits;
     }
 
-
     public boolean getAndMakeReadable() {
         boolean result = this.isReadable();
         if (!result) {
@@ -47,7 +44,6 @@ public class RunningFlags {
         return result;
     }
 
-
     public boolean isReadable() {
         if ((this.flagBits & NOT_READABLE_BIT) == 0) {
             return true;
@@ -56,7 +52,6 @@ public class RunningFlags {
         return false;
     }
 
-
     public boolean getAndMakeNotReadable() {
         boolean result = this.isReadable();
         if (result) {
@@ -65,7 +60,6 @@ public class RunningFlags {
         return result;
     }
 
-
     public boolean getAndMakeWriteable() {
         boolean result = this.isWriteable();
         if (!result) {
@@ -74,7 +68,6 @@ public class RunningFlags {
         return result;
     }
 
-
     public boolean isWriteable() {
         if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
             return true;
@@ -83,7 +76,6 @@ public class RunningFlags {
         return false;
     }
 
-
     public boolean getAndMakeNotWriteable() {
         boolean result = this.isWriteable();
         if (result) {
@@ -92,12 +84,10 @@ public class RunningFlags {
         return result;
     }
 
-
     public void makeLogicsQueueError() {
         this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
     }
 
-
     public boolean isLogicsQueueError() {
         if ((this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT) {
             return true;
@@ -106,12 +96,10 @@ public class RunningFlags {
         return false;
     }
 
-
     public void makeIndexFileError() {
         this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
     }
 
-
     public boolean isIndexFileError() {
         if ((this.flagBits & WRITE_INDEX_FILE_ERROR_BIT) == WRITE_INDEX_FILE_ERROR_BIT) {
             return true;
@@ -120,14 +108,12 @@ public class RunningFlags {
         return false;
     }
 
-
     public boolean getAndMakeDiskFull() {
         boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
         this.flagBits |= DISK_FULL_BIT;
         return result;
     }
 
-
     public boolean getAndMakeDiskOK() {
         boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
         this.flagBits &= ~DISK_FULL_BIT;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 218f809..ddd9383 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import java.nio.ByteBuffer;
 
-
 public class SelectMappedBufferResult {
 
     private final long startOffset;
@@ -29,7 +28,6 @@ public class SelectMappedBufferResult {
 
     private MappedFile mappedFile;
 
-
     public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) {
         this.startOffset = startOffset;
         this.byteBuffer = byteBuffer;
@@ -37,28 +35,23 @@ public class SelectMappedBufferResult {
         this.mappedFile = mappedFile;
     }
 
-
     public ByteBuffer getByteBuffer() {
         return byteBuffer;
     }
 
-
     public int getSize() {
         return size;
     }
 
-
     public void setSize(final int s) {
         this.size = s;
         this.byteBuffer.limit(this.size);
     }
 
-
     public MappedFile getMappedFile() {
         return mappedFile;
     }
 
-
 //    @Override
 //    protected void finalize() {
 //        if (this.mappedFile != null) {
@@ -66,7 +59,6 @@ public class SelectMappedBufferResult {
 //        }
 //    }
 
-
     public synchronized void release() {
         if (this.mappedFile != null) {
             this.mappedFile.release();
@@ -74,7 +66,6 @@ public class SelectMappedBufferResult {
         }
     }
 
-
     public long getStartOffset() {
         return startOffset;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index 7fb5158..49a342e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -6,28 +6,26 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
-
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StoreCheckpoint {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -38,7 +36,6 @@ public class StoreCheckpoint {
     private volatile long logicsMsgTimestamp = 0;
     private volatile long indexMsgTimestamp = 0;
 
-
     public StoreCheckpoint(final String scpPath) throws IOException {
         File file = new File(scpPath);
         MappedFile.ensureDirOK(file.getParent());
@@ -55,17 +52,16 @@ public class StoreCheckpoint {
             this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
 
             log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
-                    + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
+                + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
             log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
-                    + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
+                + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
             log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
-                    + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
+                + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
         } else {
             log.info("store checkpoint file not exists, " + scpPath);
         }
     }
 
-
     public void shutdown() {
         this.flush();
 
@@ -79,7 +75,6 @@ public class StoreCheckpoint {
         }
     }
 
-
     public void flush() {
         this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
         this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
@@ -87,36 +82,29 @@ public class StoreCheckpoint {
         this.mappedByteBuffer.force();
     }
 
-
     public long getPhysicMsgTimestamp() {
         return physicMsgTimestamp;
     }
 
-
     public void setPhysicMsgTimestamp(long physicMsgTimestamp) {
         this.physicMsgTimestamp = physicMsgTimestamp;
     }
 
-
     public long getLogicsMsgTimestamp() {
         return logicsMsgTimestamp;
     }
 
-
     public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {
         this.logicsMsgTimestamp = logicsMsgTimestamp;
     }
 
-
     public long getMinTimestampIndex() {
         return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
     }
 
-
     public long getMinTimestamp() {
         long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
 
-
         // fixed https://github.org/apache/rocketmqissues/467
         min -= 1000 * 3;
         if (min < 0)
@@ -125,12 +113,10 @@ public class StoreCheckpoint {
         return min;
     }
 
-
     public long getIndexMsgTimestamp() {
         return indexMsgTimestamp;
     }
 
-
     public void setIndexMsgTimestamp(long indexMsgTimestamp) {
         this.indexMsgTimestamp = indexMsgTimestamp;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index 91b70fb..d4ba147 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -16,11 +16,6 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.text.MessageFormat;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -28,7 +23,10 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StoreStatsService extends ServiceThread {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -36,7 +34,7 @@ public class StoreStatsService extends ServiceThread {
     private static final int FREQUENCY_OF_SAMPLING = 1000;
 
     private static final int MAX_RECORDS_OF_SAMPLING = 60 * 10;
-    private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[]{
+    private static final String[] PUT_MESSAGE_ENTIRE_TIME_MAX_DESC = new String[] {
         "[<=0ms]", "[0~10ms]", "[10~50ms]", "[50~100ms]", "[100~200ms]", "[200~500ms]", "[500ms~1s]", "[1~2s]", "[2~3s]", "[3~4s]", "[4~5s]", "[5~10s]", "[10s~]",
     };
 
@@ -45,9 +43,9 @@ public class StoreStatsService extends ServiceThread {
     private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
 
     private final Map<String, AtomicLong> putMessageTopicTimesTotal =
-            new ConcurrentHashMap<String, AtomicLong>(128);
+        new ConcurrentHashMap<String, AtomicLong>(128);
     private final Map<String, AtomicLong> putMessageTopicSizeTotal =
-            new ConcurrentHashMap<String, AtomicLong>(128);
+        new ConcurrentHashMap<String, AtomicLong>(128);
 
     private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
     private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
@@ -71,7 +69,6 @@ public class StoreStatsService extends ServiceThread {
     private ReentrantLock lockSampling = new ReentrantLock();
     private long lastPrintTimestamp = System.currentTimeMillis();
 
-
     public StoreStatsService() {
         this.initPutMessageDistributeTime();
     }
@@ -96,7 +93,8 @@ public class StoreStatsService extends ServiceThread {
     public void setPutMessageEntireTimeMax(long value) {
         final AtomicLong[] times = this.putMessageDistributeTime;
 
-        if (null == times) return;
+        if (null == times)
+            return;
 
         // us
         if (value <= 0) {
@@ -140,37 +138,32 @@ public class StoreStatsService extends ServiceThread {
         if (value > this.putMessageEntireTimeMax) {
             this.lockPut.lock();
             this.putMessageEntireTimeMax =
-                    value > this.putMessageEntireTimeMax ? value : 
this.putMessageEntireTimeMax;
+                value > this.putMessageEntireTimeMax ? value : 
this.putMessageEntireTimeMax;
             this.lockPut.unlock();
         }
     }
 
-
     public long getGetMessageEntireTimeMax() {
         return getMessageEntireTimeMax;
     }
 
-
     public void setGetMessageEntireTimeMax(long value) {
         if (value > this.getMessageEntireTimeMax) {
             this.lockGet.lock();
             this.getMessageEntireTimeMax =
-                    value > this.getMessageEntireTimeMax ? value : 
this.getMessageEntireTimeMax;
+                value > this.getMessageEntireTimeMax ? value : 
this.getMessageEntireTimeMax;
             this.lockGet.unlock();
         }
     }
 
-
     public long getDispatchMaxBuffer() {
         return dispatchMaxBuffer;
     }
 
-
     public void setDispatchMaxBuffer(long value) {
         this.dispatchMaxBuffer = value > this.dispatchMaxBuffer ? value : 
this.dispatchMaxBuffer;
     }
 
-
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder(1024);
@@ -184,9 +177,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append("\tputMessageTimesTotal: " + totalTimes + "\r\n");
         sb.append("\tputMessageSizeTotal: " + this.getPutMessageSizeTotal() + 
"\r\n");
         sb.append("\tputMessageDistributeTime: " + 
this.getPutMessageDistributeTimeStringInfo(totalTimes)
-                + "\r\n");
+            + "\r\n");
         sb.append("\tputMessageAverageSize: " + (this.getPutMessageSizeTotal() 
/ totalTimes.doubleValue())
-                + "\r\n");
+            + "\r\n");
         sb.append("\tdispatchMaxBuffer: " + this.dispatchMaxBuffer + "\r\n");
         sb.append("\tgetMessageEntireTimeMax: " + this.getMessageEntireTimeMax 
+ "\r\n");
         sb.append("\tputTps: " + this.getPutTps() + "\r\n");
@@ -218,7 +211,7 @@ public class StoreStatsService extends ServiceThread {
         long hours = (time % day) / hour;
         long minutes = (time % hour) / minute;
         long seconds = (time % minute) / second;
-        return messageFormat.format(new Long[]{days, hours, minutes, seconds});
+        return messageFormat.format(new Long[] {days, hours, minutes, 
seconds});
     }
 
     public long getPutMessageSizeTotal() {
@@ -239,11 +232,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append(this.getPutTps(10));
         sb.append(" ");
 
-
         sb.append(this.getPutTps(60));
         sb.append(" ");
 
-
         sb.append(this.getPutTps(600));
 
         return sb.toString();
@@ -255,11 +246,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append(this.getGetFoundTps(10));
         sb.append(" ");
 
-
         sb.append(this.getGetFoundTps(60));
         sb.append(" ");
 
-
         sb.append(this.getGetFoundTps(600));
 
         return sb.toString();
@@ -271,11 +260,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append(this.getGetMissTps(10));
         sb.append(" ");
 
-
         sb.append(this.getGetMissTps(60));
         sb.append(" ");
 
-
         sb.append(this.getGetMissTps(600));
 
         return sb.toString();
@@ -287,11 +274,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append(this.getGetTotalTps(10));
         sb.append(" ");
 
-
         sb.append(this.getGetTotalTps(60));
         sb.append(" ");
 
-
         sb.append(this.getGetTotalTps(600));
 
         return sb.toString();
@@ -303,11 +288,9 @@ public class StoreStatsService extends ServiceThread {
         sb.append(this.getGetTransferedTps(10));
         sb.append(" ");
 
-
         sb.append(this.getGetTransferedTps(60));
         sb.append(" ");
 
-
         sb.append(this.getGetTransferedTps(600));
 
         return sb.toString();
@@ -315,7 +298,8 @@ public class StoreStatsService extends ServiceThread {
 
     private String putMessageDistributeTimeToString() {
         final AtomicLong[] times = this.putMessageDistributeTime;
-        if (null == times) return null;
+        if (null == times)
+            return null;
 
         final StringBuilder sb = new StringBuilder();
         for (int i = 0; i < times.length; i++) {
@@ -352,7 +336,7 @@ public class StoreStatsService extends ServiceThread {
 
             if (this.getTimesFoundList.size() > time) {
                 CallSnapshot lastBefore =
-                        
this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
+                    this.getTimesFoundList.get(this.getTimesFoundList.size() - 
(time + 1));
                 result += CallSnapshot.getTPS(lastBefore, last);
             }
         } finally {
@@ -370,7 +354,7 @@ public class StoreStatsService extends ServiceThread {
 
             if (this.getTimesMissList.size() > time) {
                 CallSnapshot lastBefore =
-                        this.getTimesMissList.get(this.getTimesMissList.size() 
- (time + 1));
+                    this.getTimesMissList.get(this.getTimesMissList.size() - 
(time + 1));
                 result += CallSnapshot.getTPS(lastBefore, last);
             }
 
@@ -391,7 +375,7 @@ public class StoreStatsService extends ServiceThread {
 
                 if (this.getTimesFoundList.size() > time) {
                     CallSnapshot lastBefore =
-                            
this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
+                        
this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
                     found = CallSnapshot.getTPS(lastBefore, last);
                 }
             }
@@ -400,7 +384,7 @@ public class StoreStatsService extends ServiceThread {
 
                 if (this.getTimesMissList.size() > time) {
                     CallSnapshot lastBefore =
-                            
this.getTimesMissList.get(this.getTimesMissList.size() - (time + 1));
+                        this.getTimesMissList.get(this.getTimesMissList.size() 
- (time + 1));
                     miss = CallSnapshot.getTPS(lastBefore, last);
                 }
             }
@@ -420,7 +404,7 @@ public class StoreStatsService extends ServiceThread {
 
             if (this.transferedMsgCountList.size() > time) {
                 CallSnapshot lastBefore =
-                        
this.transferedMsgCountList.get(this.transferedMsgCountList.size() - (time + 
1));
+                    
this.transferedMsgCountList.get(this.transferedMsgCountList.size() - (time + 
1));
                 result += CallSnapshot.getTPS(lastBefore, last);
             }
 
@@ -445,9 +429,9 @@ public class StoreStatsService extends ServiceThread {
         result.put("putMessageTimesTotal", String.valueOf(totalTimes));
         result.put("putMessageSizeTotal", 
String.valueOf(this.getPutMessageSizeTotal()));
         result.put("putMessageDistributeTime",
-                
String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes)));
+            
String.valueOf(this.getPutMessageDistributeTimeStringInfo(totalTimes)));
         result.put("putMessageAverageSize",
-                String.valueOf(this.getPutMessageSizeTotal() / 
totalTimes.doubleValue()));
+            String.valueOf(this.getPutMessageSizeTotal() / 
totalTimes.doubleValue()));
         result.put("dispatchMaxBuffer", 
String.valueOf(this.dispatchMaxBuffer));
         result.put("getMessageEntireTimeMax", 
String.valueOf(this.getMessageEntireTimeMax));
         result.put("putTps", String.valueOf(this.getPutTps()));
@@ -491,19 +475,19 @@ public class StoreStatsService extends ServiceThread {
             }
 
             this.getTimesFoundList.add(new 
CallSnapshot(System.currentTimeMillis(),
-                    this.getMessageTimesTotalFound.get()));
+                this.getMessageTimesTotalFound.get()));
             if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) 
{
                 this.getTimesFoundList.removeFirst();
             }
 
             this.getTimesMissList.add(new 
CallSnapshot(System.currentTimeMillis(),
-                    this.getMessageTimesTotalMiss.get()));
+                this.getMessageTimesTotalMiss.get()));
             if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
                 this.getTimesMissList.removeFirst();
             }
 
             this.transferedMsgCountList.add(new 
CallSnapshot(System.currentTimeMillis(),
-                    this.getMessageTransferedMsgCount.get()));
+                this.getMessageTransferedMsgCount.get()));
             if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING 
+ 1)) {
                 this.transferedMsgCountList.removeFirst();
             }
@@ -518,14 +502,15 @@ public class StoreStatsService extends ServiceThread {
             this.lastPrintTimestamp = System.currentTimeMillis();
 
             log.info("[STORETPS] put_tps {} get_found_tps {} get_miss_tps {} 
get_transfered_tps {}",
-                    this.getPutTps(printTPSInterval),
-                    this.getGetFoundTps(printTPSInterval),
-                    this.getGetMissTps(printTPSInterval),
-                    this.getGetTransferedTps(printTPSInterval)
+                this.getPutTps(printTPSInterval),
+                this.getGetFoundTps(printTPSInterval),
+                this.getGetMissTps(printTPSInterval),
+                this.getGetTransferedTps(printTPSInterval)
             );
 
             final AtomicLong[] times = this.initPutMessageDistributeTime();
-            if (null == times) return;
+            if (null == times)
+                return;
 
             final StringBuilder sb = new StringBuilder();
             long totalPut = 0;
@@ -544,22 +529,18 @@ public class StoreStatsService extends ServiceThread {
         return getMessageTimesTotalFound;
     }
 
-
     public AtomicLong getGetMessageTimesTotalMiss() {
         return getMessageTimesTotalMiss;
     }
 
-
     public AtomicLong getGetMessageTransferedMsgCount() {
         return getMessageTransferedMsgCount;
     }
 
-
     public AtomicLong getPutMessageFailedTimes() {
         return putMessageFailedTimes;
     }
 
-
     public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
         AtomicLong rs = putMessageTopicSizeTotal.get(topic);
         if (null == rs) {
@@ -569,7 +550,6 @@ public class StoreStatsService extends ServiceThread {
         return rs;
     }
 
-
     public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
         AtomicLong rs = putMessageTopicTimesTotal.get(topic);
         if (null == rs) {
@@ -579,12 +559,10 @@ public class StoreStatsService extends ServiceThread {
         return rs;
     }
 
-
     public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
         return putMessageTopicTimesTotal;
     }
 
-
     public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
         return putMessageTopicSizeTotal;
     }
@@ -593,13 +571,11 @@ public class StoreStatsService extends ServiceThread {
         public final long timestamp;
         public final long callTimesTotal;
 
-
         public CallSnapshot(long timestamp, long callTimesTotal) {
             this.timestamp = timestamp;
             this.callTimesTotal = callTimesTotal;
         }
 
-
         public static double getTPS(final CallSnapshot begin, final 
CallSnapshot end) {
             long total = end.callTimesTotal - begin.callTimesTotal;
             Long time = end.timestamp - begin.timestamp;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
index d44720c..9cd7145 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
@@ -6,33 +6,31 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 
-
 /**
  *
  */
 public class StoreUtil {
     public static final long TOTAL_PHYSICAL_MEMORY_SIZE = 
getTotalPhysicalMemorySize();
 
-
     @SuppressWarnings("restriction")
     public static long getTotalPhysicalMemorySize() {
         long physicalTotal = 1024 * 1024 * 1024 * 24;
         OperatingSystemMXBean osmxb = 
ManagementFactory.getOperatingSystemMXBean();
         if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {
-            physicalTotal = ((com.sun.management.OperatingSystemMXBean) 
osmxb).getTotalPhysicalMemorySize();
+            physicalTotal = 
((com.sun.management.OperatingSystemMXBean)osmxb).getTotalPhysicalMemorySize();
         }
 
         return physicalTotal;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index bd5b629..3e332ba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -16,19 +16,18 @@
  */
 package org.apache.rocketmq.store;
 
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.util.LibC;
-import com.sun.jna.NativeLong;
-import com.sun.jna.Pointer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.nio.ch.DirectBuffer;
 
-import java.nio.ByteBuffer;
-import java.util.Deque;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
 public class TransientStorePool {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -51,7 +50,7 @@ public class TransientStorePool {
         for (int i = 0; i < poolSize; i++) {
             ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
 
-            final long address = ((DirectBuffer) byteBuffer).address();
+            final long address = ((DirectBuffer)byteBuffer).address();
             Pointer pointer = new Pointer(address);
             LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
 
@@ -61,7 +60,7 @@ public class TransientStorePool {
 
     public void destroy() {
         for (ByteBuffer byteBuffer : availableBuffers) {
-            final long address = ((DirectBuffer) byteBuffer).address();
+            final long address = ((DirectBuffer)byteBuffer).address();
             Pointer pointer = new Pointer(address);
             LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java b/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java
index 6ebb723..bebacc9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/BrokerRole.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.config;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java b/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java
index 02b0ced..6a52a67 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/FlushDiskType.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.config;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 205adec..d1e2bfe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -6,22 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.config;
 
+import java.io.File;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.store.ConsumeQueue;
 
-import java.io.File;
-
-
 public class MessageStoreConfig {
     //The root directory in which the log data is kept
     @ImportantField
@@ -30,7 +28,7 @@ public class MessageStoreConfig {
     //The directory in which the commitlog is kept
     @ImportantField
     private String storePathCommitLog = System.getProperty("user.home") + 
File.separator + "store"
-            + File.separator + "commitlog";
+        + File.separator + "commitlog";
 
     // CommitLog file size,default is 1G
     private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
@@ -133,7 +131,7 @@ public class MessageStoreConfig {
     private boolean transientStorePoolEnable = false;
     private int transientStorePoolSize = 5;
     private boolean fastFailIfNoBufferInStorePool = false;
-    
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -170,119 +168,96 @@ public class MessageStoreConfig {
         return warmMapedFileEnable;
     }
 
-
     public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
         this.warmMapedFileEnable = warmMapedFileEnable;
     }
 
-
     public int getMapedFileSizeCommitLog() {
         return mapedFileSizeCommitLog;
     }
 
-
     public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
         this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
     }
 
-
     public int getMapedFileSizeConsumeQueue() {
 
-        int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / 
(ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
-        return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        int factor = (int)Math.ceil(this.mapedFileSizeConsumeQueue / 
(ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
+        return (int)(factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
     }
 
-
     public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
         this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
     }
 
-
     public int getFlushIntervalCommitLog() {
         return flushIntervalCommitLog;
     }
 
-
     public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
         this.flushIntervalCommitLog = flushIntervalCommitLog;
     }
 
-
     public int getFlushIntervalConsumeQueue() {
         return flushIntervalConsumeQueue;
     }
 
-
     public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
         this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
     }
 
-
     public int getPutMsgIndexHightWater() {
         return putMsgIndexHightWater;
     }
 
-
     public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
         this.putMsgIndexHightWater = putMsgIndexHightWater;
     }
 
-
     public int getCleanResourceInterval() {
         return cleanResourceInterval;
     }
 
-
     public void setCleanResourceInterval(int cleanResourceInterval) {
         this.cleanResourceInterval = cleanResourceInterval;
     }
 
-
     public int getMaxMessageSize() {
         return maxMessageSize;
     }
 
-
     public void setMaxMessageSize(int maxMessageSize) {
         this.maxMessageSize = maxMessageSize;
     }
 
-
     public boolean isCheckCRCOnRecover() {
         return checkCRCOnRecover;
     }
 
-
     public boolean getCheckCRCOnRecover() {
         return checkCRCOnRecover;
     }
 
-
     public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
         this.checkCRCOnRecover = checkCRCOnRecover;
     }
 
-
     public String getStorePathCommitLog() {
         return storePathCommitLog;
     }
 
-
     public void setStorePathCommitLog(String storePathCommitLog) {
         this.storePathCommitLog = storePathCommitLog;
     }
 
-
     public String getDeleteWhen() {
         return deleteWhen;
     }
 
-
     public void setDeleteWhen(String deleteWhen) {
         this.deleteWhen = deleteWhen;
     }
 
-
     public int getDiskMaxUsedSpaceRatio() {
         if (this.diskMaxUsedSpaceRatio < 10)
             return 10;
@@ -293,361 +268,294 @@ public class MessageStoreConfig {
         return diskMaxUsedSpaceRatio;
     }
 
-
     public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
         this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
     }
 
-
     public int getDeleteCommitLogFilesInterval() {
         return deleteCommitLogFilesInterval;
     }
 
-
     public void setDeleteCommitLogFilesInterval(int 
deleteCommitLogFilesInterval) {
         this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
     }
 
-
     public int getDeleteConsumeQueueFilesInterval() {
         return deleteConsumeQueueFilesInterval;
     }
 
-
     public void setDeleteConsumeQueueFilesInterval(int 
deleteConsumeQueueFilesInterval) {
         this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
     }
 
-
     public int getMaxTransferBytesOnMessageInMemory() {
         return maxTransferBytesOnMessageInMemory;
     }
 
-
     public void setMaxTransferBytesOnMessageInMemory(int 
maxTransferBytesOnMessageInMemory) {
         this.maxTransferBytesOnMessageInMemory = 
maxTransferBytesOnMessageInMemory;
     }
 
-
     public int getMaxTransferCountOnMessageInMemory() {
         return maxTransferCountOnMessageInMemory;
     }
 
-
     public void setMaxTransferCountOnMessageInMemory(int 
maxTransferCountOnMessageInMemory) {
         this.maxTransferCountOnMessageInMemory = 
maxTransferCountOnMessageInMemory;
     }
 
-
     public int getMaxTransferBytesOnMessageInDisk() {
         return maxTransferBytesOnMessageInDisk;
     }
 
-
     public void setMaxTransferBytesOnMessageInDisk(int 
maxTransferBytesOnMessageInDisk) {
         this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
     }
 
-
     public int getMaxTransferCountOnMessageInDisk() {
         return maxTransferCountOnMessageInDisk;
     }
 
-
     public void setMaxTransferCountOnMessageInDisk(int 
maxTransferCountOnMessageInDisk) {
         this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
     }
 
-
     public int getFlushCommitLogLeastPages() {
         return flushCommitLogLeastPages;
     }
 
-
     public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
         this.flushCommitLogLeastPages = flushCommitLogLeastPages;
     }
 
-
     public int getFlushConsumeQueueLeastPages() {
         return flushConsumeQueueLeastPages;
     }
 
-
     public void setFlushConsumeQueueLeastPages(int 
flushConsumeQueueLeastPages) {
         this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
     }
 
-
     public int getFlushCommitLogThoroughInterval() {
         return flushCommitLogThoroughInterval;
     }
 
-
     public void setFlushCommitLogThoroughInterval(int 
flushCommitLogThoroughInterval) {
         this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
     }
 
-
     public int getFlushConsumeQueueThoroughInterval() {
         return flushConsumeQueueThoroughInterval;
     }
 
-
     public void setFlushConsumeQueueThoroughInterval(int 
flushConsumeQueueThoroughInterval) {
         this.flushConsumeQueueThoroughInterval = 
flushConsumeQueueThoroughInterval;
     }
 
-
     public int getDestroyMapedFileIntervalForcibly() {
         return destroyMapedFileIntervalForcibly;
     }
 
-
     public void setDestroyMapedFileIntervalForcibly(int 
destroyMapedFileIntervalForcibly) {
         this.destroyMapedFileIntervalForcibly = 
destroyMapedFileIntervalForcibly;
     }
 
-
     public int getFileReservedTime() {
         return fileReservedTime;
     }
 
-
     public void setFileReservedTime(int fileReservedTime) {
         this.fileReservedTime = fileReservedTime;
     }
 
-
     public int getRedeleteHangedFileInterval() {
         return redeleteHangedFileInterval;
     }
 
-
     public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
         this.redeleteHangedFileInterval = redeleteHangedFileInterval;
     }
 
-
     public int getAccessMessageInMemoryMaxRatio() {
         return accessMessageInMemoryMaxRatio;
     }
 
-
     public void setAccessMessageInMemoryMaxRatio(int 
accessMessageInMemoryMaxRatio) {
         this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
     }
 
-
     public boolean isMessageIndexEnable() {
         return messageIndexEnable;
     }
 
-
     public void setMessageIndexEnable(boolean messageIndexEnable) {
         this.messageIndexEnable = messageIndexEnable;
     }
 
-
     public int getMaxHashSlotNum() {
         return maxHashSlotNum;
     }
 
-
     public void setMaxHashSlotNum(int maxHashSlotNum) {
         this.maxHashSlotNum = maxHashSlotNum;
     }
 
-
     public int getMaxIndexNum() {
         return maxIndexNum;
     }
 
-
     public void setMaxIndexNum(int maxIndexNum) {
         this.maxIndexNum = maxIndexNum;
     }
 
-
     public int getMaxMsgsNumBatch() {
         return maxMsgsNumBatch;
     }
 
-
     public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
         this.maxMsgsNumBatch = maxMsgsNumBatch;
     }
 
-
     public int getHaListenPort() {
         return haListenPort;
     }
 
-
     public void setHaListenPort(int haListenPort) {
         this.haListenPort = haListenPort;
     }
 
-
     public int getHaSendHeartbeatInterval() {
         return haSendHeartbeatInterval;
     }
 
-
     public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
         this.haSendHeartbeatInterval = haSendHeartbeatInterval;
     }
 
-
     public int getHaHousekeepingInterval() {
         return haHousekeepingInterval;
     }
 
-
     public void setHaHousekeepingInterval(int haHousekeepingInterval) {
         this.haHousekeepingInterval = haHousekeepingInterval;
     }
 
-
     public BrokerRole getBrokerRole() {
         return brokerRole;
     }
 
-    public void setBrokerRole(BrokerRole brokerRole) {
-        this.brokerRole = brokerRole;
-    }
-
     public void setBrokerRole(String brokerRole) {
         this.brokerRole = BrokerRole.valueOf(brokerRole);
     }
 
+    public void setBrokerRole(BrokerRole brokerRole) {
+        this.brokerRole = brokerRole;
+    }
+
     public int getHaTransferBatchSize() {
         return haTransferBatchSize;
     }
 
-
     public void setHaTransferBatchSize(int haTransferBatchSize) {
         this.haTransferBatchSize = haTransferBatchSize;
     }
 
-
     public int getHaSlaveFallbehindMax() {
         return haSlaveFallbehindMax;
     }
 
-
     public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
         this.haSlaveFallbehindMax = haSlaveFallbehindMax;
     }
 
-
     public FlushDiskType getFlushDiskType() {
         return flushDiskType;
     }
 
-    public void setFlushDiskType(FlushDiskType flushDiskType) {
-        this.flushDiskType = flushDiskType;
-    }
-
     public void setFlushDiskType(String type) {
         this.flushDiskType = FlushDiskType.valueOf(type);
     }
 
+    public void setFlushDiskType(FlushDiskType flushDiskType) {
+        this.flushDiskType = flushDiskType;
+    }
+
     public int getSyncFlushTimeout() {
         return syncFlushTimeout;
     }
 
-
     public void setSyncFlushTimeout(int syncFlushTimeout) {
         this.syncFlushTimeout = syncFlushTimeout;
     }
 
-
     public String getHaMasterAddress() {
         return haMasterAddress;
     }
 
-
     public void setHaMasterAddress(String haMasterAddress) {
         this.haMasterAddress = haMasterAddress;
     }
 
-
     public String getMessageDelayLevel() {
         return messageDelayLevel;
     }
 
-
     public void setMessageDelayLevel(String messageDelayLevel) {
         this.messageDelayLevel = messageDelayLevel;
     }
 
-
     public long getFlushDelayOffsetInterval() {
         return flushDelayOffsetInterval;
     }
 
-
     public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
         this.flushDelayOffsetInterval = flushDelayOffsetInterval;
     }
 
-
     public boolean isCleanFileForciblyEnable() {
         return cleanFileForciblyEnable;
     }
 
-
     public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
         this.cleanFileForciblyEnable = cleanFileForciblyEnable;
     }
 
-
     public boolean isMessageIndexSafe() {
         return messageIndexSafe;
     }
 
-
     public void setMessageIndexSafe(boolean messageIndexSafe) {
         this.messageIndexSafe = messageIndexSafe;
     }
 
-
     public boolean isFlushCommitLogTimed() {
         return flushCommitLogTimed;
     }
 
-
     public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
         this.flushCommitLogTimed = flushCommitLogTimed;
     }
 
-
     public String getStorePathRootDir() {
         return storePathRootDir;
     }
 
-
     public void setStorePathRootDir(String storePathRootDir) {
         this.storePathRootDir = storePathRootDir;
     }
 
-
     public int getFlushLeastPagesWhenWarmMapedFile() {
         return flushLeastPagesWhenWarmMapedFile;
     }
 
-
     public void setFlushLeastPagesWhenWarmMapedFile(int 
flushLeastPagesWhenWarmMapedFile) {
         this.flushLeastPagesWhenWarmMapedFile = 
flushLeastPagesWhenWarmMapedFile;
     }
 
-
     public boolean isOffsetCheckInSlave() {
         return offsetCheckInSlave;
     }
 
-
     public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
         this.offsetCheckInSlave = offsetCheckInSlave;
     }
@@ -666,7 +574,7 @@ public class MessageStoreConfig {
      */
     public boolean isTransientStorePoolEnable() {
         return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
-                && BrokerRole.SLAVE != getBrokerRole();
+            && BrokerRole.SLAVE != getBrokerRole();
     }
 
     public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index 8796436..d1cd7de 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -6,51 +6,44 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.config;
 
 import java.io.File;
 
-
 public class StorePathConfigHelper {
 
     public static String getStorePathConsumeQueue(final String rootDir) {
         return rootDir + File.separator + "consumequeue";
     }
 
-
     public static String getStorePathIndex(final String rootDir) {
         return rootDir + File.separator + "index";
     }
 
-
     public static String getStoreCheckpoint(final String rootDir) {
         return rootDir + File.separator + "checkpoint";
     }
 
-
     public static String getAbortFile(final String rootDir) {
         return rootDir + File.separator + "abort";
     }
 
-
     public static String getDelayOffsetStorePath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
     }
 
-
     public static String getTranStateTableStorePath(final String rootDir) {
         return rootDir + File.separator + "transaction" + File.separator + "statetable";
     }
 
-
     public static String getTranRedoLogStorePath(final String rootDir) {
         return rootDir + File.separator + "transaction" + File.separator + "redolog";
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index e8965d3..a601e81 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -6,16 +6,21 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.store.ha;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -23,13 +28,6 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-
-
 public class HAConnection {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private final HAService haService;
@@ -41,7 +39,6 @@ public class HAConnection {
     private volatile long slaveRequestOffset = -1;
     private volatile long slaveAckOffset = -1;
 
-
     public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
         this.haService = haService;
         this.socketChannel = socketChannel;
@@ -56,20 +53,17 @@ public class HAConnection {
         this.haService.getConnectionCount().incrementAndGet();
     }
 
-
     public void start() {
         this.readSocketService.start();
         this.writeSocketService.start();
     }
 
-
     public void shutdown() {
         this.writeSocketService.shutdown(true);
         this.readSocketService.shutdown(true);
         this.close();
     }
 
-
     public void close() {
         if (this.socketChannel != null) {
             try {
@@ -80,7 +74,6 @@ public class HAConnection {
         }
     }
 
-
     public SocketChannel getSocketChannel() {
         return socketChannel;
     }
@@ -97,7 +90,6 @@ public class HAConnection {
         private int processPostion = 0;
         private volatile long lastReadTimestamp = System.currentTimeMillis();
 
-
         public ReadSocketService(final SocketChannel socketChannel) throws IOException {
             this.selector = RemotingUtil.openSelector();
             this.socketChannel = socketChannel;
@@ -105,7 +97,6 @@ public class HAConnection {
             this.thread.setDaemon(true);
         }
 
-
         @Override
         public void run() {
             HAConnection.log.info(this.getServiceName() + " service started");
@@ -119,7 +110,6 @@ public class HAConnection {
                         break;
                     }
 
-
                     long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                     if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                         log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
@@ -135,10 +125,8 @@ public class HAConnection {
 
             writeSocketService.makeStop();
 
-
             haService.removeConnection(HAConnection.this);
 
-
             HAConnection.this.haService.getConnectionCount().decrementAndGet();
 
             SelectionKey sk = this.socketChannel.keyFor(this.selector);
@@ -180,14 +168,12 @@ public class HAConnection {
                             long readOffset = this.byteBufferRead.getLong(pos - 8);
                             this.processPostion = pos;
 
-
                             HAConnection.this.slaveAckOffset = readOffset;
                             if (HAConnection.this.slaveRequestOffset < 0) {
                                 HAConnection.this.slaveRequestOffset = readOffset;
                                 log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                             }
 
-
                             HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                         }
                     } else if (readSize == 0) {
@@ -223,7 +209,6 @@ public class HAConnection {
         private boolean lastWriteOver = true;
         private long lastWriteTimestamp = System.currentTimeMillis();
 
-
         public WriteSocketService(final SocketChannel socketChannel) throws IOException {
             this.selector = RemotingUtil.openSelector();
             this.socketChannel = socketChannel;
@@ -231,7 +216,6 @@ public class HAConnection {
             this.thread.setDaemon(true);
         }
 
-
         @Override
         public void run() {
             HAConnection.log.info(this.getServiceName() + " service started");
@@ -245,15 +229,13 @@ public class HAConnection {
                         continue;
                     }
 
-
-
                     if (-1 == this.nextTransferFromWhere) {
                         if (0 == HAConnection.this.slaveRequestOffset) {
                             long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                             masterOffset =
-                                    masterOffset
-                                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
-                                            .getMapedFileSizeCommitLog());
+                                masterOffset
+                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
+                                    .getMapedFileSizeCommitLog());
 
                             if (masterOffset < 0) {
                                 masterOffset = 0;
@@ -265,16 +247,16 @@ public class HAConnection {
                         }
 
                         log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
-                                + "], and slave request " + HAConnection.this.slaveRequestOffset);
+                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                     }
 
                     if (this.lastWriteOver) {
 
                         long interval =
-                                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
 
                         if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
-                                .getHaSendHeartbeatInterval()) {
+                            .getHaSendHeartbeatInterval()) {
 
                             // Build Header
                             this.byteBufferHeader.position(0);
@@ -287,16 +269,14 @@ public class HAConnection {
                             if (!this.lastWriteOver)
                                 continue;
                         }
-                    }
-
-                    else {
+                    } else {
                         this.lastWriteOver = this.transferData();
                         if (!this.lastWriteOver)
                             continue;
                     }
 
                     SelectMappedBufferResult selectResult =
-                            HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                     if (selectResult != null) {
                         int size = selectResult.getSize();
                         if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
@@ -328,7 +308,6 @@ public class HAConnection {
                 }
             }
 
-
             if (this.selectMappedBufferResult != null) {
                 this.selectMappedBufferResult.release();
             }
@@ -337,7 +316,6 @@ public class HAConnection {
 
             readSocketService.makeStop();
 
-
             haService.removeConnection(HAConnection.this);
 
             SelectionKey sk = this.socketChannel.keyFor(this.selector);
@@ -355,7 +333,6 @@ public class HAConnection {
             HAConnection.log.info(this.getServiceName() + " service end");
         }
 
-
         /**
 
          */
@@ -409,13 +386,11 @@ public class HAConnection {
             return result;
         }
 
-
         @Override
         public String getServiceName() {
             return WriteSocketService.class.getSimpleName();
         }
 
-
         @Override
         public void shutdown() {
             super.shutdown();

Reply via email to