http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index 125fbd3..bdb02c6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -6,27 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; public enum LanguageCode { - JAVA((byte) 0), - CPP((byte) 1), - DOTNET((byte) 2), - PYTHON((byte) 3), - DELPHI((byte) 4), - ERLANG((byte) 5), - RUBY((byte) 6), - OTHER((byte) 7), - HTTP((byte) 8); + JAVA((byte)0), + CPP((byte)1), + DOTNET((byte)2), + PYTHON((byte)3), + DELPHI((byte)4), + ERLANG((byte)5), + RUBY((byte)6), + OTHER((byte)7), + HTTP((byte)8); private byte code;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 60dd498..6b253dc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -6,24 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson.annotation.JSONField; -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -31,22 +24,26 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemotingCommand { public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type"; public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE"; + public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND private static final int RPC_ONEWAY = 1; // 0, RPC - private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP = - new HashMap<Class<? extends CommandCustomHeader>, Field[]>(); + new HashMap<Class<? extends CommandCustomHeader>, Field[]>(); private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>(); + // 1, Oneway // 1, RESPONSE_COMMAND private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>(); - // 1, Oneway - private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName(); private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName(); @@ -56,7 +53,6 @@ public class RemotingCommand { private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName(); private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName(); - public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version"; private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); @@ -93,7 +89,6 @@ public class RemotingCommand { */ private transient byte[] body; - protected RemotingCommand() { } @@ -148,11 +143,6 @@ public class RemotingCommand { return cmd; } - public void markResponseType() { - int bits = 1 << RPC_TYPE; - this.flag |= bits; - } - public static RemotingCommand createResponseCommand(int code, String remark) { return createResponseCommand(code, remark, null); } @@ -205,7 +195,7 @@ public class RemotingCommand { } public static SerializeType getProtocolType(int source) { - return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); + return SerializeType.valueOf((byte)((source >> 24) & 0xFF)); } public static int createNewRequestId() { @@ -229,6 +219,21 @@ public class RemotingCommand { return true; } + public static byte[] markProtocolType(int source, SerializeType type) { + byte[] result = new byte[4]; + + result[0] = type.getCode(); + result[1] = (byte)((source >> 16) & 0xFF); + result[2] = (byte)((source >> 8) & 0xFF); + result[3] = (byte)(source & 0xFF); + return result; + } + + public void markResponseType() { + int bits = 1 << RPC_TYPE; + this.flag |= bits; + } + public CommandCustomHeader readCustomHeader() { return customHeader; } @@ -376,16 +381,6 @@ public class RemotingCommand { } } - public static byte[] markProtocolType(int source, SerializeType type) { - byte[] result = new byte[4]; - - result[0] = type.getCode(); - result[1] = (byte) ((source >> 16) & 0xFF); - result[2] = (byte) ((source >> 8) & 0xFF); - result[3] = (byte) (source & 0xFF); - return result; - } - public void makeCustomHeaderToNet() { if (this.customHeader != null) { Field[] fields = getClazzFields(customHeader.getClass()); @@ -550,16 +545,14 @@ public class RemotingCommand { @Override public String toString() { return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)=" - + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC=" - + serializeTypeCurrentRPC + "]"; + + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC=" + + serializeTypeCurrentRPC + "]"; } - public SerializeType getSerializeTypeCurrentRPC() { return serializeTypeCurrentRPC; } - public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) { this.serializeTypeCurrentRPC = serializeTypeCurrentRPC; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java index 3adf06f..de4a5c9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java index e543ce1..8a5d76e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java @@ -6,21 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; import com.alibaba.fastjson.JSON; - import java.nio.charset.Charset; - /** * */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java index e92bc49..f2836fe 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysResponseCode.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java index 6b0d825..64b37db 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; - /** * */ @@ -52,11 +51,11 @@ public class RocketMQSerializable { // ################### content ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) - headerBuffer.putShort((short) cmd.getCode()); + headerBuffer.putShort((short)cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) - headerBuffer.putShort((short) cmd.getVersion()); + headerBuffer.putShort((short)cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag @@ -92,10 +91,10 @@ public class RocketMQSerializable { Map.Entry<String, String> entry = it.next(); if (entry.getKey() != null && entry.getValue() != null) { kvLength = - // keySize + Key - 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length - // valSize + val - + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length; + // keySize + Key + 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length + // valSize + val + + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length; totalLength += kvLength; } } @@ -110,7 +109,7 @@ public class RocketMQSerializable { key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8); val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8); - content.putShort((short) key.length); + content.putShort((short)key.length); content.put(key); content.putInt(val.length); @@ -124,18 +123,18 @@ public class RocketMQSerializable { private static int calTotalLen(int remark, int ext) { // int code(~32767) int length = 2 - // LanguageCode language - + 1 - // int version(~32767) - + 2 - // int opaque - + 4 - // int flag - + 4 - // String remark - + 4 + remark - // HashMap<String, String> extFields - + 4 + ext; + // LanguageCode language + + 1 + // int version(~32767) + + 2 + // int opaque + + 4 + // int flag + + 4 + // String remark + + 4 + remark + // HashMap<String, String> extFields + + 4 + ext; return length; } @@ -192,12 +191,11 @@ public class RocketMQSerializable { byteBuffer.get(valContent); map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent, - RemotingSerializable.CHARSET_UTF8)); + RemotingSerializable.CHARSET_UTF8)); } return map; } - public static boolean isBlank(String str) { int strLen; if (str == null || (strLen = str.length()) == 0) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java index cebd48f..6bfd42c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java @@ -6,20 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.remoting.protocol; public enum SerializeType { - JSON((byte) 0), - ROCKETMQ((byte) 1); + JSON((byte)0), + ROCKETMQ((byte)1); private byte code; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java index 3b73e46..984ecd1 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: MixTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -22,7 +24,6 @@ package org.apache.rocketmq.remoting; import org.junit.Test; - public class MixTest { @Test public void test_extFieldsValue() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java index 15330bc..15a9aa3 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java @@ -6,13 +6,15 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z vintagew...@apache.org $ */ /** @@ -20,47 +22,32 @@ */ package org.apache.rocketmq.remoting; +import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.Executors; import org.apache.rocketmq.remoting.annotation.CFNullable; -import org.apache.rocketmq.remoting.exception.*; -import org.apache.rocketmq.remoting.netty.*; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; import org.junit.Test; -import java.util.concurrent.Executors; - import static org.junit.Assert.assertTrue; - public class NettyRPCTest { - @Test - public void test_RPC_Sync() throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { - RemotingServer server = createRemotingServer(); - RemotingClient client = createRemotingClient(); - - for (int i = 0; i < 100; i++) { - TestRequestHeader requestHeader = new TestRequestHeader(); - requestHeader.setCount(i); - requestHeader.setMessageTitle("HelloMessageTitle"); - RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000); - System.out.println("invoke result = " + response); - assertTrue(response != null); - } - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } - public static RemotingServer createRemotingServer() throws InterruptedException { NettyServerConfig config = new NettyServerConfig(); RemotingServer remotingServer = new NettyRemotingServer(config); remotingServer.registerProcessor(0, new NettyRequestProcessor() { private int i = 0; - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { System.out.println("processRequest=" + request + " " + (i++)); @@ -85,8 +72,29 @@ public class NettyRPCTest { } @Test + public void test_RPC_Sync() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingServer server = createRemotingServer(); + RemotingClient client = createRemotingClient(); + + for (int i = 0; i < 100; i++) { + TestRequestHeader requestHeader = new TestRequestHeader(); + requestHeader.setCount(i); + requestHeader.setMessageTitle("HelloMessageTitle"); + RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader); + RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000); + System.out.println("invoke result = " + response); + assertTrue(response != null); + } + + client.shutdown(); + server.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + + @Test public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException, - RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { + RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { RemotingServer server = createRemotingServer(); RemotingClient client = createRemotingClient(); @@ -101,10 +109,9 @@ public class NettyRPCTest { System.out.println("-----------------------------------------------------------------"); } - @Test public void test_RPC_Async() throws InterruptedException, RemotingConnectException, - RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { + RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { RemotingServer server = createRemotingServer(); RemotingClient client = createRemotingClient(); @@ -126,10 +133,9 @@ public class NettyRPCTest { System.out.println("-----------------------------------------------------------------"); } - @Test public void test_server_call_client() throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { + RemotingSendRequestException, RemotingTimeoutException { final RemotingServer server = createRemotingServer(); final RemotingClient client = createRemotingClient(); @@ -183,7 +189,6 @@ public class NettyRPCTest { } - class TestRequestHeader implements CommandCustomHeader { @CFNullable private Integer count; @@ -191,33 +196,27 @@ class TestRequestHeader implements CommandCustomHeader { @CFNullable private String messageTitle; - @Override public void checkFields() throws RemotingCommandException { } - public Integer getCount() { return count; } - public void setCount(Integer count) { this.count = count; } - public String getMessageTitle() { return messageTitle; } - public void setMessageTitle(String messageTitle) { this.messageTitle = messageTitle; } } - class TestResponseHeader implements CommandCustomHeader { @CFNullable private Integer count; @@ -246,5 +245,4 @@ class TestResponseHeader implements CommandCustomHeader { this.messageTitle = messageTitle; } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java b/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java index 3c27697..16aa9b1 100644 --- a/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java +++ b/remoting/src/test/java/org/apache/rocketmq/subclass/TestSubClassAuto.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ /** @@ -22,7 +22,6 @@ package org.apache.rocketmq.subclass; import org.junit.Test; - public class TestSubClassAuto { @Test public void test_sub() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/srvutil/pom.xml ---------------------------------------------------------------------- diff --git a/srvutil/pom.xml b/srvutil/pom.xml index 0873ee8..af884cb 100644 --- a/srvutil/pom.xml +++ b/srvutil/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java ---------------------------------------------------------------------- diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index 0a25dd1..1774150 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -6,19 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.srvutil; -import org.apache.commons.cli.*; - import java.util.Properties; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; public class ServerUtil { @@ -28,17 +32,16 @@ public class ServerUtil { options.addOption(opt); opt = - new Option("n", "namesrvAddr", true, - "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); + new Option("n", "namesrvAddr", true, + "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); opt.setRequired(false); options.addOption(opt); return options; } - public static CommandLine parseCmdLine(final String appName, String[] args, Options options, - CommandLineParser parser) { + CommandLineParser parser) { HelpFormatter hf = new HelpFormatter(); hf.setWidth(110); CommandLine commandLine = null; @@ -55,14 +58,12 @@ public class ServerUtil { return commandLine; } - public static void printCommandLineHelp(final String appName, final Options options) { HelpFormatter hf = new HelpFormatter(); hf.setWidth(110); hf.printHelp(appName, options, true); } - public static Properties commandLine2Properties(final CommandLine commandLine) { Properties properties = new Properties(); Option[] opts = commandLine.getOptions(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/pom.xml ---------------------------------------------------------------------- diff --git a/store/pom.xml b/store/pom.xml index 10b13b9..29be589 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 94362ea..27b957f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -16,13 +16,6 @@ */ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.store.config.BrokerRole; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.util.ServiceLoader; @@ -30,33 +23,35 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; - +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.store.config.BrokerRole; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Create MappedFile in advance - * */ public class AllocateMappedFileService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int waitTimeOut = 1000 * 5; private ConcurrentHashMap<String, AllocateRequest> requestTable = - new ConcurrentHashMap<String, AllocateRequest>(); + new ConcurrentHashMap<String, AllocateRequest>(); private PriorityBlockingQueue<AllocateRequest> requestQueue = - new PriorityBlockingQueue<AllocateRequest>(); + new PriorityBlockingQueue<AllocateRequest>(); private volatile boolean hasException = false; private DefaultMessageStore messageStore; - public AllocateMappedFileService(DefaultMessageStore messageStore) { this.messageStore = messageStore; } - public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { int canSubmitRequests = 2; if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() - && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool + && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size(); } } @@ -67,7 +62,7 @@ public class AllocateMappedFileService extends ServiceThread { if (nextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + - "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); this.requestTable.remove(nextFilePath); return null; } @@ -83,7 +78,7 @@ public class AllocateMappedFileService extends ServiceThread { if (nextNextPutOK) { if (canSubmitRequests <= 0) { log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + - "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs()); this.requestTable.remove(nextNextFilePath); } else { boolean offerOK = this.requestQueue.offer(nextNextReq); @@ -119,13 +114,11 @@ public class AllocateMappedFileService extends ServiceThread { return null; } - @Override public String getServiceName() { return AllocateMappedFileService.class.getSimpleName(); } - public void shutdown() { this.stopped = true; this.thread.interrupt(); @@ -144,7 +137,6 @@ public class AllocateMappedFileService extends ServiceThread { } } - public void run() { log.info(this.getServiceName() + " service started"); @@ -154,7 +146,6 @@ public class AllocateMappedFileService extends ServiceThread { log.info(this.getServiceName() + " service end"); } - /** * Only interrupted by the external thread, will return false */ @@ -166,12 +157,12 @@ public class AllocateMappedFileService extends ServiceThread { AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath()); if (null == expectedRequest) { log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " - + req.getFileSize()); + + req.getFileSize()); return true; } if (expectedRequest != req) { log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " " - + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest); + + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest); return true; } @@ -195,16 +186,16 @@ public class AllocateMappedFileService extends ServiceThread { if (eclipseTime > 10) { int queueSize = this.requestQueue.size(); log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize - + " " + req.getFilePath() + " " + req.getFileSize()); + + " " + req.getFilePath() + " " + req.getFileSize()); } // pre write mappedFile if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig() - .getMapedFileSizeCommitLog() - && - this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + .getMapedFileSizeCommitLog() + && + this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(), - this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); + this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); } req.setMappedFile(mappedFile); @@ -239,53 +230,43 @@ public class AllocateMappedFileService extends ServiceThread { private CountDownLatch countDownLatch = new CountDownLatch(1); private volatile MappedFile mappedFile = null; - public AllocateRequest(String filePath, int fileSize) { this.filePath = filePath; this.fileSize = fileSize; } - public String getFilePath() { return filePath; } - public void setFilePath(String filePath) { this.filePath = filePath; } - public int getFileSize() { return fileSize; } - public void setFileSize(int fileSize) { this.fileSize = fileSize; } - public CountDownLatch getCountDownLatch() { return countDownLatch; } - public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } - public MappedFile getMappedFile() { return mappedFile; } - public void setMappedFile(MappedFile mappedFile) { this.mappedFile = mappedFile; } - public int compareTo(AllocateRequest other) { if (this.fileSize < other.fileSize) return 1; @@ -308,7 +289,6 @@ public class AllocateMappedFileService extends ServiceThread { // other.fileSize ? -1 : 0; } - @Override public int hashCode() { final int prime = 31; @@ -318,7 +298,6 @@ public class AllocateMappedFileService extends ServiceThread { return result; } - @Override public boolean equals(Object obj) { if (this == obj) @@ -327,7 +306,7 @@ public class AllocateMappedFileService extends ServiceThread { return false; if (getClass() != obj.getClass()) return false; - AllocateRequest other = (AllocateRequest) obj; + AllocateRequest other = (AllocateRequest)obj; if (filePath == null) { if (other.filePath != null) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index 0e8678c..6d158d3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; import java.nio.ByteBuffer; - /** * Write messages callback interface * @@ -36,5 +35,5 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index 8541208..965097f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; @@ -40,7 +40,7 @@ public class AppendMessageResult { } public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId, - long storeTimestamp, long logicsOffset, long pagecacheRT) { + long storeTimestamp, long logicsOffset, long pagecacheRT) { this.status = status; this.wroteOffset = wroteOffset; this.wroteBytes = wroteBytes; @@ -62,62 +62,50 @@ public class AppendMessageResult { return this.status == AppendMessageStatus.PUT_OK; } - public AppendMessageStatus getStatus() { return status; } - public void setStatus(AppendMessageStatus status) { this.status = status; } - public long getWroteOffset() { return wroteOffset; } - public void setWroteOffset(long wroteOffset) { this.wroteOffset = wroteOffset; } - public int getWroteBytes() { return wroteBytes; } - public void setWroteBytes(int wroteBytes) { this.wroteBytes = wroteBytes; } - public String getMsgId() { return msgId; } - public void setMsgId(String msgId) { this.msgId = msgId; } - public long getStoreTimestamp() { return storeTimestamp; } - public void setStoreTimestamp(long storeTimestamp) { this.storeTimestamp = storeTimestamp; } - public long getLogicsOffset() { return logicsOffset; } - public void setLogicsOffset(long logicsOffset) { this.logicsOffset = logicsOffset; } @@ -125,13 +113,13 @@ public class AppendMessageResult { @Override public String toString() { return "AppendMessageResult{" + - "status=" + status + - ", wroteOffset=" + wroteOffset + - ", wroteBytes=" + wroteBytes + - ", msgId='" + msgId + '\'' + - ", storeTimestamp=" + storeTimestamp + - ", logicsOffset=" + logicsOffset + - ", pagecacheRT=" + pagecacheRT + - '}'; + "status=" + status + + ", wroteOffset=" + wroteOffset + + ", wroteBytes=" + wroteBytes + + ", msgId='" + msgId + '\'' + + ", storeTimestamp=" + storeTimestamp + + ", logicsOffset=" + logicsOffset + + ", pagecacheRT=" + pagecacheRT + + '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java index 34f70b1..39cf9fa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index ddd6be3..17625f4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -16,6 +16,15 @@ */ package org.apache.rocketmq.store; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,20 +40,8 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; - - /** * Store all metadata downtime for recovery, data protection reliability - * */ public class CommitLog { // Message's MAGIC CODE daa320a7 @@ -72,7 +69,7 @@ public class CommitLog { public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -126,17 +123,15 @@ public class CommitLog { return this.mappedFileQueue.remainHowManyDataToFlush(); } - public int deleteExpiredFile(// - final long expiredTime, // - final int deleteFilesInterval, // - final long intervalForcibly, // - final boolean cleanImmediately// + final long expiredTime, // + final int deleteFilesInterval, // + final long intervalForcibly, // + final boolean cleanImmediately// ) { return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); } - /** * Read CommitLog data, use data replication */ @@ -144,12 +139,11 @@ public class CommitLog { return this.getData(offset, offset == 0); } - public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { - int pos = (int) (offset % mappedFileSize); + int pos = (int)(offset % mappedFileSize); SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); return result; } @@ -157,7 +151,6 @@ public class CommitLog { return null; } - /** * When the normal exit, data recovery, all memory data have been flush */ @@ -227,8 +220,7 @@ public class CommitLog { /** * check the message and returns the message size * - * @return 0 Come the end of the file // >0 Normal messages // -1 Message - * checksum failure + * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure */ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { try { @@ -340,7 +332,7 @@ public class CommitLog { if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, - storeTimestamp); + storeTimestamp); } } } @@ -354,23 +346,23 @@ public class CommitLog { doNothingForDeadCode(byteBuffer1); doNothingForDeadCode(byteBuffer2); log.error( - "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", - totalSize, readLength, bodyLen, topicLen, propertiesLength); + "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", + totalSize, readLength, bodyLen, topicLen, propertiesLength); return new DispatchRequest(totalSize, false/* success */); } return new DispatchRequest(// - topic, // 1 - queueId, // 2 - physicOffset, // 3 - totalSize, // 4 - tagsCode, // 5 - storeTimestamp, // 6 - queueOffset, // 7 - keys, // 8 - uniqKey, //9 - sysFlag, // 9 - preparedTransactionOffset// 10 + topic, // 1 + queueId, // 2 + physicOffset, // 3 + totalSize, // 4 + tagsCode, // 5 + storeTimestamp, // 6 + queueOffset, // 7 + keys, // 8 + uniqKey, //9 + sysFlag, // 9 + preparedTransactionOffset// 10 ); } catch (Exception e) { } @@ -380,24 +372,24 @@ public class CommitLog { private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16 - // propertiesLength - + 0; + + 4 // 2 MAGICCODE + + 4 // 3 BODYCRC + + 4 // 4 QUEUEID + + 4 // 5 FLAG + + 8 // 6 QUEUEOFFSET + + 8 // 7 PHYSICALOFFSET + + 4 // 8 SYSFLAG + + 8 // 9 BORNTIMESTAMP + + 8 // 10 BORNHOST + + 8 // 11 STORETIMESTAMP + + 8 // 12 STOREHOSTADDRESS + + 4 // 13 RECONSUMETIMES + + 8 // 14 Prepared Transaction Offset + + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY + + 1 + topicLength // 15 TOPIC + + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16 + // propertiesLength + + 0; return msgLen; } @@ -441,7 +433,6 @@ public class CommitLog { if (size > 0) { mappedFileOffset += size; - if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); @@ -505,18 +496,18 @@ public class CommitLog { } if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()// - && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { + && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { log.info("find check timestamp, {} {}", // - storeTimestamp, // - UtilAll.timeMillisToHumanString(storeTimestamp)); + storeTimestamp, // + UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } else { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { log.info("find check timestamp, {} {}", // - storeTimestamp, // - UtilAll.timeMillisToHumanString(storeTimestamp)); + storeTimestamp, // + UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } @@ -552,7 +543,7 @@ public class CommitLog { final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// - || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { @@ -636,7 +627,6 @@ public class CommitLog { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } - PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics @@ -647,14 +637,14 @@ public class CommitLog { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { - final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; + final GroupCommitService service = (GroupCommitService)this.flushCommitLogService; if (msg.isWaitStoreMsgOK()) { request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() - + " client address: " + msg.getBornHostString()); + + " client address: " + msg.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { @@ -684,11 +674,11 @@ public class CommitLog { service.getWaitNotifyObject().wakeupAll(); boolean flushOK = - // TODO - request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); + // TODO + request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " - + msg.getTags() + " client address: " + msg.getBornHostString()); + + msg.getTags() + " client address: " + msg.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } @@ -739,7 +729,7 @@ public class CommitLog { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { - int pos = (int) (offset % mappedFileSize); + int pos = (int)(offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); } return null; @@ -754,17 +744,14 @@ public class CommitLog { return topicQueueTable; } - public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) { this.topicQueueTable = topicQueueTable; } - public void destroy() { this.mappedFileQueue.destroy(); } - public boolean appendData(long startOffset, byte[] data) { lockForPutMessage(); //spin... try { @@ -780,7 +767,6 @@ public class CommitLog { } } - public boolean retryDeleteFirstFile(final long intervalForcibly) { return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly); } @@ -798,6 +784,72 @@ public class CommitLog { mappedFileQueue.checkSelf(); } + public long lockTimeMills() { + long diff = 0; + long begin = this.beginTimeInLock; + if (begin > 0) { + diff = this.defaultMessageStore.now() - begin; + } + + if (diff < 0) { + diff = 0; + } + + return diff; + } + + /** + * Spin util acquired the lock. + */ + private void lockForPutMessage() { + if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { + putMessageNormalLock.lock(); + } else { + boolean flag; + do { + flag = this.putMessageSpinLock.compareAndSet(true, false); + } + while (!flag); + } + } + + private void releasePutMessageLock() { + if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { + putMessageNormalLock.unlock(); + } else { + this.putMessageSpinLock.compareAndSet(false, true); + } + } + + public static class GroupCommitRequest { + private final long nextOffset; + private final CountDownLatch countDownLatch = new CountDownLatch(1); + private volatile boolean flushOK = false; + + public GroupCommitRequest(long nextOffset) { + this.nextOffset = nextOffset; + } + + public long getNextOffset() { + return nextOffset; + } + + public void wakeupCustomer(final boolean flushOK) { + this.flushOK = flushOK; + this.countDownLatch.countDown(); + } + + public boolean waitForFlush(long timeout) { + try { + this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); + return this.flushOK; + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + } + } + abstract class FlushCommitLogService extends ServiceThread { protected static final int RETRY_TIMES_OVER = 10; } @@ -820,7 +872,7 @@ public class CommitLog { int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = - CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); + CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { @@ -859,7 +911,6 @@ public class CommitLog { private long lastFlushTimestamp = 0; private long printTimes = 0; - public void run() { CommitLog.log.info(this.getServiceName() + " service started"); @@ -870,7 +921,7 @@ public class CommitLog { int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = - CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); + CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; @@ -921,58 +972,22 @@ public class CommitLog { CommitLog.log.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return FlushRealTimeService.class.getSimpleName(); } - private void printFlushProgress() { // CommitLog.log.info("how much disk fall behind memory, " // + CommitLog.this.mappedFileQueue.howMuchFallBehind()); } - @Override public long getJointime() { return 1000 * 60 * 5; } } - public static class GroupCommitRequest { - private final long nextOffset; - private final CountDownLatch countDownLatch = new CountDownLatch(1); - private volatile boolean flushOK = false; - - - public GroupCommitRequest(long nextOffset) { - this.nextOffset = nextOffset; - } - - - public long getNextOffset() { - return nextOffset; - } - - - public void wakeupCustomer(final boolean flushOK) { - this.flushOK = flushOK; - this.countDownLatch.countDown(); - } - - - public boolean waitForFlush(long timeout) { - try { - this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - return this.flushOK; - } catch (InterruptedException e) { - e.printStackTrace(); - return false; - } - } - } - /** * GroupCommit Service */ @@ -980,7 +995,6 @@ public class CommitLog { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); - public void putRequest(final GroupCommitRequest request) { synchronized (this) { this.requestsWrite.add(request); @@ -990,14 +1004,12 @@ public class CommitLog { } } - private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } - private void doCommit() { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { @@ -1028,7 +1040,6 @@ public class CommitLog { } } - public void run() { CommitLog.log.info(this.getServiceName() + " service started"); @@ -1058,19 +1069,16 @@ public class CommitLog { CommitLog.log.info(this.getServiceName() + " service end"); } - @Override protected void onWaitEnd() { this.swapRequests(); } - @Override public String getServiceName() { return GroupCommitService.class.getSimpleName(); } - @Override public long getJointime() { return 1000 * 60 * 5; @@ -1090,19 +1098,16 @@ public class CommitLog { private final ByteBuffer hostHolder = ByteBuffer.allocate(8); - DefaultAppendMessageCallback(final int size) { this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; } - public ByteBuffer getMsgStoreItemMemory() { return msgStoreItemMemory; } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> @@ -1143,9 +1148,9 @@ public class CommitLog { * Serialize message */ final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length; + final short propertiesLength = propertiesData == null ? 0 : (short)propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); @@ -1162,7 +1167,7 @@ public class CommitLog { // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength - + ", maxMessageSize: " + this.maxMessageSize); + + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } @@ -1180,7 +1185,7 @@ public class CommitLog { final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), - queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // Initialization of storage space @@ -1221,7 +1226,7 @@ public class CommitLog { if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC - this.msgStoreItemMemory.put((byte) topicLength); + this.msgStoreItemMemory.put((byte)topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort(propertiesLength); @@ -1233,7 +1238,7 @@ public class CommitLog { byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, - msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: @@ -1250,46 +1255,9 @@ public class CommitLog { return result; } - private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } } - - public long lockTimeMills() { - long diff = 0; - long begin = this.beginTimeInLock; - if (begin > 0) { - diff = this.defaultMessageStore.now() - begin; - } - - if (diff < 0) { - diff = 0; - } - - return diff; - } - - /** - * Spin util acquired the lock. - */ - private void lockForPutMessage() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.lock(); - } else { - boolean flag; - do { - flag = this.putMessageSpinLock.compareAndSet(true, false); - } while (!flag); - } - } - - private void releasePutMessageLock() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.unlock(); - } else { - this.putMessageSpinLock.compareAndSet(false, true); - } - } }